diff options
author | Kristina <kristina@10gen.com> | 2012-05-16 15:50:24 -0400 |
---|---|---|
committer | Kristina <kristina@10gen.com> | 2012-05-16 17:08:46 -0400 |
commit | 2fb9e3407939acdead413af39605e176172daf58 (patch) | |
tree | de688fa6186596e849855193291ada9dfcecfe6f /src/mongo/util/queue.h | |
parent | 39629ff51d5f2b2361328681e63b742e601f307a (diff) | |
download | mongo-2fb9e3407939acdead413af39605e176172daf58.tar.gz |
Make BlockingQueue take an optional sizing function and add peek and clear methods
Diffstat (limited to 'src/mongo/util/queue.h')
-rw-r--r-- | src/mongo/util/queue.h | 68 |
1 files changed, 57 insertions, 11 deletions
diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h index 2aaa91cde39..348dc45fdff 100644 --- a/src/mongo/util/queue.h +++ b/src/mongo/util/queue.h @@ -26,25 +26,41 @@ namespace mongo { + template <typename T> + size_t _getSizeDefault(const T& t) { + return 1; + } + /** - * simple blocking queue + * Simple blocking queue with optional max size. + * A custom sizing function can optionally be given. By default, size is calculated as + * _queue.size(). */ - template<typename T> + template<typename T> class BlockingQueue : boost::noncopyable { + typedef size_t (*getSize_t)(const T& t); public: - BlockingQueue() : - _lock("BlockingQueue"), - _size(std::numeric_limits<std::size_t>::max()) { } - BlockingQueue(size_t size) : - _lock("BlockingQueue(bounded)"), - _size(size) { } + BlockingQueue() : + _lock("BlockingQueue"), + _maxSize(std::numeric_limits<std::size_t>::max()), + _getSize(&_getSizeDefault) {} + BlockingQueue(size_t size) : + _lock("BlockingQueue(bounded)"), + _maxSize(size), + _getSize(&_getSizeDefault) {} + BlockingQueue(size_t size, getSize_t f) : + _lock("BlockingQueue(custom size)"), + _maxSize(size), + _getSize(f) {} void push(T const& t) { scoped_lock l( _lock ); - while (_queue.size() >= _size) { + size_t tSize = _getSize(t); + while (_queue.size()+tSize >= _maxSize) { _cvNoLongerFull.wait( l.boost() ); } _queue.push( t ); + _currentSize += tSize; _cvNoLongerEmpty.notify_one(); } @@ -55,9 +71,16 @@ namespace mongo { size_t size() const { scoped_lock l( _lock ); - return _queue.size(); + return _currentSize; } + void clear() { + scoped_lock l(_lock); + while (!_queue.empty()) { + _queue.pop(); + } + _currentSize = 0; + } bool tryPop( T & t ) { scoped_lock l( _lock ); @@ -66,6 +89,7 @@ namespace mongo { t = _queue.front(); _queue.pop(); + _currentSize -= _getSize(t); _cvNoLongerFull.notify_one(); return true; @@ -79,6 +103,7 @@ namespace mongo { T t = _queue.front(); _queue.pop(); + _currentSize -= _getSize(t); _cvNoLongerFull.notify_one(); return t; @@ -106,14 +131,35 @@ namespace mongo { t = _queue.front(); _queue.pop(); + _currentSize -= _getSize(t); _cvNoLongerFull.notify_one(); return true; } + bool blockingPeek(T& t, int maxSecondsToWait) { + Timer timer; + + boost::xtime xt; + boost::xtime_get(&xt, boost::TIME_UTC); + xt.sec += maxSecondsToWait; + + scoped_lock l( _lock ); + while( _queue.empty() ) { + if ( ! _cvNoLongerEmpty.timed_wait( l.boost() , xt ) ) + return false; + } + + t = _queue.front(); + return true; + } + private: mutable mongo::mutex _lock; std::queue<T> _queue; - size_t _size; + const size_t _maxSize; + size_t _currentSize; + getSize_t _getSize; + boost::condition _cvNoLongerFull; boost::condition _cvNoLongerEmpty; }; |