diff options
author | Eric Milkie <milkie@10gen.com> | 2015-09-02 15:32:15 -0400 |
---|---|---|
committer | Eric Milkie <milkie@10gen.com> | 2015-09-03 11:02:27 -0400 |
commit | 20e7c1c9e1c138e37b8c3c3009bcdd9c69d92667 (patch) | |
tree | 53b64024ba06806f1ddcef8706c4a156dc0308b8 /src/mongo/util/queue.h | |
parent | 84a461126d2286d34414bc01b5272791ee37de18 (diff) | |
download | mongo-20e7c1c9e1c138e37b8c3c3009bcdd9c69d92667.tar.gz |
SERVER-20187 wake up from blockingPeek when entering drain mode
Diffstat (limited to 'src/mongo/util/queue.h')
-rw-r--r-- | src/mongo/util/queue.h | 33 |
1 files changed, 23 insertions, 10 deletions
diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h index f204abb2484..a1b8caac1a0 100644 --- a/src/mongo/util/queue.h +++ b/src/mongo/util/queue.h @@ -58,14 +58,13 @@ class BlockingQueue { public: BlockingQueue() - : _maxSize(std::numeric_limits<std::size_t>::max()), - _currentSize(0), - _getSize(&_getSizeDefault) {} - BlockingQueue(size_t size) : _maxSize(size), _currentSize(0), _getSize(&_getSizeDefault) {} - BlockingQueue(size_t size, getSizeFunc f) : _maxSize(size), _currentSize(0), _getSize(f) {} + : _maxSize(std::numeric_limits<std::size_t>::max()), _getSize(&_getSizeDefault) {} + BlockingQueue(size_t size) : _maxSize(size), _getSize(&_getSizeDefault) {} + BlockingQueue(size_t size, getSizeFunc f) : _maxSize(size), _getSize(f) {} void push(T const& t) { stdx::unique_lock<stdx::mutex> l(_lock); + _clearing = false; size_t tSize = _getSize(t); while (_currentSize + tSize > _maxSize) { _cvNoLongerFull.wait(l); @@ -105,9 +104,11 @@ public: void clear() { stdx::lock_guard<stdx::mutex> l(_lock); + _clearing = true; _queue = std::queue<T>(); _currentSize = 0; _cvNoLongerFull.notify_one(); + _cvNoLongerEmpty.notify_one(); } bool tryPop(T& t) { @@ -125,8 +126,12 @@ public: T blockingPop() { stdx::unique_lock<stdx::mutex> l(_lock); - while (_queue.empty()) + _clearing = false; + while (_queue.empty() && !_clearing) _cvNoLongerEmpty.wait(l); + if (_clearing) { + return T{}; + } T t = _queue.front(); _queue.pop(); @@ -146,11 +151,15 @@ public: using namespace stdx::chrono; const auto deadline = system_clock::now() + seconds(maxSecondsToWait); stdx::unique_lock<stdx::mutex> l(_lock); - while (_queue.empty()) { + _clearing = false; + while (_queue.empty() && !_clearing) { if (stdx::cv_status::timeout == _cvNoLongerEmpty.wait_until(l, deadline)) return false; } + if (_clearing) { + return false; + } t = _queue.front(); _queue.pop(); _currentSize -= _getSize(t); @@ -164,11 +173,14 @@ public: using namespace stdx::chrono; const auto deadline = system_clock::now() + seconds(maxSecondsToWait); stdx::unique_lock<stdx::mutex> l(_lock); - while (_queue.empty()) { + _clearing = false; + while (_queue.empty() && !_clearing) { if (stdx::cv_status::timeout == _cvNoLongerEmpty.wait_until(l, deadline)) return false; } - + if (_clearing) { + return false; + } t = _queue.front(); return true; } @@ -189,8 +201,9 @@ private: mutable stdx::mutex _lock; std::queue<T> _queue; const size_t _maxSize; - size_t _currentSize; + size_t _currentSize = 0; getSizeFunc _getSize; + bool _clearing = false; stdx::condition_variable _cvNoLongerFull; stdx::condition_variable _cvNoLongerEmpty; |