diff options
Diffstat (limited to 'src/mongo/util/producer_consumer_queue.h')
-rw-r--r-- | src/mongo/util/producer_consumer_queue.h | 173 |
1 files changed, 28 insertions, 145 deletions
diff --git a/src/mongo/util/producer_consumer_queue.h b/src/mongo/util/producer_consumer_queue.h index 8f2b17d2265..a2e1a09e9bb 100644 --- a/src/mongo/util/producer_consumer_queue.h +++ b/src/mongo/util/producer_consumer_queue.h @@ -38,6 +38,7 @@ #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/util/concurrency/with_lock.h" +#include "mongo/util/interruptible.h" #include "mongo/util/scopeguard.h" namespace mongo { @@ -56,43 +57,6 @@ struct DefaultCostFunction { } }; -// Various helpers to tighten down whether the args getting passed are valid interruption args. -// -// Whatever the caller passes in the interruption args, they need to be invocable on one of -// these helpers. std::is_invocable would do the job in C++17 -constexpr std::false_type areInterruptionArgsHelper(...) { - return {}; -} - -constexpr std::true_type areInterruptionArgsHelper(OperationContext*) { - return {}; -} - -constexpr std::true_type areInterruptionArgsHelper(OperationContext*, Milliseconds) { - return {}; -} - -constexpr std::true_type areInterruptionArgsHelper(OperationContext*, Date_t) { - return {}; -} - -constexpr std::true_type areInterruptionArgsHelper(Milliseconds) { - return {}; -} - -constexpr std::true_type areInterruptionArgsHelper(Date_t) { - return {}; -} - -template <typename U, typename... InterruptionArgs> -constexpr auto areInterruptionArgs(U&& u, InterruptionArgs&&... args) { - return areInterruptionArgsHelper(std::forward<U>(u), std::forward<InterruptionArgs>(args)...); -} - -constexpr std::true_type areInterruptionArgs() { - return {}; -} - } // namespace producer_consumer_queue_detail /** @@ -109,20 +73,9 @@ constexpr std::true_type areInterruptionArgs() { * multi-consumer - Any number of threads may pop work out of the queue * * Interruptibility: - * All of the blocking methods on this type allow for 6 kinds of interruptibility. The matrix is - * parameterized by (void|OperationContext*)|(void|Milliseconds|Date_t). These provide different - * kinds of waiting based on whether the method should be interruptible via opCtx, and then - * whether they should timeout via deadline or duration + * All of the blocking methods on this type take an interruptible. * - * A contrived example: pcq.pop(opCtx, Minutes(1)) would be warranted if: - * - The caller is blocking on a client thread. (opCtx) - * - The caller needs to act periodically on inactivity. (the duration) - * - * Exceptions include: - * timeouts - * ErrorCodes::ExceededTimeLimit exceptions - * opCtx interrupts - * ErrorCodes::Interrupted exceptions + * Exceptions outside the interruptible include: * closure of queue endpoints * ErrorCodes::ProducerConsumerQueueEndClosed * pushes with batches that exceed the max queue size @@ -165,11 +118,7 @@ public: // Pushes the passed T into the queue // // Leaves T unchanged if an interrupt exception is thrown while waiting for space - template < - typename... InterruptionArgs, - typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs( - std::declval<InterruptionArgs>()...))::value>> - void push(T&& t, InterruptionArgs&&... interruptionArgs) { + void push(T&& t, Interruptible* interruptible = Interruptible::notInterruptible()) { _pushRunner([&](stdx::unique_lock<stdx::mutex>& lk) { auto cost = _invokeCostFunc(t, lk); uassert(ErrorCodes::ProducerConsumerQueueBatchTooLarge, @@ -179,7 +128,7 @@ public: << ")", cost <= _max); - _waitForSpace(lk, cost, std::forward<InterruptionArgs>(interruptionArgs)...); + _waitForSpace(lk, cost, interruptible); _push(lk, std::move(t)); }); } @@ -195,13 +144,10 @@ public: // // Lifecycle methods of T must not throw if you want to use this method, as there's no obvious // mechanism to see what was and was not pushed if those do throw - template < - typename StartIterator, - typename EndIterator, - typename... InterruptionArgs, - typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs( - std::declval<InterruptionArgs>()...))::value>> - void pushMany(StartIterator start, EndIterator last, InterruptionArgs&&... interruptionArgs) { + template <typename StartIterator, typename EndIterator> + void pushMany(StartIterator start, + EndIterator last, + Interruptible* interruptible = Interruptible::notInterruptible()) { return _pushRunner([&](stdx::unique_lock<stdx::mutex>& lk) { size_t cost = 0; for (auto iter = start; iter != last; ++iter) { @@ -215,7 +161,7 @@ public: << ")", cost <= _max); - _waitForSpace(lk, cost, std::forward<InterruptionArgs>(interruptionArgs)...); + _waitForSpace(lk, cost, interruptible); for (auto iter = start; iter != last; ++iter) { _push(lk, std::move(*iter)); @@ -232,13 +178,9 @@ public: } // Pops one T out of the queue - template < - typename... InterruptionArgs, - typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs( - std::declval<InterruptionArgs>()...))::value>> - T pop(InterruptionArgs&&... interruptionArgs) { + T pop(Interruptible* interruptible = Interruptible::notInterruptible()) { return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) { - _waitForNonEmpty(lk, std::forward<InterruptionArgs>(interruptionArgs)...); + _waitForNonEmpty(lk, interruptible); return _pop(lk); }); } @@ -250,14 +192,10 @@ public: // TODO: add sfinae to check to enforce // // Returns the cost value of the items extracted, along with the updated output iterator - template < - typename OutputIterator, - typename... InterruptionArgs, - typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs( - std::declval<InterruptionArgs>()...))::value>> - std::pair<size_t, OutputIterator> popMany(OutputIterator iterator, - InterruptionArgs&&... interruptionArgs) { - return popManyUpTo(_max, iterator, std::forward<InterruptionArgs>(interruptionArgs)...); + template <typename OutputIterator> + std::pair<size_t, OutputIterator> popMany( + OutputIterator iterator, Interruptible* interruptible = Interruptible::notInterruptible()) { + return popManyUpTo(_max, iterator, interruptible); } // Waits for at least one item in the queue, then pops items out of the queue until it would @@ -267,18 +205,15 @@ public: // TODO: add sfinae to check to enforce // // Returns the cost value of the items extracted, along with the updated output iterator - template < - typename OutputIterator, - typename... InterruptionArgs, - typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs( - std::declval<InterruptionArgs>()...))::value>> - std::pair<size_t, OutputIterator> popManyUpTo(size_t budget, - OutputIterator iterator, - InterruptionArgs&&... interruptionArgs) { + template <typename OutputIterator> + std::pair<size_t, OutputIterator> popManyUpTo( + size_t budget, + OutputIterator iterator, + Interruptible* interruptible = Interruptible::notInterruptible()) { return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) { size_t cost = 0; - _waitForNonEmpty(lk, std::forward<InterruptionArgs>(interruptionArgs)...); + _waitForNonEmpty(lk, interruptible); while (auto out = _tryPop(lk)) { cost += _invokeCostFunc(*out, lk); @@ -449,10 +384,9 @@ private: return t; } - template <typename... InterruptionArgs> void _waitForSpace(stdx::unique_lock<stdx::mutex>& lk, size_t cost, - InterruptionArgs&&... interruptionArgs) { + Interruptible* interruptible) { invariant(!_producerWants); _producerWants = cost; @@ -464,12 +398,10 @@ private: _checkProducerClosed(lk); return _current + cost <= _max; }, - std::forward<InterruptionArgs>(interruptionArgs)...); + interruptible); } - template <typename... InterruptionArgs> - void _waitForNonEmpty(stdx::unique_lock<stdx::mutex>& lk, - InterruptionArgs&&... interruptionArgs) { + void _waitForNonEmpty(stdx::unique_lock<stdx::mutex>& lk, Interruptible* interruptible) { _consumers++; const auto guard = MakeGuard([&] { _consumers--; }); @@ -480,64 +412,15 @@ private: _checkConsumerClosed(lk); return _queue.size(); }, - std::forward<InterruptionArgs>(interruptionArgs)...); - } - - template <typename Callback> - void _waitFor(stdx::unique_lock<stdx::mutex>& lk, - stdx::condition_variable& condvar, - Callback&& pred, - OperationContext* opCtx) { - opCtx->waitForConditionOrInterrupt(condvar, lk, pred); - } - - template <typename Callback> - void _waitFor(stdx::unique_lock<stdx::mutex>& lk, - stdx::condition_variable& condvar, - Callback&& pred) { - condvar.wait(lk, pred); - } - - template <typename Callback> - void _waitFor(stdx::unique_lock<stdx::mutex>& lk, - stdx::condition_variable& condvar, - Callback&& pred, - OperationContext* opCtx, - Date_t deadline) { - uassert(ErrorCodes::ExceededTimeLimit, - "exceeded timeout", - opCtx->waitForConditionOrInterruptUntil(condvar, lk, deadline, pred)); - } - - template <typename Callback> - void _waitFor(stdx::unique_lock<stdx::mutex>& lk, - stdx::condition_variable& condvar, - Callback&& pred, - Date_t deadline) { - uassert(ErrorCodes::ExceededTimeLimit, - "exceeded timeout", - condvar.wait_until(lk, deadline.toSystemTimePoint(), pred)); - } - - template <typename Callback> - void _waitFor(stdx::unique_lock<stdx::mutex>& lk, - stdx::condition_variable& condvar, - Callback&& pred, - OperationContext* opCtx, - Milliseconds duration) { - uassert(ErrorCodes::ExceededTimeLimit, - "exceeded timeout", - opCtx->waitForConditionOrInterruptFor(condvar, lk, duration, pred)); + interruptible); } template <typename Callback> void _waitFor(stdx::unique_lock<stdx::mutex>& lk, stdx::condition_variable& condvar, Callback&& pred, - Milliseconds duration) { - uassert(ErrorCodes::ExceededTimeLimit, - "exceeded timeout", - condvar.wait_for(lk, duration.toSystemDuration(), pred)); + Interruptible* interruptible) { + interruptible->waitForConditionOrInterrupt(condvar, lk, pred); } mutable stdx::mutex _mutex; |