summaryrefslogtreecommitdiff
path: root/src/mongo/util/queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/util/queue.h')
-rw-r--r--src/mongo/util/queue.h272
1 files changed, 132 insertions, 140 deletions
diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h
index c6305394e0e..b2475fbebfd 100644
--- a/src/mongo/util/queue.h
+++ b/src/mongo/util/queue.h
@@ -39,169 +39,161 @@
namespace mongo {
- template <typename T>
- size_t _getSizeDefault(const T& t) {
- return 1;
+template <typename T>
+size_t _getSizeDefault(const T& t) {
+ return 1;
+}
+
+/**
+ * Simple blocking queue with optional max size (by count or custom sizing function).
+ * A custom sizing function can optionally be given. By default the getSize function
+ * returns 1 for each item, resulting in size equaling the number of items queued.
+ *
+ * Note that use of this class is deprecated. This class only works with a single consumer and
+ * a single producer.
+ */
+template <typename T>
+class BlockingQueue {
+ MONGO_DISALLOW_COPYING(BlockingQueue);
+ typedef size_t (*getSizeFunc)(const T& t);
+
+public:
+ BlockingQueue()
+ : _maxSize(std::numeric_limits<std::size_t>::max()),
+ _currentSize(0),
+ _getSize(&_getSizeDefault) {}
+ BlockingQueue(size_t size) : _maxSize(size), _currentSize(0), _getSize(&_getSizeDefault) {}
+ BlockingQueue(size_t size, getSizeFunc f) : _maxSize(size), _currentSize(0), _getSize(f) {}
+
+ void push(T const& t) {
+ std::unique_lock<std::mutex> l(_lock);
+ size_t tSize = _getSize(t);
+ while (_currentSize + tSize > _maxSize) {
+ _cvNoLongerFull.wait(l);
+ }
+ _queue.push(t);
+ _currentSize += tSize;
+ _cvNoLongerEmpty.notify_one();
+ }
+
+ bool empty() const {
+ std::lock_guard<std::mutex> l(_lock);
+ return _queue.empty();
}
/**
- * Simple blocking queue with optional max size (by count or custom sizing function).
- * A custom sizing function can optionally be given. By default the getSize function
- * returns 1 for each item, resulting in size equaling the number of items queued.
- *
- * Note that use of this class is deprecated. This class only works with a single consumer and
- * a single producer.
+ * The size as measured by the size function. Default to counting each item
*/
- template<typename T>
- class BlockingQueue {
- MONGO_DISALLOW_COPYING(BlockingQueue);
- typedef size_t (*getSizeFunc)(const T& t);
- public:
- BlockingQueue() :
- _maxSize(std::numeric_limits<std::size_t>::max()),
- _currentSize(0),
- _getSize(&_getSizeDefault) {}
- BlockingQueue(size_t size) :
- _maxSize(size),
- _currentSize(0),
- _getSize(&_getSizeDefault) {}
- BlockingQueue(size_t size, getSizeFunc f) :
- _maxSize(size),
- _currentSize(0),
- _getSize(f) {}
-
- void push(T const& t) {
- std::unique_lock<std::mutex> l( _lock );
- size_t tSize = _getSize(t);
- while (_currentSize + tSize > _maxSize) {
- _cvNoLongerFull.wait( l );
- }
- _queue.push( t );
- _currentSize += tSize;
- _cvNoLongerEmpty.notify_one();
- }
-
- bool empty() const {
- std::lock_guard<std::mutex> l( _lock );
- return _queue.empty();
- }
+ size_t size() const {
+ std::lock_guard<std::mutex> l(_lock);
+ return _currentSize;
+ }
- /**
- * The size as measured by the size function. Default to counting each item
- */
- size_t size() const {
- std::lock_guard<std::mutex> l( _lock );
- return _currentSize;
- }
+ /**
+ * The max size for this queue
+ */
+ size_t maxSize() const {
+ return _maxSize;
+ }
- /**
- * The max size for this queue
- */
- size_t maxSize() const {
- return _maxSize;
- }
+ /**
+ * The number/count of items in the queue ( _queue.size() )
+ */
+ size_t count() const {
+ std::lock_guard<std::mutex> l(_lock);
+ return _queue.size();
+ }
- /**
- * The number/count of items in the queue ( _queue.size() )
- */
- size_t count() const {
- std::lock_guard<std::mutex> l( _lock );
- return _queue.size();
- }
+ void clear() {
+ std::lock_guard<std::mutex> l(_lock);
+ _queue = std::queue<T>();
+ _currentSize = 0;
+ _cvNoLongerFull.notify_one();
+ }
- void clear() {
- std::lock_guard<std::mutex> l(_lock);
- _queue = std::queue<T>();
- _currentSize = 0;
- _cvNoLongerFull.notify_one();
- }
+ bool tryPop(T& t) {
+ std::lock_guard<std::mutex> l(_lock);
+ if (_queue.empty())
+ return false;
- bool tryPop( T & t ) {
- std::lock_guard<std::mutex> l( _lock );
- if ( _queue.empty() )
- return false;
+ t = _queue.front();
+ _queue.pop();
+ _currentSize -= _getSize(t);
+ _cvNoLongerFull.notify_one();
- t = _queue.front();
- _queue.pop();
- _currentSize -= _getSize(t);
- _cvNoLongerFull.notify_one();
+ return true;
+ }
- return true;
- }
+ T blockingPop() {
+ std::unique_lock<std::mutex> l(_lock);
+ while (_queue.empty())
+ _cvNoLongerEmpty.wait(l);
- T blockingPop() {
+ T t = _queue.front();
+ _queue.pop();
+ _currentSize -= _getSize(t);
+ _cvNoLongerFull.notify_one();
- std::unique_lock<std::mutex> l( _lock );
- while( _queue.empty() )
- _cvNoLongerEmpty.wait( l );
+ return t;
+ }
- T t = _queue.front();
- _queue.pop();
- _currentSize -= _getSize(t);
- _cvNoLongerFull.notify_one();
- return t;
+ /**
+ * blocks waiting for an object until maxSecondsToWait passes
+ * if got one, return true and set in t
+ * otherwise return false and t won't be changed
+ */
+ bool blockingPop(T& t, int maxSecondsToWait) {
+ using namespace std::chrono;
+ const auto deadline = system_clock::now() + seconds(maxSecondsToWait);
+ std::unique_lock<std::mutex> l(_lock);
+ while (_queue.empty()) {
+ if (std::cv_status::timeout == _cvNoLongerEmpty.wait_until(l, deadline))
+ return false;
}
+ t = _queue.front();
+ _queue.pop();
+ _currentSize -= _getSize(t);
+ _cvNoLongerFull.notify_one();
+ return true;
+ }
- /**
- * blocks waiting for an object until maxSecondsToWait passes
- * if got one, return true and set in t
- * otherwise return false and t won't be changed
- */
- bool blockingPop( T& t , int maxSecondsToWait ) {
- using namespace std::chrono;
- const auto deadline = system_clock::now() + seconds(maxSecondsToWait);
- std::unique_lock<std::mutex> l(_lock);
- while(_queue.empty()) {
- if (std::cv_status::timeout == _cvNoLongerEmpty.wait_until(l, deadline))
- return false;
- }
-
- t = _queue.front();
- _queue.pop();
- _currentSize -= _getSize(t);
- _cvNoLongerFull.notify_one();
- return true;
- }
-
- // Obviously, this should only be used when you have
- // only one consumer
- bool blockingPeek(T& t, int maxSecondsToWait) {
- using namespace std::chrono;
- const auto deadline = system_clock::now() + seconds(maxSecondsToWait);
- std::unique_lock<std::mutex> l(_lock);
- while(_queue.empty()) {
- if (std::cv_status::timeout == _cvNoLongerEmpty.wait_until(l, deadline))
- return false;
- }
-
- t = _queue.front();
- return true;
+ // Obviously, this should only be used when you have
+ // only one consumer
+ bool blockingPeek(T& t, int maxSecondsToWait) {
+ using namespace std::chrono;
+ const auto deadline = system_clock::now() + seconds(maxSecondsToWait);
+ std::unique_lock<std::mutex> l(_lock);
+ while (_queue.empty()) {
+ if (std::cv_status::timeout == _cvNoLongerEmpty.wait_until(l, deadline))
+ return false;
}
- // Obviously, this should only be used when you have
- // only one consumer
- bool peek(T& t) {
-
- std::unique_lock<std::mutex> l( _lock );
- if (_queue.empty()) {
- return false;
- }
+ t = _queue.front();
+ return true;
+ }
- t = _queue.front();
- return true;
+ // Obviously, this should only be used when you have
+ // only one consumer
+ bool peek(T& t) {
+ std::unique_lock<std::mutex> l(_lock);
+ if (_queue.empty()) {
+ return false;
}
- private:
- mutable std::mutex _lock;
- std::queue<T> _queue;
- const size_t _maxSize;
- size_t _currentSize;
- getSizeFunc _getSize;
+ t = _queue.front();
+ return true;
+ }
- std::condition_variable _cvNoLongerFull;
- std::condition_variable _cvNoLongerEmpty;
- };
+private:
+ mutable std::mutex _lock;
+ std::queue<T> _queue;
+ const size_t _maxSize;
+ size_t _currentSize;
+ getSizeFunc _getSize;
+ std::condition_variable _cvNoLongerFull;
+ std::condition_variable _cvNoLongerEmpty;
+};
}