diff options
Diffstat (limited to 'src/mongo/util/producer_consumer_queue.h')
-rw-r--r-- | src/mongo/util/producer_consumer_queue.h | 36 |
1 files changed, 17 insertions, 19 deletions
diff --git a/src/mongo/util/producer_consumer_queue.h b/src/mongo/util/producer_consumer_queue.h index 05b39eff7db..44a87f93aec 100644 --- a/src/mongo/util/producer_consumer_queue.h +++ b/src/mongo/util/producer_consumer_queue.h @@ -35,8 +35,8 @@ #include <numeric> #include "mongo/db/operation_context.h" -#include "mongo/stdx/condition_variable.h" -#include "mongo/stdx/mutex.h" +#include "mongo/platform/condition_variable.h" +#include "mongo/platform/mutex.h" #include "mongo/util/concurrency/with_lock.h" #include "mongo/util/interruptible.h" #include "mongo/util/scopeguard.h" @@ -468,7 +468,7 @@ public: // // Leaves T unchanged if an interrupt exception is thrown while waiting for space void push(T&& t, Interruptible* interruptible = Interruptible::notInterruptible()) { - _pushRunner([&](stdx::unique_lock<stdx::mutex>& lk) { + _pushRunner([&](stdx::unique_lock<Latch>& lk) { auto cost = _invokeCostFunc(t, lk); uassert(ErrorCodes::ProducerConsumerQueueBatchTooLarge, str::stream() << "cost of item (" << cost @@ -496,7 +496,7 @@ public: void pushMany(StartIterator start, EndIterator last, Interruptible* interruptible = Interruptible::notInterruptible()) { - return _pushRunner([&](stdx::unique_lock<stdx::mutex>& lk) { + return _pushRunner([&](stdx::unique_lock<Latch>& lk) { size_t cost = 0; for (auto iter = start; iter != last; ++iter) { cost += _invokeCostFunc(*iter, lk); @@ -521,12 +521,12 @@ public: // Leaves T unchanged if it fails bool tryPush(T&& t) { return _pushRunner( - [&](stdx::unique_lock<stdx::mutex>& lk) { return _tryPush(lk, std::move(t)); }); + [&](stdx::unique_lock<Latch>& lk) { return _tryPush(lk, std::move(t)); }); } // Pops one T out of the queue T pop(Interruptible* interruptible = Interruptible::notInterruptible()) { - return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) { + return _popRunner([&](stdx::unique_lock<Latch>& lk) { _waitForNonEmpty(lk, interruptible); return _pop(lk); }); @@ -538,7 +538,7 @@ public: // Returns the popped values, along with the cost value of the items extracted std::pair<std::deque<T>, size_t> popMany( Interruptible* interruptible = Interruptible::notInterruptible()) { - return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) { + return _popRunner([&](stdx::unique_lock<Latch>& lk) { _waitForNonEmpty(lk, interruptible); return std::make_pair(std::exchange(_queue, {}), std::exchange(_current, 0)); }); @@ -554,7 +554,7 @@ public: // std::pair<std::deque<T>, size_t> popManyUpTo( size_t budget, Interruptible* interruptible = Interruptible::notInterruptible()) { - return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) { + return _popRunner([&](stdx::unique_lock<Latch>& lk) { _waitForNonEmpty(lk, interruptible); if (_current <= budget) { @@ -584,13 +584,13 @@ public: // Attempts a non-blocking pop of a value boost::optional<T> tryPop() { - return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) { return _tryPop(lk); }); + return _popRunner([&](stdx::unique_lock<Latch>& lk) { return _tryPop(lk); }); } // Closes the producer end. Consumers will continue to consume until the queue is exhausted, at // which time they will begin to throw with an interruption dbexception void closeProducerEnd() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _producerEndClosed = true; @@ -599,7 +599,7 @@ public: // Closes the consumer end. This causes all callers to throw with an interruption dbexception void closeConsumerEnd() { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); _consumerEndClosed = true; _producerEndClosed = true; @@ -608,7 +608,7 @@ public: } Stats getStats() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); + stdx::lock_guard<Latch> lk(_mutex); Stats stats; stats.queueDepth = _current; stats.waitingConsumers = _consumers; @@ -804,7 +804,7 @@ private: template <typename Callback> auto _pushRunner(Callback&& cb) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _checkProducerClosed(lk); @@ -815,7 +815,7 @@ private: template <typename Callback> auto _popRunner(Callback&& cb) { - stdx::unique_lock<stdx::mutex> lk(_mutex); + stdx::unique_lock<Latch> lk(_mutex); _checkConsumerClosed(lk); @@ -866,9 +866,7 @@ private: return t; } - void _waitForSpace(stdx::unique_lock<stdx::mutex>& lk, - size_t cost, - Interruptible* interruptible) { + void _waitForSpace(stdx::unique_lock<Latch>& lk, size_t cost, Interruptible* interruptible) { // We do some pre-flight checks to avoid creating a cv if we don't need one _checkProducerClosed(lk); @@ -885,7 +883,7 @@ private: }); } - void _waitForNonEmpty(stdx::unique_lock<stdx::mutex>& lk, Interruptible* interruptible) { + void _waitForNonEmpty(stdx::unique_lock<Latch>& lk, Interruptible* interruptible) { typename Consumers::Waiter waiter(_consumers); interruptible->waitForConditionOrInterrupt(_consumers.cv(), lk, [&] { @@ -894,7 +892,7 @@ private: }); } - mutable stdx::mutex _mutex; + mutable Mutex _mutex = MONGO_MAKE_LATCH("ProducerConsumerQueue::_mutex"); Options _options; |