summaryrefslogtreecommitdiff
path: root/src/mongo/util/queue.h
diff options
context:
space:
mode:
authorEric Milkie <milkie@10gen.com>2012-04-30 11:26:55 -0400
committerEric Milkie <milkie@10gen.com>2012-04-30 14:43:50 -0400
commitf31d4d9acaee1e11f2ff0f1e84929ffb684ae053 (patch)
tree05584aaff7b9c60d5c746cdbb358c60e9ea23917 /src/mongo/util/queue.h
parent7c50f4320f11865483541c8c092a0b482f5c51fc (diff)
downloadmongo-f31d4d9acaee1e11f2ff0f1e84929ffb684ae053.tar.gz
implement max-size thread-safe queue class
To use, pass a size when you construct the queue. The push() method will block for as long as the queue has more than n elements in the queue.
Diffstat (limited to 'src/mongo/util/queue.h')
-rw-r--r--src/mongo/util/queue.h35
1 files changed, 25 insertions, 10 deletions
diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h
index 4223bd6c256..2aaa91cde39 100644
--- a/src/mongo/util/queue.h
+++ b/src/mongo/util/queue.h
@@ -17,25 +17,35 @@
#pragma once
-#include "../pch.h"
+#include "pch.h"
+#include <limits>
#include <queue>
-#include "../util/timer.h"
+#include "mongo/util/timer.h"
namespace mongo {
/**
* simple blocking queue
*/
- template<typename T> class BlockingQueue : boost::noncopyable {
+ template<typename T>
+ class BlockingQueue : boost::noncopyable {
public:
- BlockingQueue() : _lock("BlockingQueue") { }
+ BlockingQueue() :
+ _lock("BlockingQueue"),
+ _size(std::numeric_limits<std::size_t>::max()) { }
+ BlockingQueue(size_t size) :
+ _lock("BlockingQueue(bounded)"),
+ _size(size) { }
void push(T const& t) {
scoped_lock l( _lock );
+ while (_queue.size() >= _size) {
+ _cvNoLongerFull.wait( l.boost() );
+ }
_queue.push( t );
- _condition.notify_one();
+ _cvNoLongerEmpty.notify_one();
}
bool empty() const {
@@ -56,6 +66,7 @@ namespace mongo {
t = _queue.front();
_queue.pop();
+ _cvNoLongerFull.notify_one();
return true;
}
@@ -64,10 +75,12 @@ namespace mongo {
scoped_lock l( _lock );
while( _queue.empty() )
- _condition.wait( l.boost() );
+ _cvNoLongerEmpty.wait( l.boost() );
T t = _queue.front();
_queue.pop();
+ _cvNoLongerFull.notify_one();
+
return t;
}
@@ -87,20 +100,22 @@ namespace mongo {
scoped_lock l( _lock );
while( _queue.empty() ) {
- if ( ! _condition.timed_wait( l.boost() , xt ) )
+ if ( ! _cvNoLongerEmpty.timed_wait( l.boost() , xt ) )
return false;
}
t = _queue.front();
_queue.pop();
+ _cvNoLongerFull.notify_one();
return true;
}
private:
- std::queue<T> _queue;
-
mutable mongo::mutex _lock;
- boost::condition _condition;
+ std::queue<T> _queue;
+ size_t _size;
+ boost::condition _cvNoLongerFull;
+ boost::condition _cvNoLongerEmpty;
};
}