summaryrefslogtreecommitdiff
path: root/src/mongo/util/queue.h
diff options
context:
space:
mode:
authorEric Milkie <milkie@10gen.com>2015-09-02 15:32:15 -0400
committerEric Milkie <milkie@10gen.com>2015-09-03 11:02:27 -0400
commit20e7c1c9e1c138e37b8c3c3009bcdd9c69d92667 (patch)
tree53b64024ba06806f1ddcef8706c4a156dc0308b8 /src/mongo/util/queue.h
parent84a461126d2286d34414bc01b5272791ee37de18 (diff)
downloadmongo-20e7c1c9e1c138e37b8c3c3009bcdd9c69d92667.tar.gz
SERVER-20187 wake up from blockingPeek when entering drain mode
Diffstat (limited to 'src/mongo/util/queue.h')
-rw-r--r--src/mongo/util/queue.h33
1 files changed, 23 insertions, 10 deletions
diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h
index f204abb2484..a1b8caac1a0 100644
--- a/src/mongo/util/queue.h
+++ b/src/mongo/util/queue.h
@@ -58,14 +58,13 @@ class BlockingQueue {
public:
BlockingQueue()
- : _maxSize(std::numeric_limits<std::size_t>::max()),
- _currentSize(0),
- _getSize(&_getSizeDefault) {}
- BlockingQueue(size_t size) : _maxSize(size), _currentSize(0), _getSize(&_getSizeDefault) {}
- BlockingQueue(size_t size, getSizeFunc f) : _maxSize(size), _currentSize(0), _getSize(f) {}
+ : _maxSize(std::numeric_limits<std::size_t>::max()), _getSize(&_getSizeDefault) {}
+ BlockingQueue(size_t size) : _maxSize(size), _getSize(&_getSizeDefault) {}
+ BlockingQueue(size_t size, getSizeFunc f) : _maxSize(size), _getSize(f) {}
void push(T const& t) {
stdx::unique_lock<stdx::mutex> l(_lock);
+ _clearing = false;
size_t tSize = _getSize(t);
while (_currentSize + tSize > _maxSize) {
_cvNoLongerFull.wait(l);
@@ -105,9 +104,11 @@ public:
void clear() {
stdx::lock_guard<stdx::mutex> l(_lock);
+ _clearing = true;
_queue = std::queue<T>();
_currentSize = 0;
_cvNoLongerFull.notify_one();
+ _cvNoLongerEmpty.notify_one();
}
bool tryPop(T& t) {
@@ -125,8 +126,12 @@ public:
T blockingPop() {
stdx::unique_lock<stdx::mutex> l(_lock);
- while (_queue.empty())
+ _clearing = false;
+ while (_queue.empty() && !_clearing)
_cvNoLongerEmpty.wait(l);
+ if (_clearing) {
+ return T{};
+ }
T t = _queue.front();
_queue.pop();
@@ -146,11 +151,15 @@ public:
using namespace stdx::chrono;
const auto deadline = system_clock::now() + seconds(maxSecondsToWait);
stdx::unique_lock<stdx::mutex> l(_lock);
- while (_queue.empty()) {
+ _clearing = false;
+ while (_queue.empty() && !_clearing) {
if (stdx::cv_status::timeout == _cvNoLongerEmpty.wait_until(l, deadline))
return false;
}
+ if (_clearing) {
+ return false;
+ }
t = _queue.front();
_queue.pop();
_currentSize -= _getSize(t);
@@ -164,11 +173,14 @@ public:
using namespace stdx::chrono;
const auto deadline = system_clock::now() + seconds(maxSecondsToWait);
stdx::unique_lock<stdx::mutex> l(_lock);
- while (_queue.empty()) {
+ _clearing = false;
+ while (_queue.empty() && !_clearing) {
if (stdx::cv_status::timeout == _cvNoLongerEmpty.wait_until(l, deadline))
return false;
}
-
+ if (_clearing) {
+ return false;
+ }
t = _queue.front();
return true;
}
@@ -189,8 +201,9 @@ private:
mutable stdx::mutex _lock;
std::queue<T> _queue;
const size_t _maxSize;
- size_t _currentSize;
+ size_t _currentSize = 0;
getSizeFunc _getSize;
+ bool _clearing = false;
stdx::condition_variable _cvNoLongerFull;
stdx::condition_variable _cvNoLongerEmpty;