diff options
Diffstat (limited to 'src/mongo/util/queue.h')
-rw-r--r-- | src/mongo/util/queue.h | 272 |
1 files changed, 132 insertions, 140 deletions
diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h index c6305394e0e..b2475fbebfd 100644 --- a/src/mongo/util/queue.h +++ b/src/mongo/util/queue.h @@ -39,169 +39,161 @@ namespace mongo { - template <typename T> - size_t _getSizeDefault(const T& t) { - return 1; +template <typename T> +size_t _getSizeDefault(const T& t) { + return 1; +} + +/** + * Simple blocking queue with optional max size (by count or custom sizing function). + * A custom sizing function can optionally be given. By default the getSize function + * returns 1 for each item, resulting in size equaling the number of items queued. + * + * Note that use of this class is deprecated. This class only works with a single consumer and + * a single producer. + */ +template <typename T> +class BlockingQueue { + MONGO_DISALLOW_COPYING(BlockingQueue); + typedef size_t (*getSizeFunc)(const T& t); + +public: + BlockingQueue() + : _maxSize(std::numeric_limits<std::size_t>::max()), + _currentSize(0), + _getSize(&_getSizeDefault) {} + BlockingQueue(size_t size) : _maxSize(size), _currentSize(0), _getSize(&_getSizeDefault) {} + BlockingQueue(size_t size, getSizeFunc f) : _maxSize(size), _currentSize(0), _getSize(f) {} + + void push(T const& t) { + std::unique_lock<std::mutex> l(_lock); + size_t tSize = _getSize(t); + while (_currentSize + tSize > _maxSize) { + _cvNoLongerFull.wait(l); + } + _queue.push(t); + _currentSize += tSize; + _cvNoLongerEmpty.notify_one(); + } + + bool empty() const { + std::lock_guard<std::mutex> l(_lock); + return _queue.empty(); } /** - * Simple blocking queue with optional max size (by count or custom sizing function). - * A custom sizing function can optionally be given. By default the getSize function - * returns 1 for each item, resulting in size equaling the number of items queued. - * - * Note that use of this class is deprecated. This class only works with a single consumer and - * a single producer. + * The size as measured by the size function. Default to counting each item */ - template<typename T> - class BlockingQueue { - MONGO_DISALLOW_COPYING(BlockingQueue); - typedef size_t (*getSizeFunc)(const T& t); - public: - BlockingQueue() : - _maxSize(std::numeric_limits<std::size_t>::max()), - _currentSize(0), - _getSize(&_getSizeDefault) {} - BlockingQueue(size_t size) : - _maxSize(size), - _currentSize(0), - _getSize(&_getSizeDefault) {} - BlockingQueue(size_t size, getSizeFunc f) : - _maxSize(size), - _currentSize(0), - _getSize(f) {} - - void push(T const& t) { - std::unique_lock<std::mutex> l( _lock ); - size_t tSize = _getSize(t); - while (_currentSize + tSize > _maxSize) { - _cvNoLongerFull.wait( l ); - } - _queue.push( t ); - _currentSize += tSize; - _cvNoLongerEmpty.notify_one(); - } - - bool empty() const { - std::lock_guard<std::mutex> l( _lock ); - return _queue.empty(); - } + size_t size() const { + std::lock_guard<std::mutex> l(_lock); + return _currentSize; + } - /** - * The size as measured by the size function. Default to counting each item - */ - size_t size() const { - std::lock_guard<std::mutex> l( _lock ); - return _currentSize; - } + /** + * The max size for this queue + */ + size_t maxSize() const { + return _maxSize; + } - /** - * The max size for this queue - */ - size_t maxSize() const { - return _maxSize; - } + /** + * The number/count of items in the queue ( _queue.size() ) + */ + size_t count() const { + std::lock_guard<std::mutex> l(_lock); + return _queue.size(); + } - /** - * The number/count of items in the queue ( _queue.size() ) - */ - size_t count() const { - std::lock_guard<std::mutex> l( _lock ); - return _queue.size(); - } + void clear() { + std::lock_guard<std::mutex> l(_lock); + _queue = std::queue<T>(); + _currentSize = 0; + _cvNoLongerFull.notify_one(); + } - void clear() { - std::lock_guard<std::mutex> l(_lock); - _queue = std::queue<T>(); - _currentSize = 0; - _cvNoLongerFull.notify_one(); - } + bool tryPop(T& t) { + std::lock_guard<std::mutex> l(_lock); + if (_queue.empty()) + return false; - bool tryPop( T & t ) { - std::lock_guard<std::mutex> l( _lock ); - if ( _queue.empty() ) - return false; + t = _queue.front(); + _queue.pop(); + _currentSize -= _getSize(t); + _cvNoLongerFull.notify_one(); - t = _queue.front(); - _queue.pop(); - _currentSize -= _getSize(t); - _cvNoLongerFull.notify_one(); + return true; + } - return true; - } + T blockingPop() { + std::unique_lock<std::mutex> l(_lock); + while (_queue.empty()) + _cvNoLongerEmpty.wait(l); - T blockingPop() { + T t = _queue.front(); + _queue.pop(); + _currentSize -= _getSize(t); + _cvNoLongerFull.notify_one(); - std::unique_lock<std::mutex> l( _lock ); - while( _queue.empty() ) - _cvNoLongerEmpty.wait( l ); + return t; + } - T t = _queue.front(); - _queue.pop(); - _currentSize -= _getSize(t); - _cvNoLongerFull.notify_one(); - return t; + /** + * blocks waiting for an object until maxSecondsToWait passes + * if got one, return true and set in t + * otherwise return false and t won't be changed + */ + bool blockingPop(T& t, int maxSecondsToWait) { + using namespace std::chrono; + const auto deadline = system_clock::now() + seconds(maxSecondsToWait); + std::unique_lock<std::mutex> l(_lock); + while (_queue.empty()) { + if (std::cv_status::timeout == _cvNoLongerEmpty.wait_until(l, deadline)) + return false; } + t = _queue.front(); + _queue.pop(); + _currentSize -= _getSize(t); + _cvNoLongerFull.notify_one(); + return true; + } - /** - * blocks waiting for an object until maxSecondsToWait passes - * if got one, return true and set in t - * otherwise return false and t won't be changed - */ - bool blockingPop( T& t , int maxSecondsToWait ) { - using namespace std::chrono; - const auto deadline = system_clock::now() + seconds(maxSecondsToWait); - std::unique_lock<std::mutex> l(_lock); - while(_queue.empty()) { - if (std::cv_status::timeout == _cvNoLongerEmpty.wait_until(l, deadline)) - return false; - } - - t = _queue.front(); - _queue.pop(); - _currentSize -= _getSize(t); - _cvNoLongerFull.notify_one(); - return true; - } - - // Obviously, this should only be used when you have - // only one consumer - bool blockingPeek(T& t, int maxSecondsToWait) { - using namespace std::chrono; - const auto deadline = system_clock::now() + seconds(maxSecondsToWait); - std::unique_lock<std::mutex> l(_lock); - while(_queue.empty()) { - if (std::cv_status::timeout == _cvNoLongerEmpty.wait_until(l, deadline)) - return false; - } - - t = _queue.front(); - return true; + // Obviously, this should only be used when you have + // only one consumer + bool blockingPeek(T& t, int maxSecondsToWait) { + using namespace std::chrono; + const auto deadline = system_clock::now() + seconds(maxSecondsToWait); + std::unique_lock<std::mutex> l(_lock); + while (_queue.empty()) { + if (std::cv_status::timeout == _cvNoLongerEmpty.wait_until(l, deadline)) + return false; } - // Obviously, this should only be used when you have - // only one consumer - bool peek(T& t) { - - std::unique_lock<std::mutex> l( _lock ); - if (_queue.empty()) { - return false; - } + t = _queue.front(); + return true; + } - t = _queue.front(); - return true; + // Obviously, this should only be used when you have + // only one consumer + bool peek(T& t) { + std::unique_lock<std::mutex> l(_lock); + if (_queue.empty()) { + return false; } - private: - mutable std::mutex _lock; - std::queue<T> _queue; - const size_t _maxSize; - size_t _currentSize; - getSizeFunc _getSize; + t = _queue.front(); + return true; + } - std::condition_variable _cvNoLongerFull; - std::condition_variable _cvNoLongerEmpty; - }; +private: + mutable std::mutex _lock; + std::queue<T> _queue; + const size_t _maxSize; + size_t _currentSize; + getSizeFunc _getSize; + std::condition_variable _cvNoLongerFull; + std::condition_variable _cvNoLongerEmpty; +}; } |