diff options
author | Eric Milkie <milkie@10gen.com> | 2012-04-30 11:26:55 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2012-04-30 14:43:50 -0400 |
commit | f31d4d9acaee1e11f2ff0f1e84929ffb684ae053 (patch) | |
tree | 05584aaff7b9c60d5c746cdbb358c60e9ea23917 /src/mongo/util/queue.h | |
parent | 7c50f4320f11865483541c8c092a0b482f5c51fc (diff) | |
download | mongo-f31d4d9acaee1e11f2ff0f1e84929ffb684ae053.tar.gz |
implement max-size thread-safe queue class
To use, pass a size when you construct the queue. The push() method
will block for as long as the queue has more than n elements in the queue.
Diffstat (limited to 'src/mongo/util/queue.h')
-rw-r--r-- | src/mongo/util/queue.h | 35 |
1 files changed, 25 insertions, 10 deletions
diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h index 4223bd6c256..2aaa91cde39 100644 --- a/src/mongo/util/queue.h +++ b/src/mongo/util/queue.h @@ -17,25 +17,35 @@ #pragma once -#include "../pch.h" +#include "pch.h" +#include <limits> #include <queue> -#include "../util/timer.h" +#include "mongo/util/timer.h" namespace mongo { /** * simple blocking queue */ - template<typename T> class BlockingQueue : boost::noncopyable { + template<typename T> + class BlockingQueue : boost::noncopyable { public: - BlockingQueue() : _lock("BlockingQueue") { } + BlockingQueue() : + _lock("BlockingQueue"), + _size(std::numeric_limits<std::size_t>::max()) { } + BlockingQueue(size_t size) : + _lock("BlockingQueue(bounded)"), + _size(size) { } void push(T const& t) { scoped_lock l( _lock ); + while (_queue.size() >= _size) { + _cvNoLongerFull.wait( l.boost() ); + } _queue.push( t ); - _condition.notify_one(); + _cvNoLongerEmpty.notify_one(); } bool empty() const { @@ -56,6 +66,7 @@ namespace mongo { t = _queue.front(); _queue.pop(); + _cvNoLongerFull.notify_one(); return true; } @@ -64,10 +75,12 @@ namespace mongo { scoped_lock l( _lock ); while( _queue.empty() ) - _condition.wait( l.boost() ); + _cvNoLongerEmpty.wait( l.boost() ); T t = _queue.front(); _queue.pop(); + _cvNoLongerFull.notify_one(); + return t; } @@ -87,20 +100,22 @@ namespace mongo { scoped_lock l( _lock ); while( _queue.empty() ) { - if ( ! _condition.timed_wait( l.boost() , xt ) ) + if ( ! _cvNoLongerEmpty.timed_wait( l.boost() , xt ) ) return false; } t = _queue.front(); _queue.pop(); + _cvNoLongerFull.notify_one(); return true; } private: - std::queue<T> _queue; - mutable mongo::mutex _lock; - boost::condition _condition; + std::queue<T> _queue; + size_t _size; + boost::condition _cvNoLongerFull; + boost::condition _cvNoLongerEmpty; }; } |