summaryrefslogtreecommitdiff
path: root/src/mongo/util/queue.h
diff options
context:
space:
mode:
authorKristina <kristina@10gen.com>2012-05-16 15:50:24 -0400
committerKristina <kristina@10gen.com>2012-05-16 17:08:46 -0400
commit2fb9e3407939acdead413af39605e176172daf58 (patch)
treede688fa6186596e849855193291ada9dfcecfe6f /src/mongo/util/queue.h
parent39629ff51d5f2b2361328681e63b742e601f307a (diff)
downloadmongo-2fb9e3407939acdead413af39605e176172daf58.tar.gz
Make BlockingQueue take an optional sizing function and add peek and clear methods
Diffstat (limited to 'src/mongo/util/queue.h')
-rw-r--r--src/mongo/util/queue.h68
1 files changed, 57 insertions, 11 deletions
diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h
index 2aaa91cde39..348dc45fdff 100644
--- a/src/mongo/util/queue.h
+++ b/src/mongo/util/queue.h
@@ -26,25 +26,41 @@
namespace mongo {
+ template <typename T>
+ size_t _getSizeDefault(const T& t) {
+ return 1;
+ }
+
/**
- * simple blocking queue
+ * Simple blocking queue with optional max size.
+ * A custom sizing function can optionally be given. By default, size is calculated as
+ * _queue.size().
*/
- template<typename T>
+ template<typename T>
class BlockingQueue : boost::noncopyable {
+ typedef size_t (*getSize_t)(const T& t);
public:
- BlockingQueue() :
- _lock("BlockingQueue"),
- _size(std::numeric_limits<std::size_t>::max()) { }
- BlockingQueue(size_t size) :
- _lock("BlockingQueue(bounded)"),
- _size(size) { }
+ BlockingQueue() :
+ _lock("BlockingQueue"),
+ _maxSize(std::numeric_limits<std::size_t>::max()),
+ _getSize(&_getSizeDefault) {}
+ BlockingQueue(size_t size) :
+ _lock("BlockingQueue(bounded)"),
+ _maxSize(size),
+ _getSize(&_getSizeDefault) {}
+ BlockingQueue(size_t size, getSize_t f) :
+ _lock("BlockingQueue(custom size)"),
+ _maxSize(size),
+ _getSize(f) {}
void push(T const& t) {
scoped_lock l( _lock );
- while (_queue.size() >= _size) {
+ size_t tSize = _getSize(t);
+ while (_queue.size()+tSize >= _maxSize) {
_cvNoLongerFull.wait( l.boost() );
}
_queue.push( t );
+ _currentSize += tSize;
_cvNoLongerEmpty.notify_one();
}
@@ -55,9 +71,16 @@ namespace mongo {
size_t size() const {
scoped_lock l( _lock );
- return _queue.size();
+ return _currentSize;
}
+ void clear() {
+ scoped_lock l(_lock);
+ while (!_queue.empty()) {
+ _queue.pop();
+ }
+ _currentSize = 0;
+ }
bool tryPop( T & t ) {
scoped_lock l( _lock );
@@ -66,6 +89,7 @@ namespace mongo {
t = _queue.front();
_queue.pop();
+ _currentSize -= _getSize(t);
_cvNoLongerFull.notify_one();
return true;
@@ -79,6 +103,7 @@ namespace mongo {
T t = _queue.front();
_queue.pop();
+ _currentSize -= _getSize(t);
_cvNoLongerFull.notify_one();
return t;
@@ -106,14 +131,35 @@ namespace mongo {
t = _queue.front();
_queue.pop();
+ _currentSize -= _getSize(t);
_cvNoLongerFull.notify_one();
return true;
}
+ bool blockingPeek(T& t, int maxSecondsToWait) {
+ Timer timer;
+
+ boost::xtime xt;
+ boost::xtime_get(&xt, boost::TIME_UTC);
+ xt.sec += maxSecondsToWait;
+
+ scoped_lock l( _lock );
+ while( _queue.empty() ) {
+ if ( ! _cvNoLongerEmpty.timed_wait( l.boost() , xt ) )
+ return false;
+ }
+
+ t = _queue.front();
+ return true;
+ }
+
private:
mutable mongo::mutex _lock;
std::queue<T> _queue;
- size_t _size;
+ const size_t _maxSize;
+ size_t _currentSize;
+ getSize_t _getSize;
+
boost::condition _cvNoLongerFull;
boost::condition _cvNoLongerEmpty;
};