summaryrefslogtreecommitdiff
path: root/src/mongo/util/producer_consumer_queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/util/producer_consumer_queue.h')
-rw-r--r--src/mongo/util/producer_consumer_queue.h36
1 files changed, 17 insertions, 19 deletions
diff --git a/src/mongo/util/producer_consumer_queue.h b/src/mongo/util/producer_consumer_queue.h
index 05b39eff7db..44a87f93aec 100644
--- a/src/mongo/util/producer_consumer_queue.h
+++ b/src/mongo/util/producer_consumer_queue.h
@@ -35,8 +35,8 @@
#include <numeric>
#include "mongo/db/operation_context.h"
-#include "mongo/stdx/condition_variable.h"
-#include "mongo/stdx/mutex.h"
+#include "mongo/platform/condition_variable.h"
+#include "mongo/platform/mutex.h"
#include "mongo/util/concurrency/with_lock.h"
#include "mongo/util/interruptible.h"
#include "mongo/util/scopeguard.h"
@@ -468,7 +468,7 @@ public:
//
// Leaves T unchanged if an interrupt exception is thrown while waiting for space
void push(T&& t, Interruptible* interruptible = Interruptible::notInterruptible()) {
- _pushRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
+ _pushRunner([&](stdx::unique_lock<Latch>& lk) {
auto cost = _invokeCostFunc(t, lk);
uassert(ErrorCodes::ProducerConsumerQueueBatchTooLarge,
str::stream() << "cost of item (" << cost
@@ -496,7 +496,7 @@ public:
void pushMany(StartIterator start,
EndIterator last,
Interruptible* interruptible = Interruptible::notInterruptible()) {
- return _pushRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
+ return _pushRunner([&](stdx::unique_lock<Latch>& lk) {
size_t cost = 0;
for (auto iter = start; iter != last; ++iter) {
cost += _invokeCostFunc(*iter, lk);
@@ -521,12 +521,12 @@ public:
// Leaves T unchanged if it fails
bool tryPush(T&& t) {
return _pushRunner(
- [&](stdx::unique_lock<stdx::mutex>& lk) { return _tryPush(lk, std::move(t)); });
+ [&](stdx::unique_lock<Latch>& lk) { return _tryPush(lk, std::move(t)); });
}
// Pops one T out of the queue
T pop(Interruptible* interruptible = Interruptible::notInterruptible()) {
- return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
+ return _popRunner([&](stdx::unique_lock<Latch>& lk) {
_waitForNonEmpty(lk, interruptible);
return _pop(lk);
});
@@ -538,7 +538,7 @@ public:
// Returns the popped values, along with the cost value of the items extracted
std::pair<std::deque<T>, size_t> popMany(
Interruptible* interruptible = Interruptible::notInterruptible()) {
- return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
+ return _popRunner([&](stdx::unique_lock<Latch>& lk) {
_waitForNonEmpty(lk, interruptible);
return std::make_pair(std::exchange(_queue, {}), std::exchange(_current, 0));
});
@@ -554,7 +554,7 @@ public:
//
std::pair<std::deque<T>, size_t> popManyUpTo(
size_t budget, Interruptible* interruptible = Interruptible::notInterruptible()) {
- return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
+ return _popRunner([&](stdx::unique_lock<Latch>& lk) {
_waitForNonEmpty(lk, interruptible);
if (_current <= budget) {
@@ -584,13 +584,13 @@ public:
// Attempts a non-blocking pop of a value
boost::optional<T> tryPop() {
- return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) { return _tryPop(lk); });
+ return _popRunner([&](stdx::unique_lock<Latch>& lk) { return _tryPop(lk); });
}
// Closes the producer end. Consumers will continue to consume until the queue is exhausted, at
// which time they will begin to throw with an interruption dbexception
void closeProducerEnd() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_producerEndClosed = true;
@@ -599,7 +599,7 @@ public:
// Closes the consumer end. This causes all callers to throw with an interruption dbexception
void closeConsumerEnd() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
_consumerEndClosed = true;
_producerEndClosed = true;
@@ -608,7 +608,7 @@ public:
}
Stats getStats() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
+ stdx::lock_guard<Latch> lk(_mutex);
Stats stats;
stats.queueDepth = _current;
stats.waitingConsumers = _consumers;
@@ -804,7 +804,7 @@ private:
template <typename Callback>
auto _pushRunner(Callback&& cb) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
_checkProducerClosed(lk);
@@ -815,7 +815,7 @@ private:
template <typename Callback>
auto _popRunner(Callback&& cb) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
+ stdx::unique_lock<Latch> lk(_mutex);
_checkConsumerClosed(lk);
@@ -866,9 +866,7 @@ private:
return t;
}
- void _waitForSpace(stdx::unique_lock<stdx::mutex>& lk,
- size_t cost,
- Interruptible* interruptible) {
+ void _waitForSpace(stdx::unique_lock<Latch>& lk, size_t cost, Interruptible* interruptible) {
// We do some pre-flight checks to avoid creating a cv if we don't need one
_checkProducerClosed(lk);
@@ -885,7 +883,7 @@ private:
});
}
- void _waitForNonEmpty(stdx::unique_lock<stdx::mutex>& lk, Interruptible* interruptible) {
+ void _waitForNonEmpty(stdx::unique_lock<Latch>& lk, Interruptible* interruptible) {
typename Consumers::Waiter waiter(_consumers);
interruptible->waitForConditionOrInterrupt(_consumers.cv(), lk, [&] {
@@ -894,7 +892,7 @@ private:
});
}
- mutable stdx::mutex _mutex;
+ mutable Mutex _mutex = MONGO_MAKE_LATCH("ProducerConsumerQueue::_mutex");
Options _options;