summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2019-01-15 18:29:18 -0500
committerJason Carey <jcarey@argv.me>2019-01-30 14:55:47 -0500
commit3a9efc840255c252732429b97c8b36ded52ed417 (patch)
tree9a9b3f82f53fa0eb8c2930a9c45a90869be5a59a
parenteace76975fd1c521993e82fdc0c2c7833f84ed48 (diff)
downloadmongo-3a9efc840255c252732429b97c8b36ded52ed417.tar.gz
SERVER-39024 Improve PCQ::popMany
improve the ProduceConsumerQueue's popMany and popManyUpTo to return a deque, rather than writing to an output iterator.
-rw-r--r--src/mongo/util/producer_consumer_queue.h84
-rw-r--r--src/mongo/util/producer_consumer_queue_test.cpp71
2 files changed, 78 insertions, 77 deletions
diff --git a/src/mongo/util/producer_consumer_queue.h b/src/mongo/util/producer_consumer_queue.h
index 626370489e4..dbb2f53dcf2 100644
--- a/src/mongo/util/producer_consumer_queue.h
+++ b/src/mongo/util/producer_consumer_queue.h
@@ -34,8 +34,6 @@
#include <deque>
#include <list>
#include <numeric>
-#include <queue>
-#include <stack>
#include "mongo/db/operation_context.h"
#include "mongo/stdx/condition_variable.h"
@@ -541,44 +539,50 @@ public:
// Waits for at least one item in the queue, then pops items out of the queue until it would
// block
//
- // OutputIterator must not throw on move assignment to *iter or popped values may be lost
- // TODO: add sfinae to check to enforce
- //
- // Returns the cost value of the items extracted, along with the updated output iterator
- template <typename OutputIterator>
- std::pair<size_t, OutputIterator> popMany(
- OutputIterator iterator, Interruptible* interruptible = Interruptible::notInterruptible()) {
- return popManyUpTo(_options.maxQueueDepth, iterator, interruptible);
+ // Returns the popped values, along with the cost value of the items extracted
+ std::pair<std::deque<T>, size_t> popMany(
+ Interruptible* interruptible = Interruptible::notInterruptible()) {
+ return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
+ _waitForNonEmpty(lk, interruptible);
+ return std::make_pair(std::exchange(_queue, {}), std::exchange(_current, 0));
+ });
}
// Waits for at least one item in the queue, then pops items out of the queue until it would
- // block, or we've exceeded our budget
+ // block, or the items cost would exceeded our budget
//
- // OutputIterator must not throw on move assignment to *iter or popped values may be lost
- // TODO: add sfinae to check to enforce
+ // Returns the popped values, along with the cost value of the items extracted.
//
- // Returns the cost value of the items extracted, along with the updated output iterator
- template <typename OutputIterator>
- std::pair<size_t, OutputIterator> popManyUpTo(
- size_t budget,
- OutputIterator iterator,
- Interruptible* interruptible = Interruptible::notInterruptible()) {
+ // Note that if the next item in the queue costs more than our budget, this may return without
+ // any items.
+ //
+ std::pair<std::deque<T>, size_t> popManyUpTo(
+ size_t budget, Interruptible* interruptible = Interruptible::notInterruptible()) {
return _popRunner([&](stdx::unique_lock<stdx::mutex>& lk) {
- size_t cost = 0;
-
_waitForNonEmpty(lk, interruptible);
- while (auto out = _tryPop(lk)) {
- cost += _invokeCostFunc(*out, lk);
- *iterator = std::move(*out);
- ++iterator;
+ if (_current <= budget) {
+ return std::make_pair(std::exchange(_queue, {}), std::exchange(_current, 0));
+ }
+
+ decltype(_queue) queue;
+ size_t cost = 0;
+
+ while (_queue.size()) {
+ auto potentialCost = _invokeCostFunc(_queue.front(), lk);
- if (cost >= budget) {
+ if (cost + potentialCost > budget) {
break;
}
+
+ cost += potentialCost;
+
+ queue.emplace_back(std::move(_queue.front()));
+ _queue.pop_front();
+ _current -= potentialCost;
}
- return std::make_pair(cost, iterator);
+ return std::make_pair(std::move(queue), cost);
});
}
@@ -669,20 +673,14 @@ public:
return _parent->pop(interruptible);
}
- template <typename OutputIterator>
- std::pair<size_t, OutputIterator> popMany(
- OutputIterator&& iterator,
+ std::pair<std::deque<T>, size_t> popMany(
Interruptible* interruptible = Interruptible::notInterruptible()) const {
- return _parent->popMany(std::forward<OutputIterator>(iterator), interruptible);
+ return _parent->popMany(interruptible);
}
- template <typename OutputIterator>
- std::pair<size_t, OutputIterator> popManyUpTo(
- size_t budget,
- OutputIterator&& iterator,
- Interruptible* interruptible = Interruptible::notInterruptible()) const {
- return _parent->popManyUpTo(
- budget, std::forward<OutputIterator>(iterator), interruptible);
+ std::pair<std::deque<T>, size_t> popManyUpTo(
+ size_t budget, Interruptible* interruptible = Interruptible::notInterruptible()) const {
+ return _parent->popManyUpTo(budget, interruptible);
}
boost::optional<T> tryPop() const {
@@ -833,7 +831,7 @@ private:
bool _tryPush(WithLock wl, T&& t) {
size_t cost = _invokeCostFunc(t, wl);
if (_current + cost <= _options.maxQueueDepth) {
- _queue.emplace(std::move(t));
+ _queue.emplace_back(std::move(t));
_current += cost;
return true;
}
@@ -845,7 +843,7 @@ private:
size_t cost = _invokeCostFunc(t, wl);
invariant(_current + cost <= _options.maxQueueDepth);
- _queue.emplace(std::move(t));
+ _queue.emplace_back(std::move(t));
_current += cost;
}
@@ -854,7 +852,7 @@ private:
if (!_queue.empty()) {
out.emplace(std::move(_queue.front()));
- _queue.pop();
+ _queue.pop_front();
_current -= _invokeCostFunc(*out, wl);
}
@@ -865,7 +863,7 @@ private:
invariant(_queue.size());
auto t = std::move(_queue.front());
- _queue.pop();
+ _queue.pop_front();
_current -= _invokeCostFunc(t, wl);
@@ -907,7 +905,7 @@ private:
// Current size of the queue
size_t _current = 0;
- std::queue<T> _queue;
+ std::deque<T> _queue;
// State for waiting consumers and producers
Consumers _consumers;
diff --git a/src/mongo/util/producer_consumer_queue_test.cpp b/src/mongo/util/producer_consumer_queue_test.cpp
index f49f8b80667..a1fa880c5d6 100644
--- a/src/mongo/util/producer_consumer_queue_test.cpp
+++ b/src/mongo/util/producer_consumer_queue_test.cpp
@@ -295,20 +295,16 @@ PRODUCER_CONSUMER_QUEUE_TEST(popsWithTimeout, runTimeoutPermutations<false, fals
typename Helper::template ProducerConsumerQueue<MoveOnly> pcq{};
helper
- .runThread("Consumer",
- [&](OperationContext* opCtx) {
- ASSERT_THROWS_CODE(
- pcq.pop(opCtx), DBException, ErrorCodes::ExceededTimeLimit);
+ .runThread(
+ "Consumer",
+ [&](OperationContext* opCtx) {
+ ASSERT_THROWS_CODE(pcq.pop(opCtx), DBException, ErrorCodes::ExceededTimeLimit);
- std::vector<MoveOnly> vec;
- ASSERT_THROWS_CODE(pcq.popMany(std::back_inserter(vec), opCtx),
- DBException,
- ErrorCodes::ExceededTimeLimit);
+ ASSERT_THROWS_CODE(pcq.popMany(opCtx), DBException, ErrorCodes::ExceededTimeLimit);
- ASSERT_THROWS_CODE(pcq.popManyUpTo(1000, std::back_inserter(vec), opCtx),
- DBException,
- ErrorCodes::ExceededTimeLimit);
- })
+ ASSERT_THROWS_CODE(
+ pcq.popManyUpTo(1000, opCtx), DBException, ErrorCodes::ExceededTimeLimit);
+ })
.join();
ASSERT_EQUALS(pcq.getStats().queueDepth, 0ul);
@@ -508,9 +504,8 @@ PRODUCER_CONSUMER_QUEUE_TEST(popManyPopWithBlocking, runPermutations<false, fals
auto consumer = helper.runThread("Consumer", [&](OperationContext* opCtx) {
for (int i = 0; i < 10; i = i + 2) {
- std::vector<MoveOnly> out;
-
- pcq.popMany(std::back_inserter(out), opCtx);
+ std::deque<MoveOnly> out;
+ std::tie(out, std::ignore) = pcq.popMany(opCtx);
ASSERT_EQUALS(out.size(), 2ul);
ASSERT_EQUALS(out[0], MoveOnly(i));
@@ -543,10 +538,10 @@ PRODUCER_CONSUMER_QUEUE_TEST(popManyUpToPopWithBlocking, runPermutations<false,
auto consumer = helper.runThread("Consumer", [&](OperationContext* opCtx) {
for (int i = 0; i < 10; i = i + 2) {
- std::vector<MoveOnly> out;
-
+ std::deque<MoveOnly> out;
size_t spent;
- std::tie(spent, std::ignore) = pcq.popManyUpTo(2, std::back_inserter(out), opCtx);
+
+ std::tie(out, spent) = pcq.popManyUpTo(2, opCtx);
ASSERT_EQUALS(spent, 2ul);
ASSERT_EQUALS(out.size(), 2ul);
@@ -578,32 +573,41 @@ PRODUCER_CONSUMER_QUEUE_TEST(popManyUpToPopWithBlockingWithSpecialCost,
auto consumer = helper.runThread("Consumer", [&](OperationContext* opCtx) {
{
- std::vector<MoveOnly> out;
+ std::deque<MoveOnly> out;
size_t spent;
- std::tie(spent, std::ignore) = pcq.popManyUpTo(5, std::back_inserter(out), opCtx);
+ std::tie(out, spent) = pcq.popManyUpTo(5, opCtx);
- ASSERT_EQUALS(spent, 6ul);
- ASSERT_EQUALS(out.size(), 3ul);
+ ASSERT_EQUALS(spent, 3ul);
+ ASSERT_EQUALS(out.size(), 2ul);
ASSERT_EQUALS(out[0], MoveOnly(1));
ASSERT_EQUALS(out[1], MoveOnly(2));
- ASSERT_EQUALS(out[2], MoveOnly(3));
}
{
- std::vector<MoveOnly> out;
+ std::deque<MoveOnly> out;
size_t spent;
- std::tie(spent, std::ignore) = pcq.popManyUpTo(15, std::back_inserter(out), opCtx);
+ std::tie(out, spent) = pcq.popManyUpTo(15, opCtx);
- ASSERT_EQUALS(spent, 9ul);
- ASSERT_EQUALS(out.size(), 2ul);
- ASSERT_EQUALS(out[0], MoveOnly(4));
- ASSERT_EQUALS(out[1], MoveOnly(5));
+ ASSERT_EQUALS(spent, 12ul);
+ ASSERT_EQUALS(out.size(), 3ul);
+ ASSERT_EQUALS(out[0], MoveOnly(3));
+ ASSERT_EQUALS(out[1], MoveOnly(4));
+ ASSERT_EQUALS(out[2], MoveOnly(5));
+ }
+
+ {
+ std::deque<MoveOnly> out;
+ size_t spent;
+ std::tie(out, spent) = pcq.popManyUpTo(5, opCtx);
+
+ ASSERT_EQUALS(spent, 0ul);
+ ASSERT_EQUALS(out.size(), 0ul);
}
});
auto producer = helper.runThread("Producer", [&](OperationContext* opCtx) {
std::vector<MoveOnly> vec;
- for (int i = 1; i < 6; ++i) {
+ for (int i = 1; i <= 6; ++i) {
vec.emplace_back(MoveOnly(i));
}
@@ -613,7 +617,7 @@ PRODUCER_CONSUMER_QUEUE_TEST(popManyUpToPopWithBlockingWithSpecialCost,
consumer.join();
producer.join();
- ASSERT_EQUALS(pcq.getStats().queueDepth, 0ul);
+ ASSERT_EQUALS(pcq.getStats().queueDepth, 6ul);
}
PRODUCER_CONSUMER_QUEUE_TEST(singleProducerMultiConsumer, runPermutations<false, true>) {
@@ -818,9 +822,8 @@ PRODUCER_CONSUMER_QUEUE_TEST(pipeCompiles, runPermutations<false, false>) {
ASSERT(producer.tryPush(MoveOnly(1)));
ASSERT_EQUALS(consumer.pop(), MoveOnly(1));
- std::vector<MoveOnly> out;
- ASSERT_EQUALS(consumer.popManyUpTo(1ul, std::back_inserter(out)).first, 1ul);
- ASSERT_EQUALS(consumer.popMany(std::back_inserter(out)).first, 1ul);
+ ASSERT_EQUALS(consumer.popManyUpTo(1ul).second, 1ul);
+ ASSERT_EQUALS(consumer.popMany().second, 1ul);
ASSERT_FALSE(consumer.tryPop());
producer.close();