summaryrefslogtreecommitdiff
path: root/src/mongo/util/queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/util/queue.h')
-rw-r--r--src/mongo/util/queue.h36
1 files changed, 18 insertions, 18 deletions
diff --git a/src/mongo/util/queue.h b/src/mongo/util/queue.h
index c3a56d4db21..ec927066172 100644
--- a/src/mongo/util/queue.h
+++ b/src/mongo/util/queue.h
@@ -34,9 +34,9 @@
#include <limits>
#include <queue>
+#include "mongo/platform/condition_variable.h"
+#include "mongo/platform/mutex.h"
#include "mongo/stdx/chrono.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
namespace mongo {
@@ -61,12 +61,12 @@ public:
BlockingQueue(size_t size, GetSizeFn f) : _maxSize(size), _getSize(f) {}
void pushEvenIfFull(T const& t) {
- stdx::unique_lock<stdx::mutex> lk(_lock);
+ stdx::unique_lock<Latch> lk(_lock);
pushImpl_inlock(t, _getSize(t));
}
void push(T const& t) {
- stdx::unique_lock<stdx::mutex> lk(_lock);
+ stdx::unique_lock<Latch> lk(_lock);
_clearing = false;
size_t tSize = _getSize(t);
_waitForSpace_inlock(tSize, lk);
@@ -89,7 +89,7 @@ public:
return;
}
- stdx::unique_lock<stdx::mutex> lk(_lock);
+ stdx::unique_lock<Latch> lk(_lock);
const auto startedEmpty = _queue.empty();
_clearing = false;
@@ -111,12 +111,12 @@ public:
* NOTE: Should only be used in a single producer case.
*/
void waitForSpace(size_t size) {
- stdx::unique_lock<stdx::mutex> lk(_lock);
+ stdx::unique_lock<Latch> lk(_lock);
_waitForSpace_inlock(size, lk);
}
bool empty() const {
- stdx::lock_guard<stdx::mutex> lk(_lock);
+ stdx::lock_guard<Latch> lk(_lock);
return _queue.empty();
}
@@ -124,7 +124,7 @@ public:
* The size as measured by the size function. Default to counting each item
*/
size_t size() const {
- stdx::lock_guard<stdx::mutex> lk(_lock);
+ stdx::lock_guard<Latch> lk(_lock);
return _currentSize;
}
@@ -139,12 +139,12 @@ public:
* The number/count of items in the queue ( _queue.size() )
*/
size_t count() const {
- stdx::lock_guard<stdx::mutex> lk(_lock);
+ stdx::lock_guard<Latch> lk(_lock);
return _queue.size();
}
void clear() {
- stdx::lock_guard<stdx::mutex> lk(_lock);
+ stdx::lock_guard<Latch> lk(_lock);
_clearing = true;
_queue = std::queue<T>();
_currentSize = 0;
@@ -153,7 +153,7 @@ public:
}
bool tryPop(T& t) {
- stdx::lock_guard<stdx::mutex> lk(_lock);
+ stdx::lock_guard<Latch> lk(_lock);
if (_queue.empty())
return false;
@@ -166,7 +166,7 @@ public:
}
T blockingPop() {
- stdx::unique_lock<stdx::mutex> lk(_lock);
+ stdx::unique_lock<Latch> lk(_lock);
_clearing = false;
while (_queue.empty() && !_clearing)
_cvNoLongerEmpty.wait(lk);
@@ -191,7 +191,7 @@ public:
bool blockingPop(T& t, int maxSecondsToWait) {
using namespace stdx::chrono;
const auto deadline = system_clock::now() + seconds(maxSecondsToWait);
- stdx::unique_lock<stdx::mutex> lk(_lock);
+ stdx::unique_lock<Latch> lk(_lock);
_clearing = false;
while (_queue.empty() && !_clearing) {
if (stdx::cv_status::timeout == _cvNoLongerEmpty.wait_until(lk, deadline))
@@ -213,7 +213,7 @@ public:
bool blockingPeek(T& t, int maxSecondsToWait) {
using namespace stdx::chrono;
const auto deadline = system_clock::now() + seconds(maxSecondsToWait);
- stdx::unique_lock<stdx::mutex> lk(_lock);
+ stdx::unique_lock<Latch> lk(_lock);
_clearing = false;
while (_queue.empty() && !_clearing) {
if (stdx::cv_status::timeout == _cvNoLongerEmpty.wait_until(lk, deadline))
@@ -229,7 +229,7 @@ public:
// Obviously, this should only be used when you have
// only one consumer
bool peek(T& t) {
- stdx::unique_lock<stdx::mutex> lk(_lock);
+ stdx::unique_lock<Latch> lk(_lock);
if (_queue.empty()) {
return false;
}
@@ -242,7 +242,7 @@ public:
* Returns the item most recently added to the queue or nothing if the queue is empty.
*/
boost::optional<T> lastObjectPushed() const {
- stdx::unique_lock<stdx::mutex> lk(_lock);
+ stdx::unique_lock<Latch> lk(_lock);
if (_queue.empty()) {
return {};
}
@@ -254,7 +254,7 @@ private:
/**
* Returns when enough space is available.
*/
- void _waitForSpace_inlock(size_t size, stdx::unique_lock<stdx::mutex>& lk) {
+ void _waitForSpace_inlock(size_t size, stdx::unique_lock<Latch>& lk) {
while (_currentSize + size > _maxSize) {
_cvNoLongerFull.wait(lk);
}
@@ -268,7 +268,7 @@ private:
_cvNoLongerEmpty.notify_one();
}
- mutable stdx::mutex _lock;
+ mutable Mutex _lock = MONGO_MAKE_LATCH("BlockingQueue::_lock");
std::queue<T> _queue;
const size_t _maxSize;
size_t _currentSize = 0;