diff options
Diffstat (limited to 'src/mongo/util/concurrency')
22 files changed, 1546 insertions, 1415 deletions
diff --git a/src/mongo/util/concurrency/mapsf.h b/src/mongo/util/concurrency/mapsf.h index 6aa6617e229..294bd55c0b7 100644 --- a/src/mongo/util/concurrency/mapsf.h +++ b/src/mongo/util/concurrency/mapsf.h @@ -32,63 +32,65 @@ namespace mongo { - /** Thread safe map. - Be careful not to use this too much or it could make things slow; - if not a hot code path no problem. - Examples: +/** Thread safe map. + Be careful not to use this too much or it could make things slow; + if not a hot code path no problem. + Examples: - mapsf< std::map<int,int>, int, int > mp; + mapsf< std::map<int,int>, int, int > mp; - int x = mp.get(); + int x = mp.get(); - std::map< std::map<int,int>, int, int > two; - mp.swap(two); + std::map< std::map<int,int>, int, int > two; + mp.swap(two); - { - mapsf< std::map<int,int>, int, int >::ref r(mp); - r[9] = 1; - std::map<int,int>::iterator i = r.r.begin(); - } - */ - template< class M > - struct mapsf { - MONGO_DISALLOW_COPYING(mapsf); + { + mapsf< std::map<int,int>, int, int >::ref r(mp); + r[9] = 1; + std::map<int,int>::iterator i = r.r.begin(); + } +*/ +template <class M> +struct mapsf { + MONGO_DISALLOW_COPYING(mapsf); - SimpleMutex m; - M val; - friend struct ref; - public: + SimpleMutex m; + M val; + friend struct ref; - typedef typename M::const_iterator const_iterator; - typedef typename M::key_type key_type; - typedef typename M::mapped_type mapped_type; +public: + typedef typename M::const_iterator const_iterator; + typedef typename M::key_type key_type; + typedef typename M::mapped_type mapped_type; - mapsf() : m("mapsf") { } - void swap(M& rhs) { - stdx::lock_guard<SimpleMutex> lk(m); - val.swap(rhs); - } - bool empty() { - stdx::lock_guard<SimpleMutex> lk(m); - return val.empty(); - } - // safe as we pass by value: - mapped_type get(key_type k) { - stdx::lock_guard<SimpleMutex> lk(m); - const_iterator i = val.find(k); - if( i == val.end() ) - return mapped_type(); - return i->second; + mapsf() : m("mapsf") {} + void swap(M& rhs) { + stdx::lock_guard<SimpleMutex> lk(m); + val.swap(rhs); + } + bool empty() { + stdx::lock_guard<SimpleMutex> lk(m); + return val.empty(); + } + // safe as we pass by value: + mapped_type get(key_type k) { + stdx::lock_guard<SimpleMutex> lk(m); + const_iterator i = val.find(k); + if (i == val.end()) + return mapped_type(); + return i->second; + } + // think about deadlocks when using ref. the other methods + // above will always be safe as they are "leaf" operations. + struct ref { + stdx::lock_guard<SimpleMutex> lk; + + public: + M& r; + ref(mapsf& m) : lk(m.m), r(m.val) {} + mapped_type& operator[](const key_type& k) { + return r[k]; } - // think about deadlocks when using ref. the other methods - // above will always be safe as they are "leaf" operations. - struct ref { - stdx::lock_guard<SimpleMutex> lk; - public: - M &r; - ref(mapsf &m) : lk(m.m), r(m.val) { } - mapped_type& operator[](const key_type& k) { return r[k]; } - }; }; - +}; } diff --git a/src/mongo/util/concurrency/mutex.h b/src/mongo/util/concurrency/mutex.h index 1e21f81f392..65b0ae6b73a 100644 --- a/src/mongo/util/concurrency/mutex.h +++ b/src/mongo/util/concurrency/mutex.h @@ -38,64 +38,66 @@ namespace mongo { - /** The concept with SimpleMutex is that it is a basic lock/unlock - * with no special functionality (such as try and try - * timeout). Thus it can be implemented using OS-specific - * facilities in all environments (if desired). On Windows, - * the implementation below is faster than boost mutex. - */ +/** The concept with SimpleMutex is that it is a basic lock/unlock + * with no special functionality (such as try and try + * timeout). Thus it can be implemented using OS-specific + * facilities in all environments (if desired). On Windows, + * the implementation below is faster than boost mutex. +*/ #if defined(_WIN32) - class SimpleMutex { - MONGO_DISALLOW_COPYING(SimpleMutex); - public: - SimpleMutex() { - InitializeCriticalSection( &_cs ); - } +class SimpleMutex { + MONGO_DISALLOW_COPYING(SimpleMutex); - ~SimpleMutex() { - if ( ! StaticObserver::_destroyingStatics ) { - DeleteCriticalSection(&_cs); - } - } +public: + SimpleMutex() { + InitializeCriticalSection(&_cs); + } - void lock() { - EnterCriticalSection( &_cs ); - } - void unlock() { - LeaveCriticalSection( &_cs ); + ~SimpleMutex() { + if (!StaticObserver::_destroyingStatics) { + DeleteCriticalSection(&_cs); } + } + + void lock() { + EnterCriticalSection(&_cs); + } + void unlock() { + LeaveCriticalSection(&_cs); + } - private: - CRITICAL_SECTION _cs; - }; +private: + CRITICAL_SECTION _cs; +}; #else - class SimpleMutex { - MONGO_DISALLOW_COPYING(SimpleMutex); - public: - SimpleMutex() { - verify( pthread_mutex_init(&_lock,0) == 0 ); - } +class SimpleMutex { + MONGO_DISALLOW_COPYING(SimpleMutex); - ~SimpleMutex() { - if ( ! StaticObserver::_destroyingStatics ) { - verify( pthread_mutex_destroy(&_lock) == 0 ); - } - } +public: + SimpleMutex() { + verify(pthread_mutex_init(&_lock, 0) == 0); + } - void lock() { - verify( pthread_mutex_lock(&_lock) == 0 ); + ~SimpleMutex() { + if (!StaticObserver::_destroyingStatics) { + verify(pthread_mutex_destroy(&_lock) == 0); } + } - void unlock() { - verify( pthread_mutex_unlock(&_lock) == 0 ); - } + void lock() { + verify(pthread_mutex_lock(&_lock) == 0); + } + + void unlock() { + verify(pthread_mutex_unlock(&_lock) == 0); + } - private: - pthread_mutex_t _lock; - }; +private: + pthread_mutex_t _lock; +}; #endif -} // namespace mongo +} // namespace mongo diff --git a/src/mongo/util/concurrency/mvar.h b/src/mongo/util/concurrency/mvar.h index aa703b3c704..7f592c1f432 100644 --- a/src/mongo/util/concurrency/mvar.h +++ b/src/mongo/util/concurrency/mvar.h @@ -34,100 +34,100 @@ namespace mongo { - /* This is based on haskell's MVar synchronization primitive: - * http://www.haskell.org/ghc/docs/latest/html/libraries/base/Control-Concurrent-MVar.html - * - * It is a thread-safe queue that can hold at most one object. - * You can also think of it as a box that can be either full or empty. - */ - - template <typename T> - class MVar { - public: - enum State {EMPTY=0, FULL}; - - // create an empty MVar - MVar() - : _state(EMPTY) - {} - - // creates a full MVar - MVar(const T& val) - : _state(FULL) - , _value(val) - {} - - // puts val into the MVar and returns true or returns false if full - // never blocks - bool tryPut(const T& val) { - // intentionally repeat test before and after lock - if (_state == FULL) return false; - Mutex::scoped_lock lock(_mutex); - if (_state == FULL) return false; - - _state = FULL; - _value = val; - - // unblock threads waiting to 'take' - _condition.notify_all(); - - return true; - } - - // puts val into the MVar - // will block if the MVar is already full - void put(const T& val) { - Mutex::scoped_lock lock(_mutex); - while (!tryPut(val)) { - // unlocks lock while waiting and relocks before returning - _condition.wait(lock); - } - } - - // takes val out of the MVar and returns true or returns false if empty - // never blocks - bool tryTake(T& out) { - // intentionally repeat test before and after lock - if (_state == EMPTY) return false; - Mutex::scoped_lock lock(_mutex); - if (_state == EMPTY) return false; - - _state = EMPTY; - out = _value; - - // unblock threads waiting to 'put' - _condition.notify_all(); +/* This is based on haskell's MVar synchronization primitive: + * http://www.haskell.org/ghc/docs/latest/html/libraries/base/Control-Concurrent-MVar.html + * + * It is a thread-safe queue that can hold at most one object. + * You can also think of it as a box that can be either full or empty. + */ - return true; +template <typename T> +class MVar { +public: + enum State { EMPTY = 0, FULL }; + + // create an empty MVar + MVar() : _state(EMPTY) {} + + // creates a full MVar + MVar(const T& val) : _state(FULL), _value(val) {} + + // puts val into the MVar and returns true or returns false if full + // never blocks + bool tryPut(const T& val) { + // intentionally repeat test before and after lock + if (_state == FULL) + return false; + Mutex::scoped_lock lock(_mutex); + if (_state == FULL) + return false; + + _state = FULL; + _value = val; + + // unblock threads waiting to 'take' + _condition.notify_all(); + + return true; + } + + // puts val into the MVar + // will block if the MVar is already full + void put(const T& val) { + Mutex::scoped_lock lock(_mutex); + while (!tryPut(val)) { + // unlocks lock while waiting and relocks before returning + _condition.wait(lock); } - - // takes val out of the MVar - // will block if the MVar is empty - T take() { - T ret = T(); - - Mutex::scoped_lock lock(_mutex); - while (!tryTake(ret)) { - // unlocks lock while waiting and relocks before returning - _condition.wait(lock); - } - - return ret; + } + + // takes val out of the MVar and returns true or returns false if empty + // never blocks + bool tryTake(T& out) { + // intentionally repeat test before and after lock + if (_state == EMPTY) + return false; + Mutex::scoped_lock lock(_mutex); + if (_state == EMPTY) + return false; + + _state = EMPTY; + out = _value; + + // unblock threads waiting to 'put' + _condition.notify_all(); + + return true; + } + + // takes val out of the MVar + // will block if the MVar is empty + T take() { + T ret = T(); + + Mutex::scoped_lock lock(_mutex); + while (!tryTake(ret)) { + // unlocks lock while waiting and relocks before returning + _condition.wait(lock); } + return ret; + } - // Note: this is fast because there is no locking, but state could - // change before you get a chance to act on it. - // Mainly useful for sanity checks / asserts. - State getState() { return _state; } + // Note: this is fast because there is no locking, but state could + // change before you get a chance to act on it. + // Mainly useful for sanity checks / asserts. + State getState() { + return _state; + } - private: - State _state; - T _value; - typedef boost::recursive_mutex Mutex; - Mutex _mutex; - boost::condition _condition; - }; +private: + State _state; + T _value; + typedef boost::recursive_mutex Mutex; + Mutex _mutex; + boost::condition _condition; +}; } diff --git a/src/mongo/util/concurrency/old_thread_pool.cpp b/src/mongo/util/concurrency/old_thread_pool.cpp index e25855770eb..55bd763e542 100644 --- a/src/mongo/util/concurrency/old_thread_pool.cpp +++ b/src/mongo/util/concurrency/old_thread_pool.cpp @@ -43,10 +43,7 @@ namespace mongo { class OldThreadPool::Worker { public: explicit Worker(OldThreadPool& owner, const std::string& threadName) - : _owner(owner) - , _is_done(true) - , _thread(stdx::bind(&Worker::loop, this, threadName)) - {} + : _owner(owner), _is_done(true), _thread(stdx::bind(&Worker::loop, this, threadName)) {} // destructor will block until current operation is completed // Acts as a "join" on this thread @@ -66,7 +63,7 @@ public: private: OldThreadPool& _owner; MVar<Task> _task; - bool _is_done; // only used for error detection + bool _is_done; // only used for error detection stdx::thread _thread; void loop(const std::string& threadName) { @@ -74,18 +71,15 @@ private: while (true) { Task task = _task.take(); if (!task) - break; // ends the thread + break; // ends the thread try { task(); - } - catch (DBException& e) { + } catch (DBException& e) { log() << "Unhandled DBException: " << e.toString(); - } - catch (std::exception& e) { + } catch (std::exception& e) { log() << "Unhandled std::exception in worker thread: " << e.what(); - } - catch (...) { + } catch (...) { log() << "Unhandled non-exception in worker thread"; } _is_done = true; @@ -102,22 +96,17 @@ OldThreadPool::OldThreadPool(int nThreads, const std::string& threadNamePrefix) OldThreadPool::OldThreadPool(const DoNotStartThreadsTag&, int nThreads, const std::string& threadNamePrefix) - : _tasksRemaining(0) - , _nThreads(nThreads) - , _threadNamePrefix(threadNamePrefix) { -} + : _tasksRemaining(0), _nThreads(nThreads), _threadNamePrefix(threadNamePrefix) {} void OldThreadPool::startThreads() { stdx::lock_guard<stdx::mutex> lock(_mutex); for (int i = 0; i < _nThreads; ++i) { - const std::string threadName(_threadNamePrefix.empty() ? - _threadNamePrefix : - str::stream() << _threadNamePrefix << i); + const std::string threadName(_threadNamePrefix.empty() ? _threadNamePrefix : str::stream() + << _threadNamePrefix << i); Worker* worker = new Worker(*this, threadName); if (_tasks.empty()) { _freeWorkers.push_front(worker); - } - else { + } else { worker->set_task(_tasks.front()); _tasks.pop_front(); } @@ -129,7 +118,7 @@ OldThreadPool::~OldThreadPool() { invariant(_tasksRemaining == 0); - while(!_freeWorkers.empty()) { + while (!_freeWorkers.empty()) { delete _freeWorkers.front(); _freeWorkers.pop_front(); } @@ -137,7 +126,7 @@ OldThreadPool::~OldThreadPool() { void OldThreadPool::join() { stdx::unique_lock<stdx::mutex> lock(_mutex); - while(_tasksRemaining) { + while (_tasksRemaining) { _condition.wait(lock); } } @@ -150,8 +139,7 @@ void OldThreadPool::schedule(Task task) { if (!_freeWorkers.empty()) { _freeWorkers.front()->set_task(task); _freeWorkers.pop_front(); - } - else { + } else { _tasks.push_back(task); } } @@ -163,15 +151,14 @@ void OldThreadPool::task_done(Worker* worker) { if (!_tasks.empty()) { worker->set_task(_tasks.front()); _tasks.pop_front(); - } - else { + } else { _freeWorkers.push_front(worker); } _tasksRemaining--; - if(_tasksRemaining == 0) + if (_tasksRemaining == 0) _condition.notify_all(); } -} //namespace mongo +} // namespace mongo diff --git a/src/mongo/util/concurrency/old_thread_pool.h b/src/mongo/util/concurrency/old_thread_pool.h index 9ece85425b7..3d564d92c8d 100644 --- a/src/mongo/util/concurrency/old_thread_pool.h +++ b/src/mongo/util/concurrency/old_thread_pool.h @@ -37,66 +37,79 @@ namespace mongo { - /** - * Implementation of a fixed-size pool of threads that can perform scheduled - * tasks. - */ - class OldThreadPool { - MONGO_DISALLOW_COPYING(OldThreadPool); - public: - typedef stdx::function<void(void)> Task; //nullary function or functor - struct DoNotStartThreadsTag {}; - - explicit OldThreadPool(int nThreads=8, const std::string& threadNamePrefix=""); - explicit OldThreadPool(const DoNotStartThreadsTag&, - int nThreads=8, - const std::string& threadNamePrefix=""); - - // blocks until all tasks are complete (tasks_remaining() == 0) - // You should not call schedule while in the destructor - ~OldThreadPool(); - - // Launches the worker threads; call exactly once, if and only if - // you used the DoNotStartThreadsTag form of the constructor. - void startThreads(); - - // blocks until all tasks are complete (tasks_remaining() == 0) - // does not prevent new tasks from being scheduled so could wait forever. - // Also, new tasks could be scheduled after this returns. - void join(); - - // task will be copied a few times so make sure it's relatively cheap - void schedule(Task task); - - // Helpers that wrap schedule and stdx::bind. - // Functor and args will be copied a few times so make sure it's relatively cheap - template<typename F, typename A> - void schedule(F f, A a) { schedule(stdx::bind(f,a)); } - template<typename F, typename A, typename B> - void schedule(F f, A a, B b) { schedule(stdx::bind(f,a,b)); } - template<typename F, typename A, typename B, typename C> - void schedule(F f, A a, B b, C c) { schedule(stdx::bind(f,a,b,c)); } - template<typename F, typename A, typename B, typename C, typename D> - void schedule(F f, A a, B b, C c, D d) { schedule(stdx::bind(f,a,b,c,d)); } - template<typename F, typename A, typename B, typename C, typename D, typename E> - void schedule(F f, A a, B b, C c, D d, E e) { schedule(stdx::bind(f,a,b,c,d,e)); } - - int tasks_remaining() { return _tasksRemaining; } - - private: - class Worker; - stdx::mutex _mutex; - stdx::condition_variable _condition; - - std::list<Worker*> _freeWorkers; //used as LIFO stack (always front) - std::list<Task> _tasks; //used as FIFO queue (push_back, pop_front) - int _tasksRemaining; // in queue + currently processing - int _nThreads; // only used for sanity checking. could be removed in the future. - const std::string _threadNamePrefix; // used for logging/diagnostics - - // should only be called by a worker from the worker's thread - void task_done(Worker* worker); - friend class Worker; - }; - -} //namespace mongo +/** + * Implementation of a fixed-size pool of threads that can perform scheduled + * tasks. + */ +class OldThreadPool { + MONGO_DISALLOW_COPYING(OldThreadPool); + +public: + typedef stdx::function<void(void)> Task; // nullary function or functor + struct DoNotStartThreadsTag {}; + + explicit OldThreadPool(int nThreads = 8, const std::string& threadNamePrefix = ""); + explicit OldThreadPool(const DoNotStartThreadsTag&, + int nThreads = 8, + const std::string& threadNamePrefix = ""); + + // blocks until all tasks are complete (tasks_remaining() == 0) + // You should not call schedule while in the destructor + ~OldThreadPool(); + + // Launches the worker threads; call exactly once, if and only if + // you used the DoNotStartThreadsTag form of the constructor. + void startThreads(); + + // blocks until all tasks are complete (tasks_remaining() == 0) + // does not prevent new tasks from being scheduled so could wait forever. + // Also, new tasks could be scheduled after this returns. + void join(); + + // task will be copied a few times so make sure it's relatively cheap + void schedule(Task task); + + // Helpers that wrap schedule and stdx::bind. + // Functor and args will be copied a few times so make sure it's relatively cheap + template <typename F, typename A> + void schedule(F f, A a) { + schedule(stdx::bind(f, a)); + } + template <typename F, typename A, typename B> + void schedule(F f, A a, B b) { + schedule(stdx::bind(f, a, b)); + } + template <typename F, typename A, typename B, typename C> + void schedule(F f, A a, B b, C c) { + schedule(stdx::bind(f, a, b, c)); + } + template <typename F, typename A, typename B, typename C, typename D> + void schedule(F f, A a, B b, C c, D d) { + schedule(stdx::bind(f, a, b, c, d)); + } + template <typename F, typename A, typename B, typename C, typename D, typename E> + void schedule(F f, A a, B b, C c, D d, E e) { + schedule(stdx::bind(f, a, b, c, d, e)); + } + + int tasks_remaining() { + return _tasksRemaining; + } + +private: + class Worker; + stdx::mutex _mutex; + stdx::condition_variable _condition; + + std::list<Worker*> _freeWorkers; // used as LIFO stack (always front) + std::list<Task> _tasks; // used as FIFO queue (push_back, pop_front) + int _tasksRemaining; // in queue + currently processing + int _nThreads; // only used for sanity checking. could be removed in the future. + const std::string _threadNamePrefix; // used for logging/diagnostics + + // should only be called by a worker from the worker's thread + void task_done(Worker* worker); + friend class Worker; +}; + +} // namespace mongo diff --git a/src/mongo/util/concurrency/rwlock.h b/src/mongo/util/concurrency/rwlock.h index 22373efd421..b3988efa06b 100644 --- a/src/mongo/util/concurrency/rwlock.h +++ b/src/mongo/util/concurrency/rwlock.h @@ -39,217 +39,253 @@ namespace mongo { - class RWLock : public RWLockBase { - enum { NilState, UpgradableState, Exclusive } x; // only bother to set when doing upgradable related things - public: - const char * const _name; - RWLock(const char *name) : _name(name) { - x = NilState; - } - void lock() { - RWLockBase::lock(); - } - void unlock() { - RWLockBase::unlock(); - } +class RWLock : public RWLockBase { + enum { + NilState, + UpgradableState, + Exclusive + } x; // only bother to set when doing upgradable related things +public: + const char* const _name; + RWLock(const char* name) : _name(name) { + x = NilState; + } + void lock() { + RWLockBase::lock(); + } + void unlock() { + RWLockBase::unlock(); + } - void lock_shared() { RWLockBase::lock_shared(); } - void unlock_shared() { RWLockBase::unlock_shared(); } - private: - void lockAsUpgradable() { RWLockBase::lockAsUpgradable(); } - void unlockFromUpgradable() { // upgradable -> unlocked - RWLockBase::unlockFromUpgradable(); - } - public: - void upgrade() { // upgradable -> exclusive lock - verify( x == UpgradableState ); - RWLockBase::upgrade(); - x = Exclusive; - } + void lock_shared() { + RWLockBase::lock_shared(); + } + void unlock_shared() { + RWLockBase::unlock_shared(); + } - bool lock_shared_try( int millis ) { return RWLockBase::lock_shared_try(millis); } +private: + void lockAsUpgradable() { + RWLockBase::lockAsUpgradable(); + } + void unlockFromUpgradable() { // upgradable -> unlocked + RWLockBase::unlockFromUpgradable(); + } - bool lock_try( int millis = 0 ) { - return RWLockBase::lock_try(millis); - } +public: + void upgrade() { // upgradable -> exclusive lock + verify(x == UpgradableState); + RWLockBase::upgrade(); + x = Exclusive; + } - /** acquire upgradable state. You must be unlocked before creating. - unlocks on destruction, whether in upgradable state or upgraded to exclusive - in the interim. - */ - class Upgradable { - MONGO_DISALLOW_COPYING(Upgradable); - RWLock& _r; - public: - Upgradable(RWLock& r) : _r(r) { - r.lockAsUpgradable(); - verify( _r.x == NilState ); - _r.x = RWLock::UpgradableState; - } - ~Upgradable() { - if( _r.x == RWLock::UpgradableState ) { - _r.x = NilState; - _r.unlockFromUpgradable(); - } - else { - //TEMP verify( _r.x == Exclusive ); // has been upgraded - _r.x = NilState; - _r.unlock(); - } - } - }; - }; + bool lock_shared_try(int millis) { + return RWLockBase::lock_shared_try(millis); + } - /** throws on failure to acquire in the specified time period. */ - class rwlock_try_write { - MONGO_DISALLOW_COPYING(rwlock_try_write); - public: - struct exception { }; - rwlock_try_write(RWLock& l, int millis = 0) : _l(l) { - if( !l.lock_try(millis) ) - throw exception(); - } - ~rwlock_try_write() { _l.unlock(); } - private: - RWLock& _l; - }; + bool lock_try(int millis = 0) { + return RWLockBase::lock_try(millis); + } - class rwlock_shared { - MONGO_DISALLOW_COPYING(rwlock_shared); - public: - rwlock_shared(RWLock& rwlock) : _r(rwlock) {_r.lock_shared(); } - ~rwlock_shared() { _r.unlock_shared(); } - private: + /** acquire upgradable state. You must be unlocked before creating. + unlocks on destruction, whether in upgradable state or upgraded to exclusive + in the interim. + */ + class Upgradable { + MONGO_DISALLOW_COPYING(Upgradable); RWLock& _r; - }; - /* scoped lock for RWLock */ - class rwlock { - MONGO_DISALLOW_COPYING(rwlock); public: - /** - * @param write acquire write lock if true sharable if false - * @param lowPriority if > 0, will try to get the lock non-greedily for that many ms - */ - rwlock( const RWLock& lock , bool write, /* bool alreadyHaveLock = false , */int lowPriorityWaitMS = 0 ) - : _lock( (RWLock&)lock ) , _write( write ) { - { - if ( _write ) { - _lock.lock(); - } - else { - _lock.lock_shared(); - } - } + Upgradable(RWLock& r) : _r(r) { + r.lockAsUpgradable(); + verify(_r.x == NilState); + _r.x = RWLock::UpgradableState; } - ~rwlock() { - if ( _write ) - _lock.unlock(); - else - _lock.unlock_shared(); + ~Upgradable() { + if (_r.x == RWLock::UpgradableState) { + _r.x = NilState; + _r.unlockFromUpgradable(); + } else { + // TEMP verify( _r.x == Exclusive ); // has been upgraded + _r.x = NilState; + _r.unlock(); + } } - private: - RWLock& _lock; - const bool _write; }; +}; - // ---------------------------------------------------------------------------------------- +/** throws on failure to acquire in the specified time period. */ +class rwlock_try_write { + MONGO_DISALLOW_COPYING(rwlock_try_write); - /** recursive on shared locks is ok for this implementation */ - class RWLockRecursive : protected RWLockBase { - protected: - ThreadLocalValue<int> _state; - void lock(); // not implemented - Lock() should be used; didn't overload this name to avoid mistakes - virtual void Lock() { RWLockBase::lock(); } - public: - virtual ~RWLockRecursive() { } - const char * const _name; - RWLockRecursive(const char *name) : _name(name) { } +public: + struct exception {}; + rwlock_try_write(RWLock& l, int millis = 0) : _l(l) { + if (!l.lock_try(millis)) + throw exception(); + } + ~rwlock_try_write() { + _l.unlock(); + } + +private: + RWLock& _l; +}; + +class rwlock_shared { + MONGO_DISALLOW_COPYING(rwlock_shared); + +public: + rwlock_shared(RWLock& rwlock) : _r(rwlock) { + _r.lock_shared(); + } + ~rwlock_shared() { + _r.unlock_shared(); + } + +private: + RWLock& _r; +}; - void assertAtLeastReadLocked() { - verify( _state.get() != 0 ); +/* scoped lock for RWLock */ +class rwlock { + MONGO_DISALLOW_COPYING(rwlock); + +public: + /** + * @param write acquire write lock if true sharable if false + * @param lowPriority if > 0, will try to get the lock non-greedily for that many ms + */ + rwlock(const RWLock& lock, + bool write, + /* bool alreadyHaveLock = false , */ int lowPriorityWaitMS = 0) + : _lock((RWLock&)lock), _write(write) { + { + if (_write) { + _lock.lock(); + } else { + _lock.lock_shared(); + } + } + } + ~rwlock() { + if (_write) + _lock.unlock(); + else + _lock.unlock_shared(); + } + +private: + RWLock& _lock; + const bool _write; +}; + +// ---------------------------------------------------------------------------------------- + +/** recursive on shared locks is ok for this implementation */ +class RWLockRecursive : protected RWLockBase { +protected: + ThreadLocalValue<int> _state; + void + lock(); // not implemented - Lock() should be used; didn't overload this name to avoid mistakes + virtual void Lock() { + RWLockBase::lock(); + } + +public: + virtual ~RWLockRecursive() {} + const char* const _name; + RWLockRecursive(const char* name) : _name(name) {} + + void assertAtLeastReadLocked() { + verify(_state.get() != 0); + } + void assertExclusivelyLocked() { + verify(_state.get() < 0); + } + + class Exclusive { + MONGO_DISALLOW_COPYING(Exclusive); + RWLockRecursive& _r; + + public: + Exclusive(RWLockRecursive& r) : _r(r) { + int s = _r._state.get(); + dassert(s <= 0); + if (s == 0) + _r.Lock(); + _r._state.set(s - 1); } - void assertExclusivelyLocked() { - verify( _state.get() < 0 ); + ~Exclusive() { + int s = _r._state.get(); + DEV wassert(s < 0); // wassert: don't throw from destructors + ++s; + _r._state.set(s); + if (s == 0) + _r.unlock(); } + }; - class Exclusive { - MONGO_DISALLOW_COPYING(Exclusive); - RWLockRecursive& _r; - public: - Exclusive(RWLockRecursive& r) : _r(r) { - int s = _r._state.get(); - dassert( s <= 0 ); - if( s == 0 ) - _r.Lock(); - _r._state.set(s-1); + class Shared { + MONGO_DISALLOW_COPYING(Shared); + RWLockRecursive& _r; + bool _alreadyLockedExclusiveByUs; + + public: + Shared(RWLockRecursive& r) : _r(r) { + int s = _r._state.get(); + _alreadyLockedExclusiveByUs = s < 0; + if (!_alreadyLockedExclusiveByUs) { + dassert(s >= 0); // -1 would mean exclusive + if (s == 0) + _r.lock_shared(); + _r._state.set(s + 1); } - ~Exclusive() { - int s = _r._state.get(); - DEV wassert( s < 0 ); // wassert: don't throw from destructors - ++s; + } + ~Shared() { + if (_alreadyLockedExclusiveByUs) { + DEV wassert(_r._state.get() < 0); + } else { + int s = _r._state.get() - 1; + DEV wassert(s >= 0); _r._state.set(s); - if ( s == 0 ) - _r.unlock(); + if (s == 0) + _r.unlock_shared(); } - }; - - class Shared { - MONGO_DISALLOW_COPYING(Shared); - RWLockRecursive& _r; - bool _alreadyLockedExclusiveByUs; - public: - Shared(RWLockRecursive& r) : _r(r) { - int s = _r._state.get(); - _alreadyLockedExclusiveByUs = s < 0; - if( !_alreadyLockedExclusiveByUs ) { - dassert( s >= 0 ); // -1 would mean exclusive - if( s == 0 ) - _r.lock_shared(); - _r._state.set(s+1); - } - } - ~Shared() { - if( _alreadyLockedExclusiveByUs ) { - DEV wassert( _r._state.get() < 0 ); - } - else { - int s = _r._state.get() - 1; - DEV wassert( s >= 0 ); - _r._state.set(s); - if( s == 0 ) - _r.unlock_shared(); - } - } - }; + } }; +}; - class RWLockRecursiveNongreedy : public RWLockRecursive { - virtual void Lock() { - bool got = false; - for ( int i=0; i<lowPriorityWaitMS; i++ ) { - if ( lock_try(0) ) { - got = true; - break; - } - int sleep = 1; - if ( i > ( lowPriorityWaitMS / 20 ) ) - sleep = 10; - sleepmillis(sleep); - i += ( sleep - 1 ); - } - if ( ! got ) { - RWLockBase::lock(); +class RWLockRecursiveNongreedy : public RWLockRecursive { + virtual void Lock() { + bool got = false; + for (int i = 0; i < lowPriorityWaitMS; i++) { + if (lock_try(0)) { + got = true; + break; } + int sleep = 1; + if (i > (lowPriorityWaitMS / 20)) + sleep = 10; + sleepmillis(sleep); + i += (sleep - 1); } + if (!got) { + RWLockBase::lock(); + } + } - public: - const int lowPriorityWaitMS; - RWLockRecursiveNongreedy(const char *nm, int lpwaitms) : RWLockRecursive(nm), lowPriorityWaitMS(lpwaitms) { } - const char * implType() const { return RWLockRecursive::implType(); } - - //just for testing: - bool __lock_try( int millis ) { return RWLockRecursive::lock_try(millis); } - }; +public: + const int lowPriorityWaitMS; + RWLockRecursiveNongreedy(const char* nm, int lpwaitms) + : RWLockRecursive(nm), lowPriorityWaitMS(lpwaitms) {} + const char* implType() const { + return RWLockRecursive::implType(); + } + // just for testing: + bool __lock_try(int millis) { + return RWLockRecursive::lock_try(millis); + } +}; } diff --git a/src/mongo/util/concurrency/rwlockimpl.cpp b/src/mongo/util/concurrency/rwlockimpl.cpp index b8bcf50b530..28eeebd363b 100644 --- a/src/mongo/util/concurrency/rwlockimpl.cpp +++ b/src/mongo/util/concurrency/rwlockimpl.cpp @@ -52,62 +52,69 @@ using namespace std; namespace mongo { #if defined(NTDDI_VERSION) && defined(NTDDI_WIN7) && (NTDDI_VERSION >= NTDDI_WIN7) - SimpleRWLock::SimpleRWLock(StringData p) : name(p.toString()) { - InitializeSRWLock(&_lock); - } -# if defined(MONGO_CONFIG_DEBUG_BUILD) - // the code below in a debug build will check that we don't try to recursively lock, - // which is not supported by this class. also checks that you don't unlock without - // having locked - void SimpleRWLock::lock() { - unsigned me = GetCurrentThreadId(); - int& state = s.getRef(); - dassert( state == 0 ); - state--; - AcquireSRWLockExclusive(&_lock); - tid = me; // this is for use in the debugger to see who does have the lock - } - void SimpleRWLock::unlock() { - int& state = s.getRef(); - dassert( state == -1 ); - state++; - tid = 0xffffffff; - ReleaseSRWLockExclusive(&_lock); - } - void SimpleRWLock::lock_shared() { - int& state = s.getRef(); - dassert( state == 0 ); - state++; - AcquireSRWLockShared(&_lock); - shares.fetchAndAdd(1); - } - void SimpleRWLock::unlock_shared() { - int& state = s.getRef(); - dassert( state == 1 ); - state--; - shares.fetchAndSubtract(1); - ReleaseSRWLockShared(&_lock); - } -# else - void SimpleRWLock::lock() { - AcquireSRWLockExclusive(&_lock); - } - void SimpleRWLock::unlock() { - ReleaseSRWLockExclusive(&_lock); - } - void SimpleRWLock::lock_shared() { - AcquireSRWLockShared(&_lock); - } - void SimpleRWLock::unlock_shared() { - ReleaseSRWLockShared(&_lock); - } -# endif +SimpleRWLock::SimpleRWLock(StringData p) : name(p.toString()) { + InitializeSRWLock(&_lock); +} +#if defined(MONGO_CONFIG_DEBUG_BUILD) +// the code below in a debug build will check that we don't try to recursively lock, +// which is not supported by this class. also checks that you don't unlock without +// having locked +void SimpleRWLock::lock() { + unsigned me = GetCurrentThreadId(); + int& state = s.getRef(); + dassert(state == 0); + state--; + AcquireSRWLockExclusive(&_lock); + tid = me; // this is for use in the debugger to see who does have the lock +} +void SimpleRWLock::unlock() { + int& state = s.getRef(); + dassert(state == -1); + state++; + tid = 0xffffffff; + ReleaseSRWLockExclusive(&_lock); +} +void SimpleRWLock::lock_shared() { + int& state = s.getRef(); + dassert(state == 0); + state++; + AcquireSRWLockShared(&_lock); + shares.fetchAndAdd(1); +} +void SimpleRWLock::unlock_shared() { + int& state = s.getRef(); + dassert(state == 1); + state--; + shares.fetchAndSubtract(1); + ReleaseSRWLockShared(&_lock); +} +#else +void SimpleRWLock::lock() { + AcquireSRWLockExclusive(&_lock); +} +void SimpleRWLock::unlock() { + ReleaseSRWLockExclusive(&_lock); +} +void SimpleRWLock::lock_shared() { + AcquireSRWLockShared(&_lock); +} +void SimpleRWLock::unlock_shared() { + ReleaseSRWLockShared(&_lock); +} +#endif #else - SimpleRWLock::SimpleRWLock(StringData p) : name(p.toString()) { } - void SimpleRWLock::lock() { m.lock(); } - void SimpleRWLock::unlock() { m.unlock(); } - void SimpleRWLock::lock_shared() { m.lock_shared(); } - void SimpleRWLock::unlock_shared() { m.unlock_shared(); } +SimpleRWLock::SimpleRWLock(StringData p) : name(p.toString()) {} +void SimpleRWLock::lock() { + m.lock(); +} +void SimpleRWLock::unlock() { + m.unlock(); +} +void SimpleRWLock::lock_shared() { + m.lock_shared(); +} +void SimpleRWLock::unlock_shared() { + m.unlock_shared(); +} #endif - } diff --git a/src/mongo/util/concurrency/rwlockimpl.h b/src/mongo/util/concurrency/rwlockimpl.h index a591bd389b7..898197a9777 100644 --- a/src/mongo/util/concurrency/rwlockimpl.h +++ b/src/mongo/util/concurrency/rwlockimpl.h @@ -35,110 +35,137 @@ #if defined(NTDDI_VERSION) && defined(NTDDI_WIN7) && (NTDDI_VERSION >= NTDDI_WIN7) // Windows slimreaderwriter version. Newer windows versions only. Under contention this is slower -// than boost::shared_mutex, but see https://jira.mongodb.org/browse/SERVER-2327 for why it cannot +// than boost::shared_mutex, but see https://jira.mongodb.org/browse/SERVER-2327 for why it cannot // be used. namespace mongo { - unsigned long long curTimeMicros64(); - - class RWLockBase { - MONGO_DISALLOW_COPYING(RWLockBase); - friend class SimpleRWLock; - SRWLOCK _lock; - protected: - RWLockBase() { InitializeSRWLock(&_lock); } - ~RWLockBase() { - // no special action needed to destroy a SRWLOCK - } - void lock() { AcquireSRWLockExclusive(&_lock); } - void unlock() { ReleaseSRWLockExclusive(&_lock); } - void lock_shared() { AcquireSRWLockShared(&_lock); } - void unlock_shared() { ReleaseSRWLockShared(&_lock); } - bool lock_shared_try( int millis ) { - if( TryAcquireSRWLockShared(&_lock) ) - return true; - if( millis == 0 ) - return false; - unsigned long long end = curTimeMicros64() + millis*1000; - while( 1 ) { - Sleep(1); - if( TryAcquireSRWLockShared(&_lock) ) - return true; - if( curTimeMicros64() >= end ) - break; - } +unsigned long long curTimeMicros64(); + +class RWLockBase { + MONGO_DISALLOW_COPYING(RWLockBase); + friend class SimpleRWLock; + SRWLOCK _lock; + +protected: + RWLockBase() { + InitializeSRWLock(&_lock); + } + ~RWLockBase() { + // no special action needed to destroy a SRWLOCK + } + void lock() { + AcquireSRWLockExclusive(&_lock); + } + void unlock() { + ReleaseSRWLockExclusive(&_lock); + } + void lock_shared() { + AcquireSRWLockShared(&_lock); + } + void unlock_shared() { + ReleaseSRWLockShared(&_lock); + } + bool lock_shared_try(int millis) { + if (TryAcquireSRWLockShared(&_lock)) + return true; + if (millis == 0) return false; - } - bool lock_try( int millis = 0 ) { - if( TryAcquireSRWLockExclusive(&_lock) ) // quick check to optimistically avoid calling curTimeMicros64 + unsigned long long end = curTimeMicros64() + millis * 1000; + while (1) { + Sleep(1); + if (TryAcquireSRWLockShared(&_lock)) return true; - if( millis == 0 ) - return false; - unsigned long long end = curTimeMicros64() + millis*1000; - do { - Sleep(1); - if( TryAcquireSRWLockExclusive(&_lock) ) - return true; - } while( curTimeMicros64() < end ); - return false; + if (curTimeMicros64() >= end) + break; } - // no upgradable for this impl - void lockAsUpgradable() { lock(); } - void unlockFromUpgradable() { unlock(); } - void upgrade() { } - public: - const char * implType() const { return "WINSRW"; } - }; + return false; + } + bool lock_try(int millis = 0) { + if (TryAcquireSRWLockExclusive( + &_lock)) // quick check to optimistically avoid calling curTimeMicros64 + return true; + if (millis == 0) + return false; + unsigned long long end = curTimeMicros64() + millis * 1000; + do { + Sleep(1); + if (TryAcquireSRWLockExclusive(&_lock)) + return true; + } while (curTimeMicros64() < end); + return false; + } + // no upgradable for this impl + void lockAsUpgradable() { + lock(); + } + void unlockFromUpgradable() { + unlock(); + } + void upgrade() {} + +public: + const char* implType() const { + return "WINSRW"; + } +}; } #else -# if defined(_WIN32) -# include "shared_mutex_win.hpp" -namespace mongo { typedef boost::modified_shared_mutex shared_mutex; } -# else -# include <boost/thread/shared_mutex.hpp> -namespace mongo { using boost::shared_mutex; } -# endif - -namespace mongo { - class RWLockBase { - MONGO_DISALLOW_COPYING(RWLockBase); - friend class SimpleRWLock; - shared_mutex _m; - protected: - RWLockBase() = default; - - void lock() { - _m.lock(); - } - void unlock() { - _m.unlock(); - } - void lockAsUpgradable() { - _m.lock_upgrade(); - } - void unlockFromUpgradable() { // upgradable -> unlocked - _m.unlock_upgrade(); - } - void upgrade() { // upgradable -> exclusive lock - _m.unlock_upgrade_and_lock(); - } - void lock_shared() { - _m.lock_shared(); - } - void unlock_shared() { - _m.unlock_shared(); - } - bool lock_shared_try( int millis ) { - return _m.timed_lock_shared( boost::posix_time::milliseconds(millis) ); - } - bool lock_try( int millis = 0 ) { - return _m.timed_lock( boost::posix_time::milliseconds(millis) ); - } - public: - const char * implType() const { return "boost"; } - }; +#if defined(_WIN32) +#include "shared_mutex_win.hpp" +namespace mongo { +typedef boost::modified_shared_mutex shared_mutex; +} +#else +#include <boost/thread/shared_mutex.hpp> +namespace mongo { +using boost::shared_mutex; +} +#endif + +namespace mongo { +class RWLockBase { + MONGO_DISALLOW_COPYING(RWLockBase); + friend class SimpleRWLock; + shared_mutex _m; + +protected: + RWLockBase() = default; + + void lock() { + _m.lock(); + } + void unlock() { + _m.unlock(); + } + void lockAsUpgradable() { + _m.lock_upgrade(); + } + void unlockFromUpgradable() { // upgradable -> unlocked + _m.unlock_upgrade(); + } + void upgrade() { // upgradable -> exclusive lock + _m.unlock_upgrade_and_lock(); + } + void lock_shared() { + _m.lock_shared(); + } + void unlock_shared() { + _m.unlock_shared(); + } + bool lock_shared_try(int millis) { + return _m.timed_lock_shared(boost::posix_time::milliseconds(millis)); + } + bool lock_try(int millis = 0) { + return _m.timed_lock(boost::posix_time::milliseconds(millis)); + } + +public: + const char* implType() const { + return "boost"; + } +}; } #endif diff --git a/src/mongo/util/concurrency/simplerwlock.h b/src/mongo/util/concurrency/simplerwlock.h index 8a8b3171ec9..4673f799f8c 100644 --- a/src/mongo/util/concurrency/simplerwlock.h +++ b/src/mongo/util/concurrency/simplerwlock.h @@ -35,42 +35,51 @@ namespace mongo { - /** separated out as later the implementation of this may be different than RWLock, - depending on OS, as there is no upgrade etc. facility herein. - */ - class SimpleRWLock { - MONGO_DISALLOW_COPYING(SimpleRWLock); +/** separated out as later the implementation of this may be different than RWLock, + depending on OS, as there is no upgrade etc. facility herein. +*/ +class SimpleRWLock { + MONGO_DISALLOW_COPYING(SimpleRWLock); #if defined(NTDDI_VERSION) && defined(NTDDI_WIN7) && (NTDDI_VERSION >= NTDDI_WIN7) - SRWLOCK _lock; + SRWLOCK _lock; #else - RWLockBase m; + RWLockBase m; #endif #if defined(_WIN32) && defined(MONGO_CONFIG_DEBUG_BUILD) - AtomicUInt32 shares; - ThreadLocalValue<int> s; - unsigned tid; + AtomicUInt32 shares; + ThreadLocalValue<int> s; + unsigned tid; #endif +public: + const std::string name; + SimpleRWLock(StringData name = ""); + void lock(); + void unlock(); + void lock_shared(); + void unlock_shared(); + class Shared { + MONGO_DISALLOW_COPYING(Shared); + SimpleRWLock& _r; + public: - const std::string name; - SimpleRWLock(StringData name = "" ); - void lock(); - void unlock(); - void lock_shared(); - void unlock_shared(); - class Shared { - MONGO_DISALLOW_COPYING(Shared); - SimpleRWLock& _r; - public: - Shared(SimpleRWLock& rwlock) : _r(rwlock) {_r.lock_shared(); } - ~Shared() { _r.unlock_shared(); } - }; - class Exclusive { - MONGO_DISALLOW_COPYING(Exclusive); - SimpleRWLock& _r; - public: - Exclusive(SimpleRWLock& rwlock) : _r(rwlock) {_r.lock(); } - ~Exclusive() { _r.unlock(); } - }; + Shared(SimpleRWLock& rwlock) : _r(rwlock) { + _r.lock_shared(); + } + ~Shared() { + _r.unlock_shared(); + } }; + class Exclusive { + MONGO_DISALLOW_COPYING(Exclusive); + SimpleRWLock& _r; + public: + Exclusive(SimpleRWLock& rwlock) : _r(rwlock) { + _r.lock(); + } + ~Exclusive() { + _r.unlock(); + } + }; +}; } diff --git a/src/mongo/util/concurrency/spin_lock.cpp b/src/mongo/util/concurrency/spin_lock.cpp index 6b52f4b727f..601dcd0f53b 100644 --- a/src/mongo/util/concurrency/spin_lock.cpp +++ b/src/mongo/util/concurrency/spin_lock.cpp @@ -28,7 +28,7 @@ */ #include "mongo/platform/basic.h" -#undef MONGO_PCH_WHITELISTED // todo eliminate this include +#undef MONGO_PCH_WHITELISTED // todo eliminate this include #include "mongo/util/concurrency/spin_lock.h" @@ -38,90 +38,95 @@ namespace mongo { - SpinLock::~SpinLock() { +SpinLock::~SpinLock() { #if defined(_WIN32) - DeleteCriticalSection(&_cs); + DeleteCriticalSection(&_cs); #elif defined(__USE_XOPEN2K) - pthread_spin_destroy(&_lock); + pthread_spin_destroy(&_lock); #endif - } +} - SpinLock::SpinLock() +SpinLock::SpinLock() #if defined(_WIN32) - { InitializeCriticalSectionAndSpinCount(&_cs, 4000); } +{ + InitializeCriticalSectionAndSpinCount(&_cs, 4000); +} #elif defined(__USE_XOPEN2K) - { pthread_spin_init( &_lock , 0 ); } +{ + pthread_spin_init(&_lock, 0); +} #elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) - : _locked( false ) { } + : _locked(false) { +} #else - : _mutex( "SpinLock" ) { } + : _mutex("SpinLock") { +} #endif #if defined(__USE_XOPEN2K) - NOINLINE_DECL void SpinLock::_lk() { - /** - * this is designed to perform close to the default spin lock - * the reason for the mild insanity is to prevent horrible performance - * when contention spikes - * it allows spinlocks to be used in many more places - * which is good because even with this change they are about 8x faster on linux - */ - - for ( int i=0; i<1000; i++ ) { - if ( pthread_spin_trylock( &_lock ) == 0 ) - return; +NOINLINE_DECL void SpinLock::_lk() { + /** + * this is designed to perform close to the default spin lock + * the reason for the mild insanity is to prevent horrible performance + * when contention spikes + * it allows spinlocks to be used in many more places + * which is good because even with this change they are about 8x faster on linux + */ + + for (int i = 0; i < 1000; i++) { + if (pthread_spin_trylock(&_lock) == 0) + return; #if defined(__i386__) || defined(__x86_64__) - asm volatile ( "pause" ) ; // maybe trylock does this; just in case. + asm volatile("pause"); // maybe trylock does this; just in case. #endif - } - - for ( int i=0; i<1000; i++ ) { - if ( pthread_spin_trylock( &_lock ) == 0 ) - return; - pthread_yield(); - } - - struct timespec t; - t.tv_sec = 0; - t.tv_nsec = 5000000; - - while ( pthread_spin_trylock( &_lock ) != 0 ) { - nanosleep(&t, NULL); - } } -#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) - void SpinLock::lock() { - // fast path - if (!_locked && !__sync_lock_test_and_set(&_locked, true)) { + for (int i = 0; i < 1000; i++) { + if (pthread_spin_trylock(&_lock) == 0) return; - } + pthread_yield(); + } + + struct timespec t; + t.tv_sec = 0; + t.tv_nsec = 5000000; + + while (pthread_spin_trylock(&_lock) != 0) { + nanosleep(&t, NULL); + } +} +#elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) +void SpinLock::lock() { + // fast path + if (!_locked && !__sync_lock_test_and_set(&_locked, true)) { + return; + } - // wait for lock - int wait = 1000; - while ((wait-- > 0) && (_locked)) { + // wait for lock + int wait = 1000; + while ((wait-- > 0) && (_locked)) { #if defined(__i386__) || defined(__x86_64__) - asm volatile ( "pause" ) ; + asm volatile("pause"); #endif - } - - // if failed to grab lock, sleep - struct timespec t; - t.tv_sec = 0; - t.tv_nsec = 5000000; - while (__sync_lock_test_and_set(&_locked, true)) { - nanosleep(&t, NULL); - } } + + // if failed to grab lock, sleep + struct timespec t; + t.tv_sec = 0; + t.tv_nsec = 5000000; + while (__sync_lock_test_and_set(&_locked, true)) { + nanosleep(&t, NULL); + } +} #endif - bool SpinLock::isfast() { +bool SpinLock::isfast() { #if defined(_WIN32) || defined(__USE_XOPEN2K) || defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) - return true; + return true; #else - return false; + return false; #endif - } +} } // namespace mongo diff --git a/src/mongo/util/concurrency/spin_lock.h b/src/mongo/util/concurrency/spin_lock.h index 782edd4f026..e56458d6cf8 100644 --- a/src/mongo/util/concurrency/spin_lock.h +++ b/src/mongo/util/concurrency/spin_lock.h @@ -38,58 +38,78 @@ namespace mongo { - /** - * The spinlock currently requires late GCC support routines to be efficient. - * Other platforms default to a mutex implemenation. - */ - class SpinLock { - MONGO_DISALLOW_COPYING(SpinLock); - public: - SpinLock(); - ~SpinLock(); - - static bool isfast(); // true if a real spinlock on this platform - - private: +/** + * The spinlock currently requires late GCC support routines to be efficient. + * Other platforms default to a mutex implemenation. + */ +class SpinLock { + MONGO_DISALLOW_COPYING(SpinLock); + +public: + SpinLock(); + ~SpinLock(); + + static bool isfast(); // true if a real spinlock on this platform + +private: #if defined(_WIN32) - CRITICAL_SECTION _cs; - public: - void lock() {EnterCriticalSection(&_cs); } - void unlock() { LeaveCriticalSection(&_cs); } + CRITICAL_SECTION _cs; + +public: + void lock() { + EnterCriticalSection(&_cs); + } + void unlock() { + LeaveCriticalSection(&_cs); + } #elif defined(__USE_XOPEN2K) - pthread_spinlock_t _lock; - void _lk(); - public: - void unlock() { pthread_spin_unlock(&_lock); } - void lock() { - if ( MONGO_likely( pthread_spin_trylock( &_lock ) == 0 ) ) - return; - _lk(); - } + pthread_spinlock_t _lock; + void _lk(); + +public: + void unlock() { + pthread_spin_unlock(&_lock); + } + void lock() { + if (MONGO_likely(pthread_spin_trylock(&_lock) == 0)) + return; + _lk(); + } #elif defined(__GCC_HAVE_SYNC_COMPARE_AND_SWAP_4) - volatile bool _locked; - public: - void unlock() {__sync_lock_release(&_locked); } - void lock(); + volatile bool _locked; + +public: + void unlock() { + __sync_lock_release(&_locked); + } + void lock(); #else - // default to a mutex if not implemented - SimpleMutex _mutex; - public: - void unlock() { _mutex.unlock(); } - void lock() { _mutex.lock(); } + // default to a mutex if not implemented + SimpleMutex _mutex; + +public: + void unlock() { + _mutex.unlock(); + } + void lock() { + _mutex.lock(); + } #endif - }; - - class scoped_spinlock { - MONGO_DISALLOW_COPYING(scoped_spinlock); - public: - scoped_spinlock( SpinLock& l ) : _l(l) { - _l.lock(); - } - ~scoped_spinlock() { - _l.unlock();} - private: - SpinLock& _l; - }; +}; + +class scoped_spinlock { + MONGO_DISALLOW_COPYING(scoped_spinlock); + +public: + scoped_spinlock(SpinLock& l) : _l(l) { + _l.lock(); + } + ~scoped_spinlock() { + _l.unlock(); + } + +private: + SpinLock& _l; +}; } // namespace mongo diff --git a/src/mongo/util/concurrency/spin_lock_test.cpp b/src/mongo/util/concurrency/spin_lock_test.cpp index 9fef554f352..8a2e1f47cec 100644 --- a/src/mongo/util/concurrency/spin_lock_test.cpp +++ b/src/mongo/util/concurrency/spin_lock_test.cpp @@ -34,81 +34,81 @@ namespace { - using mongo::SpinLock; - using mongo::Timer; +using mongo::SpinLock; +using mongo::Timer; - namespace stdx = mongo::stdx; +namespace stdx = mongo::stdx; - class LockTester { - public: - LockTester( SpinLock* spin, int* counter ) - : _spin(spin), _counter(counter), _requests(0) {} +class LockTester { +public: + LockTester(SpinLock* spin, int* counter) : _spin(spin), _counter(counter), _requests(0) {} - ~LockTester() { - delete _t; - } + ~LockTester() { + delete _t; + } - void start( int increments ) { - _t = new stdx::thread( mongo::stdx::bind(&LockTester::test, this, increments) ); - } + void start(int increments) { + _t = new stdx::thread(mongo::stdx::bind(&LockTester::test, this, increments)); + } - void join() { - if ( _t ) _t->join(); - } + void join() { + if (_t) + _t->join(); + } - int requests() const { - return _requests; - } + int requests() const { + return _requests; + } - private: - SpinLock* _spin; // not owned here - int* _counter; // not owned here - int _requests; - stdx::thread* _t; - - void test( int increments ) { - while ( increments-- > 0 ) { - _spin->lock(); - ++(*_counter); - ++_requests; - _spin->unlock(); - } +private: + SpinLock* _spin; // not owned here + int* _counter; // not owned here + int _requests; + stdx::thread* _t; + + void test(int increments) { + while (increments-- > 0) { + _spin->lock(); + ++(*_counter); + ++_requests; + _spin->unlock(); } + } - LockTester( LockTester& ); - LockTester& operator=( LockTester& ); - }; + LockTester(LockTester&); + LockTester& operator=(LockTester&); +}; - TEST(Concurrency, ConcurrentIncs) { - SpinLock spin; - int counter = 0; +TEST(Concurrency, ConcurrentIncs) { + SpinLock spin; + int counter = 0; - const int threads = 64; - const int incs = 50000; - LockTester* testers[threads]; + const int threads = 64; + const int incs = 50000; + LockTester* testers[threads]; - Timer timer; + Timer timer; - for ( int i = 0; i < threads; i++ ) { - testers[i] = new LockTester( &spin, &counter ); - } - for ( int i = 0; i < threads; i++ ) { - testers[i]->start( incs ); - } - for ( int i = 0; i < threads; i++ ) { - testers[i]->join(); - ASSERT_EQUALS( testers[i]->requests(), incs ); - delete testers[i]; - } + for (int i = 0; i < threads; i++) { + testers[i] = new LockTester(&spin, &counter); + } + for (int i = 0; i < threads; i++) { + testers[i]->start(incs); + } + for (int i = 0; i < threads; i++) { + testers[i]->join(); + ASSERT_EQUALS(testers[i]->requests(), incs); + delete testers[i]; + } - int ms = timer.millis(); - mongo::unittest::log() << "spinlock ConcurrentIncs time: " << ms << std::endl; + int ms = timer.millis(); + mongo::unittest::log() << "spinlock ConcurrentIncs time: " << ms << std::endl; - ASSERT_EQUALS( counter, threads*incs ); + ASSERT_EQUALS(counter, threads * incs); #if defined(__linux__) - ASSERT( SpinLock::isfast() ); + ASSERT(SpinLock::isfast()); #endif - } +} -} // namespace +} // namespace diff --git a/src/mongo/util/concurrency/synchronization.cpp b/src/mongo/util/concurrency/synchronization.cpp index c3b90019a1b..73c5cd946c2 100644 --- a/src/mongo/util/concurrency/synchronization.cpp +++ b/src/mongo/util/concurrency/synchronization.cpp @@ -38,81 +38,80 @@ namespace mongo { namespace { - ThreadIdleCallback threadIdleCallback; -} // namespace +ThreadIdleCallback threadIdleCallback; +} // namespace - void registerThreadIdleCallback(ThreadIdleCallback callback) { - invariant(!threadIdleCallback); - threadIdleCallback = callback; - } - - void markThreadIdle() { - if (!threadIdleCallback) { - return; - } - try { - threadIdleCallback(); - } - catch (...) { - severe() << "Exception escaped from threadIdleCallback"; - fassertFailedNoTrace(28603); - } - } - - Notification::Notification() { - lookFor = 1; - cur = 0; - } - - void Notification::waitToBeNotified() { - stdx::unique_lock<stdx::mutex> lock( _mutex ); - while ( lookFor != cur ) - _condition.wait(lock); - lookFor++; - } +void registerThreadIdleCallback(ThreadIdleCallback callback) { + invariant(!threadIdleCallback); + threadIdleCallback = callback; +} - void Notification::notifyOne() { - stdx::lock_guard<stdx::mutex> lock( _mutex ); - verify( cur != lookFor ); - cur++; - _condition.notify_one(); +void markThreadIdle() { + if (!threadIdleCallback) { + return; } - - /* --- NotifyAll --- */ - - NotifyAll::NotifyAll() { - _lastDone = 0; - _lastReturned = 0; - _nWaiting = 0; + try { + threadIdleCallback(); + } catch (...) { + severe() << "Exception escaped from threadIdleCallback"; + fassertFailedNoTrace(28603); } - - NotifyAll::When NotifyAll::now() { - stdx::lock_guard<stdx::mutex> lock( _mutex ); - return ++_lastReturned; +} + +Notification::Notification() { + lookFor = 1; + cur = 0; +} + +void Notification::waitToBeNotified() { + stdx::unique_lock<stdx::mutex> lock(_mutex); + while (lookFor != cur) + _condition.wait(lock); + lookFor++; +} + +void Notification::notifyOne() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + verify(cur != lookFor); + cur++; + _condition.notify_one(); +} + +/* --- NotifyAll --- */ + +NotifyAll::NotifyAll() { + _lastDone = 0; + _lastReturned = 0; + _nWaiting = 0; +} + +NotifyAll::When NotifyAll::now() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return ++_lastReturned; +} + +void NotifyAll::waitFor(When e) { + stdx::unique_lock<stdx::mutex> lock(_mutex); + ++_nWaiting; + while (_lastDone < e) { + _condition.wait(lock); } - - void NotifyAll::waitFor(When e) { - stdx::unique_lock<stdx::mutex> lock( _mutex ); - ++_nWaiting; - while( _lastDone < e ) { - _condition.wait(lock); - } +} + +void NotifyAll::awaitBeyondNow() { + stdx::unique_lock<stdx::mutex> lock(_mutex); + ++_nWaiting; + When e = ++_lastReturned; + while (_lastDone <= e) { + _condition.wait(lock); } +} - void NotifyAll::awaitBeyondNow() { - stdx::unique_lock<stdx::mutex> lock( _mutex ); - ++_nWaiting; - When e = ++_lastReturned; - while( _lastDone <= e ) { - _condition.wait(lock); - } - } - - void NotifyAll::notifyAll(When e) { - stdx::unique_lock<stdx::mutex> lock( _mutex ); - _lastDone = e; - _nWaiting = 0; - _condition.notify_all(); - } +void NotifyAll::notifyAll(When e) { + stdx::unique_lock<stdx::mutex> lock(_mutex); + _lastDone = e; + _nWaiting = 0; + _condition.notify_all(); +} -} // namespace mongo +} // namespace mongo diff --git a/src/mongo/util/concurrency/synchronization.h b/src/mongo/util/concurrency/synchronization.h index b89320bb8c6..655ddfb149d 100644 --- a/src/mongo/util/concurrency/synchronization.h +++ b/src/mongo/util/concurrency/synchronization.h @@ -35,87 +35,91 @@ namespace mongo { - /** - * Type of callback functions that can be invoked when markThreadIdle() runs. - * These functions *must not throw*. - */ - typedef void(*ThreadIdleCallback)(); +/** + * Type of callback functions that can be invoked when markThreadIdle() runs. + * These functions *must not throw*. + */ +typedef void (*ThreadIdleCallback)(); - /** - * Informs the registered listener that this thread believes it may go idle for an extended - * period. The caller should avoid calling markThreadIdle at a high rate, as it can both be - * moderately costly itself and in terms of distributed overhead for subsequent malloc/free - * calls. - */ - void markThreadIdle(); +/** + * Informs the registered listener that this thread believes it may go idle for an extended + * period. The caller should avoid calling markThreadIdle at a high rate, as it can both be + * moderately costly itself and in terms of distributed overhead for subsequent malloc/free + * calls. + */ +void markThreadIdle(); + +/** + * Allows for registering callbacks for when threads go idle and become active. This is used + * by TCMalloc to return freed memory to its central freelist at appropriate points, so it + * won't happen during critical sections while holding locks. Calling this is not thread-safe. + */ +void registerThreadIdleCallback(ThreadIdleCallback callback); + +/* + * A class to establish a synchronization point between two threads. One thread is the waiter + * and one is the notifier. After the notification event, both proceed normally. + * + * This class is thread-safe. + */ +class Notification { + MONGO_DISALLOW_COPYING(Notification); + +public: + Notification(); - /** - * Allows for registering callbacks for when threads go idle and become active. This is used - * by TCMalloc to return freed memory to its central freelist at appropriate points, so it - * won't happen during critical sections while holding locks. Calling this is not thread-safe. + /* + * Blocks until the method 'notifyOne()' is called. */ - void registerThreadIdleCallback(ThreadIdleCallback callback); + void waitToBeNotified(); /* - * A class to establish a synchronization point between two threads. One thread is the waiter - * and one is the notifier. After the notification event, both proceed normally. - * - * This class is thread-safe. + * Notifies the waiter of '*this' that it can proceed. Can only be called once. */ - class Notification { - MONGO_DISALLOW_COPYING(Notification); - public: - Notification(); - - /* - * Blocks until the method 'notifyOne()' is called. - */ - void waitToBeNotified(); - - /* - * Notifies the waiter of '*this' that it can proceed. Can only be called once. - */ - void notifyOne(); - - private: - stdx::mutex _mutex; // protects state below - unsigned long long lookFor; - unsigned long long cur; - stdx::condition_variable _condition; // cond over _notified being true - }; - - /** establishes a synchronization point between threads. N threads are waits and one is notifier. - threadsafe. - */ - class NotifyAll { - MONGO_DISALLOW_COPYING(NotifyAll); - public: - NotifyAll(); + void notifyOne(); - typedef unsigned long long When; +private: + stdx::mutex _mutex; // protects state below + unsigned long long lookFor; + unsigned long long cur; + stdx::condition_variable _condition; // cond over _notified being true +}; - When now(); +/** establishes a synchronization point between threads. N threads are waits and one is notifier. + threadsafe. +*/ +class NotifyAll { + MONGO_DISALLOW_COPYING(NotifyAll); - /** awaits the next notifyAll() call by another thread. notifications that precede this - call are ignored -- we are looking for a fresh event. - */ - void waitFor(When); +public: + NotifyAll(); + + typedef unsigned long long When; + + When now(); + + /** awaits the next notifyAll() call by another thread. notifications that precede this + call are ignored -- we are looking for a fresh event. + */ + void waitFor(When); - /** a bit faster than waitFor( now() ) */ - void awaitBeyondNow(); + /** a bit faster than waitFor( now() ) */ + void awaitBeyondNow(); - /** may be called multiple times. notifies all waiters */ - void notifyAll(When); + /** may be called multiple times. notifies all waiters */ + void notifyAll(When); - /** indicates how many threads are waiting for a notify. */ - unsigned nWaiting() const { return _nWaiting; } + /** indicates how many threads are waiting for a notify. */ + unsigned nWaiting() const { + return _nWaiting; + } - private: - stdx::mutex _mutex; - stdx::condition_variable _condition; - When _lastDone; - When _lastReturned; - unsigned _nWaiting; - }; +private: + stdx::mutex _mutex; + stdx::condition_variable _condition; + When _lastDone; + When _lastReturned; + unsigned _nWaiting; +}; -} // namespace mongo +} // namespace mongo diff --git a/src/mongo/util/concurrency/task.cpp b/src/mongo/util/concurrency/task.cpp index 915cdecbc66..a73938a5f75 100644 --- a/src/mongo/util/concurrency/task.cpp +++ b/src/mongo/util/concurrency/task.cpp @@ -37,49 +37,49 @@ namespace mongo { - namespace task { +namespace task { - Task::Task() - : BackgroundJob( true /* deleteSelf */ ) { - n = 0; - repeat = 0; - } +Task::Task() : BackgroundJob(true /* deleteSelf */) { + n = 0; + repeat = 0; +} - void Task::halt() { repeat = 0; } +void Task::halt() { + repeat = 0; +} - void Task::setUp() {} +void Task::setUp() {} - void Task::run() { - verify( n == 0 ); +void Task::run() { + verify(n == 0); - setUp(); + setUp(); - while( 1 ) { - n++; - try { - doWork(); - } - catch(...) { } - sleepmillis(repeat); - if( inShutdown() ) - break; - if( repeat == 0 ) - break; - } - } - - void Task::begin() { - go(); + while (1) { + n++; + try { + doWork(); + } catch (...) { } + sleepmillis(repeat); + if (inShutdown()) + break; + if (repeat == 0) + break; + } +} - void fork(Task *t) { - t->begin(); - } +void Task::begin() { + go(); +} - void repeat(Task *t, unsigned millis) { - t->repeat = millis; - t->begin(); - } +void fork(Task* t) { + t->begin(); +} - } +void repeat(Task* t, unsigned millis) { + t->repeat = millis; + t->begin(); +} +} } diff --git a/src/mongo/util/concurrency/task.h b/src/mongo/util/concurrency/task.h index e3430a6f9b4..123434db3ac 100644 --- a/src/mongo/util/concurrency/task.h +++ b/src/mongo/util/concurrency/task.h @@ -34,52 +34,51 @@ namespace mongo { - namespace task { +namespace task { - /** abstraction around threads. simpler than BackgroundJob which is used behind the scenes. - allocate the Task dynamically. when the thread terminates, the Task object will delete itself. - */ - class Task : private BackgroundJob { - protected: - virtual void setUp(); // Override to perform any do-once work for the task. - virtual void doWork() = 0; // implement the task here. - virtual std::string name() const = 0; // name the thread - public: - Task(); - - /** for a repeating task, stop after current invocation ends. can be called by other threads - as long as the Task is still in scope. - */ - void halt(); - private: - unsigned n, repeat; - friend void fork(Task* t); - friend void repeat(Task* t, unsigned millis); - virtual void run(); - //virtual void ending() { } - void begin(); - }; +/** abstraction around threads. simpler than BackgroundJob which is used behind the scenes. + allocate the Task dynamically. when the thread terminates, the Task object will delete itself. +*/ +class Task : private BackgroundJob { +protected: + virtual void setUp(); // Override to perform any do-once work for the task. + virtual void doWork() = 0; // implement the task here. + virtual std::string name() const = 0; // name the thread +public: + Task(); - /** run once */ - void fork(Task *t); + /** for a repeating task, stop after current invocation ends. can be called by other threads + as long as the Task is still in scope. + */ + void halt(); - /** run doWork() over and over, with a pause between runs of millis */ - void repeat(Task *t, unsigned millis); +private: + unsigned n, repeat; + friend void fork(Task* t); + friend void repeat(Task* t, unsigned millis); + virtual void run(); + // virtual void ending() { } + void begin(); +}; - /*** Example *** - inline void sample() { - class Sample : public Task { - public: - int result; - virtual void doWork() { result = 1234; } - Sample() : result(0) { } - }; - std::shared_ptr<Sample> q( new Sample() ); - fork(q); - cout << q->result << std::endl; // could print 1234 or 0. - } - */ +/** run once */ +void fork(Task* t); - } +/** run doWork() over and over, with a pause between runs of millis */ +void repeat(Task* t, unsigned millis); +/*** Example *** +inline void sample() { + class Sample : public Task { + public: + int result; + virtual void doWork() { result = 1234; } + Sample() : result(0) { } + }; + std::shared_ptr<Sample> q( new Sample() ); + fork(q); + cout << q->result << std::endl; // could print 1234 or 0. +} +*/ +} } diff --git a/src/mongo/util/concurrency/thread_name.cpp b/src/mongo/util/concurrency/thread_name.cpp index c2d8f8f2b9d..8b2b66302b6 100644 --- a/src/mongo/util/concurrency/thread_name.cpp +++ b/src/mongo/util/concurrency/thread_name.cpp @@ -37,50 +37,48 @@ namespace mongo { - using std::string; +using std::string; namespace { - boost::thread_specific_ptr<std::string> threadName; - AtomicInt64 nextUnnamedThreadId{1}; +boost::thread_specific_ptr<std::string> threadName; +AtomicInt64 nextUnnamedThreadId{1}; - // It is unsafe to access threadName before its dynamic initialization has completed. Use - // the execution of mongo initializers (which only happens once we have entered main, and - // therefore after dynamic initialization is complete) to signal that it is safe to use - // 'threadName'. - bool mongoInitializersHaveRun{}; - MONGO_INITIALIZER(ThreadNameInitializer)(InitializerContext*) { - mongoInitializersHaveRun = true; - // The global initializers should only ever be run from main, so setting thread name - // here makes sense. - setThreadName("main"); - return Status::OK(); - } +// It is unsafe to access threadName before its dynamic initialization has completed. Use +// the execution of mongo initializers (which only happens once we have entered main, and +// therefore after dynamic initialization is complete) to signal that it is safe to use +// 'threadName'. +bool mongoInitializersHaveRun{}; +MONGO_INITIALIZER(ThreadNameInitializer)(InitializerContext*) { + mongoInitializersHaveRun = true; + // The global initializers should only ever be run from main, so setting thread name + // here makes sense. + setThreadName("main"); + return Status::OK(); +} } // namespace - void setThreadName(StringData name) { - invariant(mongoInitializersHaveRun); - threadName.reset(new string(name.toString())); - } - - const string& getThreadName() { +void setThreadName(StringData name) { + invariant(mongoInitializersHaveRun); + threadName.reset(new string(name.toString())); +} - if (MONGO_unlikely(!mongoInitializersHaveRun)) { - // 'getThreadName' has been called before dynamic initialization for this - // translation unit has completed, so return a fallback value rather than accessing - // the 'threadName' variable, which requires dynamic initialization. We assume that - // we are in the 'main' thread. - static const std::string kFallback = "main"; - return kFallback; - } +const string& getThreadName() { + if (MONGO_unlikely(!mongoInitializersHaveRun)) { + // 'getThreadName' has been called before dynamic initialization for this + // translation unit has completed, so return a fallback value rather than accessing + // the 'threadName' variable, which requires dynamic initialization. We assume that + // we are in the 'main' thread. + static const std::string kFallback = "main"; + return kFallback; + } - std::string* s; - while (!(s = threadName.get())) { - setThreadName( - std::string(str::stream() << "thread" << nextUnnamedThreadId.fetchAndAdd(1))); - } - return *s; + std::string* s; + while (!(s = threadName.get())) { + setThreadName(std::string(str::stream() << "thread" << nextUnnamedThreadId.fetchAndAdd(1))); } + return *s; +} } // namespace mongo diff --git a/src/mongo/util/concurrency/thread_name.h b/src/mongo/util/concurrency/thread_name.h index 88e7d25bd88..3e05b0529d6 100644 --- a/src/mongo/util/concurrency/thread_name.h +++ b/src/mongo/util/concurrency/thread_name.h @@ -33,15 +33,15 @@ namespace mongo { - /** - * Sets the name of the current thread to "name". - */ - void setThreadName(StringData name); +/** + * Sets the name of the current thread to "name". + */ +void setThreadName(StringData name); - /** - * Retrieves the name of the current thread, as previously set, or "" if no name was previously - * set. - */ - const std::string& getThreadName(); +/** + * Retrieves the name of the current thread, as previously set, or "" if no name was previously + * set. + */ +const std::string& getThreadName(); } // namespace mongo diff --git a/src/mongo/util/concurrency/threadlocal.h b/src/mongo/util/concurrency/threadlocal.h index 4530ab9aa6d..a73088e846e 100644 --- a/src/mongo/util/concurrency/threadlocal.h +++ b/src/mongo/util/concurrency/threadlocal.h @@ -32,163 +32,177 @@ #include "mongo/config.h" -namespace mongo { - - using boost::thread_specific_ptr; - - /* thread local "value" rather than a pointer - good for things which have copy constructors (and the copy constructor is fast enough) - e.g. - ThreadLocalValue<int> myint; - */ - template<class T> - class ThreadLocalValue { - public: - ThreadLocalValue( T def = 0 ) : _default( def ) { } - - T get() const { - T * val = _val.get(); - if ( val ) - return *val; - return _default; - } +namespace mongo { + +using boost::thread_specific_ptr; - void set( const T& i ) { - T *v = _val.get(); - if( v ) { - *v = i; - return; - } - v = new T(i); - _val.reset( v ); +/* thread local "value" rather than a pointer + good for things which have copy constructors (and the copy constructor is fast enough) + e.g. + ThreadLocalValue<int> myint; +*/ +template <class T> +class ThreadLocalValue { +public: + ThreadLocalValue(T def = 0) : _default(def) {} + + T get() const { + T* val = _val.get(); + if (val) + return *val; + return _default; + } + + void set(const T& i) { + T* v = _val.get(); + if (v) { + *v = i; + return; } + v = new T(i); + _val.reset(v); + } - T& getRef() { - T *v = _val.get(); - if( v ) { - return *v; - } - v = new T(_default); - _val.reset( v ); + T& getRef() { + T* v = _val.get(); + if (v) { return *v; } - - private: - boost::thread_specific_ptr<T> _val; - const T _default; - }; - - /* TSP - These macros use intrinsics which are faster than boost::thread_specific_ptr. - However the intrinsics don't free up objects on thread closure. Thus we use - a combination here, with the assumption that reset's are infrequent, so that - get's are fast. - */ + v = new T(_default); + _val.reset(v); + return *v; + } + +private: + boost::thread_specific_ptr<T> _val; + const T _default; +}; + +/* TSP + These macros use intrinsics which are faster than boost::thread_specific_ptr. + However the intrinsics don't free up objects on thread closure. Thus we use + a combination here, with the assumption that reset's are infrequent, so that + get's are fast. +*/ #if defined(MONGO_CONFIG_HAVE___THREAD) || defined(MONGO_CONFIG_HAVE___DECLSPEC_THREAD) - - template< class T > - struct TSP { - boost::thread_specific_ptr<T> tsp; - public: - T* get() const; - void reset(T* v); - T* getMake() { - T *t = get(); - if( t == 0 ) - reset( t = new T() ); - return t; - } - }; -# if defined(MONGO_CONFIG_HAVE___DECLSPEC_THREAD) - -# define TSP_DECLARE(T,p) extern TSP<T> p; +template <class T> +struct TSP { + boost::thread_specific_ptr<T> tsp; + +public: + T* get() const; + void reset(T* v); + T* getMake() { + T* t = get(); + if (t == 0) + reset(t = new T()); + return t; + } +}; + +#if defined(MONGO_CONFIG_HAVE___DECLSPEC_THREAD) + +#define TSP_DECLARE(T, p) extern TSP<T> p; + +#define TSP_DEFINE(T, p) \ + __declspec(thread) T* _##p; \ + TSP<T> p; \ + template <> \ + T* TSP<T>::get() const { \ + return _##p; \ + } \ + void TSP<T>::reset(T* v) { \ + tsp.reset(v); \ + _##p = v; \ + } +#else -# define TSP_DEFINE(T,p) __declspec( thread ) T* _ ## p; \ - TSP<T> p; \ - template<> T* TSP<T>::get() const { return _ ## p; } \ - void TSP<T>::reset(T* v) { \ - tsp.reset(v); \ - _ ## p = v; \ - } -# else - -# define TSP_DECLARE(T,p) \ - extern __thread T* _ ## p; \ - template<> inline T* TSP<T>::get() const { return _ ## p; } \ +#define TSP_DECLARE(T, p) \ + extern __thread T* _##p; \ + template <> \ + inline T* TSP<T>::get() const { \ + return _##p; \ + } \ extern TSP<T> p; -# define TSP_DEFINE(T,p) \ - __thread T* _ ## p; \ - template<> void TSP<T>::reset(T* v) { \ - tsp.reset(v); \ - _ ## p = v; \ - } \ +#define TSP_DEFINE(T, p) \ + __thread T* _##p; \ + template <> \ + void TSP<T>::reset(T* v) { \ + tsp.reset(v); \ + _##p = v; \ + } \ TSP<T> p; -# endif +#endif #elif defined(_POSIX_THREADS) && (_POSIX_THREADS >= 0) - template< class T> - struct TSP { - pthread_key_t _key; - public: - TSP() { - verify( pthread_key_create( &_key, TSP::dodelete ) == 0 ); +template <class T> +struct TSP { + pthread_key_t _key; + +public: + TSP() { + verify(pthread_key_create(&_key, TSP::dodelete) == 0); + } + + ~TSP() { + pthread_key_delete(_key); + } + + static void dodelete(void* x) { + T* t = reinterpret_cast<T*>(x); + delete t; + } + + T* get() const { + return reinterpret_cast<T*>(pthread_getspecific(_key)); + } + + void reset(T* v) { + T* old = get(); + delete old; + verify(pthread_setspecific(_key, v) == 0); + } + + T* getMake() { + T* t = get(); + if (t == 0) { + t = new T(); + reset(t); } + return t; + } +}; - ~TSP() { - pthread_key_delete( _key ); - } +#define TSP_DECLARE(T, p) extern TSP<T> p; - static void dodelete( void* x ) { - T* t = reinterpret_cast<T*>(x); - delete t; - } - - T* get() const { - return reinterpret_cast<T*>( pthread_getspecific( _key ) ); - } - - void reset(T* v) { - T* old = get(); - delete old; - verify( pthread_setspecific( _key, v ) == 0 ); - } - - T* getMake() { - T *t = get(); - if( t == 0 ) { - t = new T(); - reset( t ); - } - return t; - } - }; - -# define TSP_DECLARE(T,p) extern TSP<T> p; - -# define TSP_DEFINE(T,p) TSP<T> p; +#define TSP_DEFINE(T, p) TSP<T> p; #else - template< class T > - struct TSP { - thread_specific_ptr<T> tsp; - public: - T* get() const { return tsp.get(); } - void reset(T* v) { tsp.reset(v); } - T* getMake() { - T *t = get(); - if( t == 0 ) - reset( t = new T() ); - return t; - } - }; - -# define TSP_DECLARE(T,p) extern TSP<T> p; - -# define TSP_DEFINE(T,p) TSP<T> p; +template <class T> +struct TSP { + thread_specific_ptr<T> tsp; + +public: + T* get() const { + return tsp.get(); + } + void reset(T* v) { + tsp.reset(v); + } + T* getMake() { + T* t = get(); + if (t == 0) + reset(t = new T()); + return t; + } +}; + +#define TSP_DECLARE(T, p) extern TSP<T> p; + +#define TSP_DEFINE(T, p) TSP<T> p; #endif - } diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp index ca0c2560ef0..17efed9300d 100644 --- a/src/mongo/util/concurrency/ticketholder.cpp +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -36,161 +36,163 @@ namespace mongo { #if defined(__linux__) - namespace { - void _check(int ret) { - if (ret == 0) - return; - int err = errno; - severe() << "error in Ticketholder: " << errnoWithDescription(err); - fassertFailed(28604); - } - } +namespace { +void _check(int ret) { + if (ret == 0) + return; + int err = errno; + severe() << "error in Ticketholder: " << errnoWithDescription(err); + fassertFailed(28604); +} +} - TicketHolder::TicketHolder(int num) - : _outof(num) { - _check(sem_init(&_sem, 0, num)); - } +TicketHolder::TicketHolder(int num) : _outof(num) { + _check(sem_init(&_sem, 0, num)); +} - TicketHolder::~TicketHolder(){ - _check(sem_destroy(&_sem)); - } +TicketHolder::~TicketHolder() { + _check(sem_destroy(&_sem)); +} - bool TicketHolder::tryAcquire() { - while (0 != sem_trywait(&_sem)) { - switch(errno) { - case EAGAIN: return false; - case EINTR: break; - default: _check(-1); - } +bool TicketHolder::tryAcquire() { + while (0 != sem_trywait(&_sem)) { + switch (errno) { + case EAGAIN: + return false; + case EINTR: + break; + default: + _check(-1); } - return true; } + return true; +} - void TicketHolder::waitForTicket() { - while (0 != sem_wait(&_sem)) { - switch(errno) { - case EINTR: break; - default: _check(-1); - } +void TicketHolder::waitForTicket() { + while (0 != sem_wait(&_sem)) { + switch (errno) { + case EINTR: + break; + default: + _check(-1); } } +} - void TicketHolder::release() { - _check(sem_post(&_sem)); - } - - Status TicketHolder::resize(int newSize) { - stdx::lock_guard<stdx::mutex> lk(_resizeMutex); +void TicketHolder::release() { + _check(sem_post(&_sem)); +} - if (newSize < 5) - return Status(ErrorCodes::BadValue, - str::stream() << "Minimum value for semaphore is 5; given " - << newSize); +Status TicketHolder::resize(int newSize) { + stdx::lock_guard<stdx::mutex> lk(_resizeMutex); - if (newSize > SEM_VALUE_MAX) - return Status(ErrorCodes::BadValue, - str::stream() << "Maximum value for semaphore is " - << SEM_VALUE_MAX << "; given " << newSize ); + if (newSize < 5) + return Status(ErrorCodes::BadValue, + str::stream() << "Minimum value for semaphore is 5; given " << newSize); - while (_outof.load() < newSize) { - release(); - _outof.fetchAndAdd(1); - } - - while (_outof.load() > newSize) { - waitForTicket(); - _outof.subtractAndFetch(1); - } + if (newSize > SEM_VALUE_MAX) + return Status(ErrorCodes::BadValue, + str::stream() << "Maximum value for semaphore is " << SEM_VALUE_MAX + << "; given " << newSize); - invariant(_outof.load() == newSize); - return Status::OK(); + while (_outof.load() < newSize) { + release(); + _outof.fetchAndAdd(1); } - int TicketHolder::available() const { - int val = 0; - _check(sem_getvalue(&_sem, &val)); - return val; + while (_outof.load() > newSize) { + waitForTicket(); + _outof.subtractAndFetch(1); } - int TicketHolder::used() const { - return outof() - available(); - } + invariant(_outof.load() == newSize); + return Status::OK(); +} - int TicketHolder::outof() const { - return _outof.load(); - } +int TicketHolder::available() const { + int val = 0; + _check(sem_getvalue(&_sem, &val)); + return val; +} + +int TicketHolder::used() const { + return outof() - available(); +} + +int TicketHolder::outof() const { + return _outof.load(); +} #else - TicketHolder::TicketHolder( int num ) : _outof(num), _num(num) {} +TicketHolder::TicketHolder(int num) : _outof(num), _num(num) {} - TicketHolder::~TicketHolder() = default; +TicketHolder::~TicketHolder() = default; - bool TicketHolder::tryAcquire() { - stdx::lock_guard<stdx::mutex> lk( _mutex ); - return _tryAcquire(); - } +bool TicketHolder::tryAcquire() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _tryAcquire(); +} - void TicketHolder::waitForTicket() { - stdx::unique_lock<stdx::mutex> lk( _mutex ); +void TicketHolder::waitForTicket() { + stdx::unique_lock<stdx::mutex> lk(_mutex); - while( ! _tryAcquire() ) { - _newTicket.wait( lk ); - } + while (!_tryAcquire()) { + _newTicket.wait(lk); } +} - void TicketHolder::release() { - { - stdx::lock_guard<stdx::mutex> lk( _mutex ); - _num++; - } - _newTicket.notify_one(); +void TicketHolder::release() { + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _num++; } + _newTicket.notify_one(); +} - Status TicketHolder::resize( int newSize ) { - stdx::lock_guard<stdx::mutex> lk( _mutex ); +Status TicketHolder::resize(int newSize) { + stdx::lock_guard<stdx::mutex> lk(_mutex); - int used = _outof.load() - _num; - if ( used > newSize ) { - std::stringstream ss; - ss << "can't resize since we're using (" << used << ") " - << "more than newSize(" << newSize << ")"; + int used = _outof.load() - _num; + if (used > newSize) { + std::stringstream ss; + ss << "can't resize since we're using (" << used << ") " + << "more than newSize(" << newSize << ")"; - std::string errmsg = ss.str(); - log() << errmsg; - return Status(ErrorCodes::BadValue, errmsg); - } + std::string errmsg = ss.str(); + log() << errmsg; + return Status(ErrorCodes::BadValue, errmsg); + } - _outof.store(newSize); - _num = _outof.load() - used; + _outof.store(newSize); + _num = _outof.load() - used; - // Potentially wasteful, but easier to see is correct - _newTicket.notify_all(); - return Status::OK(); - } + // Potentially wasteful, but easier to see is correct + _newTicket.notify_all(); + return Status::OK(); +} - int TicketHolder::available() const { - return _num; - } +int TicketHolder::available() const { + return _num; +} - int TicketHolder::used() const { - return outof() - _num; - } +int TicketHolder::used() const { + return outof() - _num; +} - int TicketHolder::outof() const { - return _outof.load(); - } +int TicketHolder::outof() const { + return _outof.load(); +} - bool TicketHolder::_tryAcquire(){ - if ( _num <= 0 ) { - if ( _num < 0 ) { - std::cerr << "DISASTER! in TicketHolder" << std::endl; - } - return false; +bool TicketHolder::_tryAcquire() { + if (_num <= 0) { + if (_num < 0) { + std::cerr << "DISASTER! in TicketHolder" << std::endl; } - _num--; - return true; + return false; } + _num--; + return true; +} #endif - } diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index 31d8a989fde..6906dafa897 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -37,85 +37,88 @@ namespace mongo { - class TicketHolder { - MONGO_DISALLOW_COPYING(TicketHolder); - public: - explicit TicketHolder(int num); - ~TicketHolder(); +class TicketHolder { + MONGO_DISALLOW_COPYING(TicketHolder); - bool tryAcquire(); +public: + explicit TicketHolder(int num); + ~TicketHolder(); - void waitForTicket(); + bool tryAcquire(); - void release(); + void waitForTicket(); - Status resize(int newSize); + void release(); - int available() const; + Status resize(int newSize); - int used() const; + int available() const; - int outof() const; + int used() const; - private: + int outof() const; + +private: #if defined(__linux__) - mutable sem_t _sem; + mutable sem_t _sem; - // You can read _outof without a lock, but have to hold _resizeMutex to change. - AtomicInt32 _outof; - stdx::mutex _resizeMutex; + // You can read _outof without a lock, but have to hold _resizeMutex to change. + AtomicInt32 _outof; + stdx::mutex _resizeMutex; #else - bool _tryAcquire(); + bool _tryAcquire(); - AtomicInt32 _outof; - int _num; - stdx::mutex _mutex; - stdx::condition_variable _newTicket; + AtomicInt32 _outof; + int _num; + stdx::mutex _mutex; + stdx::condition_variable _newTicket; #endif - }; +}; - class ScopedTicket { - public: +class ScopedTicket { +public: + ScopedTicket(TicketHolder* holder) : _holder(holder) { + _holder->waitForTicket(); + } - ScopedTicket(TicketHolder* holder) : _holder(holder) { - _holder->waitForTicket(); - } + ~ScopedTicket() { + _holder->release(); + } - ~ScopedTicket() { - _holder->release(); - } +private: + TicketHolder* _holder; +}; - private: - TicketHolder* _holder; - }; +class TicketHolderReleaser { + MONGO_DISALLOW_COPYING(TicketHolderReleaser); - class TicketHolderReleaser { - MONGO_DISALLOW_COPYING(TicketHolderReleaser); - public: - TicketHolderReleaser() { - _holder = NULL; - } +public: + TicketHolderReleaser() { + _holder = NULL; + } - explicit TicketHolderReleaser(TicketHolder* holder) { - _holder = holder; - } + explicit TicketHolderReleaser(TicketHolder* holder) { + _holder = holder; + } - ~TicketHolderReleaser() { - if (_holder) { - _holder->release(); - } + ~TicketHolderReleaser() { + if (_holder) { + _holder->release(); } + } - bool hasTicket() const { return _holder != NULL; } + bool hasTicket() const { + return _holder != NULL; + } - void reset(TicketHolder* holder = NULL) { - if (_holder) { - _holder->release(); - } - _holder = holder; + void reset(TicketHolder* holder = NULL) { + if (_holder) { + _holder->release(); } + _holder = holder; + } - private: - TicketHolder * _holder; - }; +private: + TicketHolder* _holder; +}; } diff --git a/src/mongo/util/concurrency/value.h b/src/mongo/util/concurrency/value.h index 5dc15684c8a..4be9c3d14e5 100644 --- a/src/mongo/util/concurrency/value.h +++ b/src/mongo/util/concurrency/value.h @@ -36,41 +36,45 @@ namespace mongo { - // todo: rename this to ThreadSafeString or something - /** there is now one mutex per DiagStr. If you have hundreds or millions of - DiagStrs you'll need to do something different. - */ - class DiagStr { - mutable SpinLock m; - std::string _s; - public: - DiagStr(const DiagStr& r) : _s(r.get()) { } - DiagStr(const std::string& r) : _s(r) { } - DiagStr() { } - bool empty() const { - scoped_spinlock lk(m); - return _s.empty(); - } - std::string get() const { - scoped_spinlock lk(m); - return _s; - } - void set(const char *s) { - scoped_spinlock lk(m); - _s = s; - } - void set(const std::string& s) { - scoped_spinlock lk(m); - _s = s; - } - operator std::string() const { return get(); } - void operator=(const std::string& s) { set(s); } - void operator=(const DiagStr& rhs) { - set( rhs.get() ); - } +// todo: rename this to ThreadSafeString or something +/** there is now one mutex per DiagStr. If you have hundreds or millions of + DiagStrs you'll need to do something different. +*/ +class DiagStr { + mutable SpinLock m; + std::string _s; - // == is not defined. use get() == ... instead. done this way so one thinks about if composing multiple operations - bool operator==(const std::string& s) const; - }; +public: + DiagStr(const DiagStr& r) : _s(r.get()) {} + DiagStr(const std::string& r) : _s(r) {} + DiagStr() {} + bool empty() const { + scoped_spinlock lk(m); + return _s.empty(); + } + std::string get() const { + scoped_spinlock lk(m); + return _s; + } + void set(const char* s) { + scoped_spinlock lk(m); + _s = s; + } + void set(const std::string& s) { + scoped_spinlock lk(m); + _s = s; + } + operator std::string() const { + return get(); + } + void operator=(const std::string& s) { + set(s); + } + void operator=(const DiagStr& rhs) { + set(rhs.get()); + } + // == is not defined. use get() == ... instead. done this way so one thinks about if composing multiple operations + bool operator==(const std::string& s) const; +}; } |