summaryrefslogtreecommitdiff
path: root/src/mongo/util/producer_consumer_queue.h
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/util/producer_consumer_queue.h')
-rw-r--r--src/mongo/util/producer_consumer_queue.h173
1 files changed, 28 insertions, 145 deletions
diff --git a/src/mongo/util/producer_consumer_queue.h b/src/mongo/util/producer_consumer_queue.h
index 8f2b17d2265..a2e1a09e9bb 100644
--- a/src/mongo/util/producer_consumer_queue.h
+++ b/src/mongo/util/producer_consumer_queue.h
@@ -38,6 +38,7 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/concurrency/with_lock.h"
+#include "mongo/util/interruptible.h"
#include "mongo/util/scopeguard.h"
namespace mongo {
@@ -56,43 +57,6 @@ struct DefaultCostFunction {
}
};
-// Various helpers to tighten down whether the args getting passed are valid interruption args.
-//
-// Whatever the caller passes in the interruption args, they need to be invocable on one of
-// these helpers. std::is_invocable would do the job in C++17
-constexpr std::false_type areInterruptionArgsHelper(...) {
- return {};
-}
-
-constexpr std::true_type areInterruptionArgsHelper(OperationContext*) {
- return {};
-}
-
-constexpr std::true_type areInterruptionArgsHelper(OperationContext*, Milliseconds) {
- return {};
-}
-
-constexpr std::true_type areInterruptionArgsHelper(OperationContext*, Date_t) {
- return {};
-}
-
-constexpr std::true_type areInterruptionArgsHelper(Milliseconds) {
- return {};
-}
-
-constexpr std::true_type areInterruptionArgsHelper(Date_t) {
- return {};
-}
-
-template <typename U, typename... InterruptionArgs>
-constexpr auto areInterruptionArgs(U&& u, InterruptionArgs&&... args) {
- return areInterruptionArgsHelper(std::forward<U>(u), std::forward<InterruptionArgs>(args)...);
-}
-
-constexpr std::true_type areInterruptionArgs() {
- return {};
-}
-
} // namespace producer_consumer_queue_detail
/**
@@ -109,20 +73,9 @@ constexpr std::true_type areInterruptionArgs() {
* multi-consumer - Any number of threads may pop work out of the queue
*
* Interruptibility:
- * All of the blocking methods on this type allow for 6 kinds of interruptibility. The matrix is
- * parameterized by (void|OperationContext*)|(void|Milliseconds|Date_t). These provide different
- * kinds of waiting based on whether the method should be interruptible via opCtx, and then
- * whether they should timeout via deadline or duration
+ * All of the blocking methods on this type take an interruptible.
*
- * A contrived example: pcq.pop(opCtx, Minutes(1)) would be warranted if:
- * - The caller is blocking on a client thread. (opCtx)
- * - The caller needs to act periodically on inactivity. (the duration)
- *
- * Exceptions include:
- * timeouts
- * ErrorCodes::ExceededTimeLimit exceptions
- * opCtx interrupts
- * ErrorCodes::Interrupted exceptions
+ * Exceptions outside the interruptible include:
* closure of queue endpoints
* ErrorCodes::ProducerConsumerQueueEndClosed
* pushes with batches that exceed the max queue size
@@ -165,11 +118,7 @@ public:
// Pushes the passed T into the queue
//
// Leaves T unchanged if an interrupt exception is thrown while waiting for space
- template <
- typename... InterruptionArgs,
- typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs(
- std::declval<InterruptionArgs>()...))::value>>
- void push(T&& t, InterruptionArgs&&... interruptionArgs) {
+ void push(T&& t, Interruptible* interruptible = Interruptible::notInterruptible()) {
_pushRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
auto cost = _invokeCostFunc(t, lk);
uassert(ErrorCodes::ProducerConsumerQueueBatchTooLarge,
@@ -179,7 +128,7 @@ public:
<< ")",
cost <= _max);
- _waitForSpace(lk, cost, std::forward<InterruptionArgs>(interruptionArgs)...);
+ _waitForSpace(lk, cost, interruptible);
_push(lk, std::move(t));
});
}
@@ -195,13 +144,10 @@ public:
//
// Lifecycle methods of T must not throw if you want to use this method, as there's no obvious
// mechanism to see what was and was not pushed if those do throw
- template <
- typename StartIterator,
- typename EndIterator,
- typename... InterruptionArgs,
- typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs(
- std::declval<InterruptionArgs>()...))::value>>
- void pushMany(StartIterator start, EndIterator last, InterruptionArgs&&... interruptionArgs) {
+ template <typename StartIterator, typename EndIterator>
+ void pushMany(StartIterator start,
+ EndIterator last,
+ Interruptible* interruptible = Interruptible::notInterruptible()) {
return _pushRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
size_t cost = 0;
for (auto iter = start; iter != last; ++iter) {
@@ -215,7 +161,7 @@ public:
<< ")",
cost <= _max);
- _waitForSpace(lk, cost, std::forward<InterruptionArgs>(interruptionArgs)...);
+ _waitForSpace(lk, cost, interruptible);
for (auto iter = start; iter != last; ++iter) {
_push(lk, std::move(*iter));
@@ -232,13 +178,9 @@ public:
}
// Pops one T out of the queue
- template <
- typename... InterruptionArgs,
- typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs(
- std::declval<InterruptionArgs>()...))::value>>
- T pop(InterruptionArgs&&... interruptionArgs) {
+ T pop(Interruptible* interruptible = Interruptible::notInterruptible()) {
return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
- _waitForNonEmpty(lk, std::forward<InterruptionArgs>(interruptionArgs)...);
+ _waitForNonEmpty(lk, interruptible);
return _pop(lk);
});
}
@@ -250,14 +192,10 @@ public:
// TODO: add sfinae to check to enforce
//
// Returns the cost value of the items extracted, along with the updated output iterator
- template <
- typename OutputIterator,
- typename... InterruptionArgs,
- typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs(
- std::declval<InterruptionArgs>()...))::value>>
- std::pair<size_t, OutputIterator> popMany(OutputIterator iterator,
- InterruptionArgs&&... interruptionArgs) {
- return popManyUpTo(_max, iterator, std::forward<InterruptionArgs>(interruptionArgs)...);
+ template <typename OutputIterator>
+ std::pair<size_t, OutputIterator> popMany(
+ OutputIterator iterator, Interruptible* interruptible = Interruptible::notInterruptible()) {
+ return popManyUpTo(_max, iterator, interruptible);
}
// Waits for at least one item in the queue, then pops items out of the queue until it would
@@ -267,18 +205,15 @@ public:
// TODO: add sfinae to check to enforce
//
// Returns the cost value of the items extracted, along with the updated output iterator
- template <
- typename OutputIterator,
- typename... InterruptionArgs,
- typename = std::enable_if_t<decltype(producer_consumer_queue_detail::areInterruptionArgs(
- std::declval<InterruptionArgs>()...))::value>>
- std::pair<size_t, OutputIterator> popManyUpTo(size_t budget,
- OutputIterator iterator,
- InterruptionArgs&&... interruptionArgs) {
+ template <typename OutputIterator>
+ std::pair<size_t, OutputIterator> popManyUpTo(
+ size_t budget,
+ OutputIterator iterator,
+ Interruptible* interruptible = Interruptible::notInterruptible()) {
return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
size_t cost = 0;
- _waitForNonEmpty(lk, std::forward<InterruptionArgs>(interruptionArgs)...);
+ _waitForNonEmpty(lk, interruptible);
while (auto out = _tryPop(lk)) {
cost += _invokeCostFunc(*out, lk);
@@ -449,10 +384,9 @@ private:
return t;
}
- template <typename... InterruptionArgs>
void _waitForSpace(stdx::unique_lock<stdx::mutex>& lk,
size_t cost,
- InterruptionArgs&&... interruptionArgs) {
+ Interruptible* interruptible) {
invariant(!_producerWants);
_producerWants = cost;
@@ -464,12 +398,10 @@ private:
_checkProducerClosed(lk);
return _current + cost <= _max;
},
- std::forward<InterruptionArgs>(interruptionArgs)...);
+ interruptible);
}
- template <typename... InterruptionArgs>
- void _waitForNonEmpty(stdx::unique_lock<stdx::mutex>& lk,
- InterruptionArgs&&... interruptionArgs) {
+ void _waitForNonEmpty(stdx::unique_lock<stdx::mutex>& lk, Interruptible* interruptible) {
_consumers++;
const auto guard = MakeGuard([&] { _consumers--; });
@@ -480,64 +412,15 @@ private:
_checkConsumerClosed(lk);
return _queue.size();
},
- std::forward<InterruptionArgs>(interruptionArgs)...);
- }
-
- template <typename Callback>
- void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
- stdx::condition_variable& condvar,
- Callback&& pred,
- OperationContext* opCtx) {
- opCtx->waitForConditionOrInterrupt(condvar, lk, pred);
- }
-
- template <typename Callback>
- void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
- stdx::condition_variable& condvar,
- Callback&& pred) {
- condvar.wait(lk, pred);
- }
-
- template <typename Callback>
- void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
- stdx::condition_variable& condvar,
- Callback&& pred,
- OperationContext* opCtx,
- Date_t deadline) {
- uassert(ErrorCodes::ExceededTimeLimit,
- "exceeded timeout",
- opCtx->waitForConditionOrInterruptUntil(condvar, lk, deadline, pred));
- }
-
- template <typename Callback>
- void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
- stdx::condition_variable& condvar,
- Callback&& pred,
- Date_t deadline) {
- uassert(ErrorCodes::ExceededTimeLimit,
- "exceeded timeout",
- condvar.wait_until(lk, deadline.toSystemTimePoint(), pred));
- }
-
- template <typename Callback>
- void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
- stdx::condition_variable& condvar,
- Callback&& pred,
- OperationContext* opCtx,
- Milliseconds duration) {
- uassert(ErrorCodes::ExceededTimeLimit,
- "exceeded timeout",
- opCtx->waitForConditionOrInterruptFor(condvar, lk, duration, pred));
+ interruptible);
}
template <typename Callback>
void _waitFor(stdx::unique_lock<stdx::mutex>& lk,
stdx::condition_variable& condvar,
Callback&& pred,
- Milliseconds duration) {
- uassert(ErrorCodes::ExceededTimeLimit,
- "exceeded timeout",
- condvar.wait_for(lk, duration.toSystemDuration(), pred));
+ Interruptible* interruptible) {
+ interruptible->waitForConditionOrInterrupt(condvar, lk, pred);
}
mutable stdx::mutex _mutex;