From 3a9efc840255c252732429b97c8b36ded52ed417 Mon Sep 17 00:00:00 2001 From: Jason Carey Date: Tue, 15 Jan 2019 18:29:18 -0500 Subject: SERVER-39024 Improve PCQ::popMany improve the ProduceConsumerQueue's popMany and popManyUpTo to return a deque, rather than writing to an output iterator. --- src/mongo/util/producer_consumer_queue.h | 84 ++++++++++++------------- src/mongo/util/producer_consumer_queue_test.cpp | 71 +++++++++++---------- 2 files changed, 78 insertions(+), 77 deletions(-) diff --git a/src/mongo/util/producer_consumer_queue.h b/src/mongo/util/producer_consumer_queue.h index 626370489e4..dbb2f53dcf2 100644 --- a/src/mongo/util/producer_consumer_queue.h +++ b/src/mongo/util/producer_consumer_queue.h @@ -34,8 +34,6 @@ #include #include #include -#include -#include #include "mongo/db/operation_context.h" #include "mongo/stdx/condition_variable.h" @@ -541,44 +539,50 @@ public: // Waits for at least one item in the queue, then pops items out of the queue until it would // block // - // OutputIterator must not throw on move assignment to *iter or popped values may be lost - // TODO: add sfinae to check to enforce - // - // Returns the cost value of the items extracted, along with the updated output iterator - template - std::pair popMany( - OutputIterator iterator, Interruptible* interruptible = Interruptible::notInterruptible()) { - return popManyUpTo(_options.maxQueueDepth, iterator, interruptible); + // Returns the popped values, along with the cost value of the items extracted + std::pair, size_t> popMany( + Interruptible* interruptible = Interruptible::notInterruptible()) { + return _popRunner([&](stdx::unique_lock& lk) { + _waitForNonEmpty(lk, interruptible); + return std::make_pair(std::exchange(_queue, {}), std::exchange(_current, 0)); + }); } // Waits for at least one item in the queue, then pops items out of the queue until it would - // block, or we've exceeded our budget + // block, or the items cost would exceeded our budget // - // OutputIterator must not throw on move assignment to *iter or popped values may be lost - // TODO: add sfinae to check to enforce + // Returns the popped values, along with the cost value of the items extracted. // - // Returns the cost value of the items extracted, along with the updated output iterator - template - std::pair popManyUpTo( - size_t budget, - OutputIterator iterator, - Interruptible* interruptible = Interruptible::notInterruptible()) { + // Note that if the next item in the queue costs more than our budget, this may return without + // any items. + // + std::pair, size_t> popManyUpTo( + size_t budget, Interruptible* interruptible = Interruptible::notInterruptible()) { return _popRunner([&](stdx::unique_lock& lk) { - size_t cost = 0; - _waitForNonEmpty(lk, interruptible); - while (auto out = _tryPop(lk)) { - cost += _invokeCostFunc(*out, lk); - *iterator = std::move(*out); - ++iterator; + if (_current <= budget) { + return std::make_pair(std::exchange(_queue, {}), std::exchange(_current, 0)); + } + + decltype(_queue) queue; + size_t cost = 0; + + while (_queue.size()) { + auto potentialCost = _invokeCostFunc(_queue.front(), lk); - if (cost >= budget) { + if (cost + potentialCost > budget) { break; } + + cost += potentialCost; + + queue.emplace_back(std::move(_queue.front())); + _queue.pop_front(); + _current -= potentialCost; } - return std::make_pair(cost, iterator); + return std::make_pair(std::move(queue), cost); }); } @@ -669,20 +673,14 @@ public: return _parent->pop(interruptible); } - template - std::pair popMany( - OutputIterator&& iterator, + std::pair, size_t> popMany( Interruptible* interruptible = Interruptible::notInterruptible()) const { - return _parent->popMany(std::forward(iterator), interruptible); + return _parent->popMany(interruptible); } - template - std::pair popManyUpTo( - size_t budget, - OutputIterator&& iterator, - Interruptible* interruptible = Interruptible::notInterruptible()) const { - return _parent->popManyUpTo( - budget, std::forward(iterator), interruptible); + std::pair, size_t> popManyUpTo( + size_t budget, Interruptible* interruptible = Interruptible::notInterruptible()) const { + return _parent->popManyUpTo(budget, interruptible); } boost::optional tryPop() const { @@ -833,7 +831,7 @@ private: bool _tryPush(WithLock wl, T&& t) { size_t cost = _invokeCostFunc(t, wl); if (_current + cost <= _options.maxQueueDepth) { - _queue.emplace(std::move(t)); + _queue.emplace_back(std::move(t)); _current += cost; return true; } @@ -845,7 +843,7 @@ private: size_t cost = _invokeCostFunc(t, wl); invariant(_current + cost <= _options.maxQueueDepth); - _queue.emplace(std::move(t)); + _queue.emplace_back(std::move(t)); _current += cost; } @@ -854,7 +852,7 @@ private: if (!_queue.empty()) { out.emplace(std::move(_queue.front())); - _queue.pop(); + _queue.pop_front(); _current -= _invokeCostFunc(*out, wl); } @@ -865,7 +863,7 @@ private: invariant(_queue.size()); auto t = std::move(_queue.front()); - _queue.pop(); + _queue.pop_front(); _current -= _invokeCostFunc(t, wl); @@ -907,7 +905,7 @@ private: // Current size of the queue size_t _current = 0; - std::queue _queue; + std::deque _queue; // State for waiting consumers and producers Consumers _consumers; diff --git a/src/mongo/util/producer_consumer_queue_test.cpp b/src/mongo/util/producer_consumer_queue_test.cpp index f49f8b80667..a1fa880c5d6 100644 --- a/src/mongo/util/producer_consumer_queue_test.cpp +++ b/src/mongo/util/producer_consumer_queue_test.cpp @@ -295,20 +295,16 @@ PRODUCER_CONSUMER_QUEUE_TEST(popsWithTimeout, runTimeoutPermutations pcq{}; helper - .runThread("Consumer", - [&](OperationContext* opCtx) { - ASSERT_THROWS_CODE( - pcq.pop(opCtx), DBException, ErrorCodes::ExceededTimeLimit); + .runThread( + "Consumer", + [&](OperationContext* opCtx) { + ASSERT_THROWS_CODE(pcq.pop(opCtx), DBException, ErrorCodes::ExceededTimeLimit); - std::vector vec; - ASSERT_THROWS_CODE(pcq.popMany(std::back_inserter(vec), opCtx), - DBException, - ErrorCodes::ExceededTimeLimit); + ASSERT_THROWS_CODE(pcq.popMany(opCtx), DBException, ErrorCodes::ExceededTimeLimit); - ASSERT_THROWS_CODE(pcq.popManyUpTo(1000, std::back_inserter(vec), opCtx), - DBException, - ErrorCodes::ExceededTimeLimit); - }) + ASSERT_THROWS_CODE( + pcq.popManyUpTo(1000, opCtx), DBException, ErrorCodes::ExceededTimeLimit); + }) .join(); ASSERT_EQUALS(pcq.getStats().queueDepth, 0ul); @@ -508,9 +504,8 @@ PRODUCER_CONSUMER_QUEUE_TEST(popManyPopWithBlocking, runPermutations out; - - pcq.popMany(std::back_inserter(out), opCtx); + std::deque out; + std::tie(out, std::ignore) = pcq.popMany(opCtx); ASSERT_EQUALS(out.size(), 2ul); ASSERT_EQUALS(out[0], MoveOnly(i)); @@ -543,10 +538,10 @@ PRODUCER_CONSUMER_QUEUE_TEST(popManyUpToPopWithBlocking, runPermutations out; - + std::deque out; size_t spent; - std::tie(spent, std::ignore) = pcq.popManyUpTo(2, std::back_inserter(out), opCtx); + + std::tie(out, spent) = pcq.popManyUpTo(2, opCtx); ASSERT_EQUALS(spent, 2ul); ASSERT_EQUALS(out.size(), 2ul); @@ -578,32 +573,41 @@ PRODUCER_CONSUMER_QUEUE_TEST(popManyUpToPopWithBlockingWithSpecialCost, auto consumer = helper.runThread("Consumer", [&](OperationContext* opCtx) { { - std::vector out; + std::deque out; size_t spent; - std::tie(spent, std::ignore) = pcq.popManyUpTo(5, std::back_inserter(out), opCtx); + std::tie(out, spent) = pcq.popManyUpTo(5, opCtx); - ASSERT_EQUALS(spent, 6ul); - ASSERT_EQUALS(out.size(), 3ul); + ASSERT_EQUALS(spent, 3ul); + ASSERT_EQUALS(out.size(), 2ul); ASSERT_EQUALS(out[0], MoveOnly(1)); ASSERT_EQUALS(out[1], MoveOnly(2)); - ASSERT_EQUALS(out[2], MoveOnly(3)); } { - std::vector out; + std::deque out; size_t spent; - std::tie(spent, std::ignore) = pcq.popManyUpTo(15, std::back_inserter(out), opCtx); + std::tie(out, spent) = pcq.popManyUpTo(15, opCtx); - ASSERT_EQUALS(spent, 9ul); - ASSERT_EQUALS(out.size(), 2ul); - ASSERT_EQUALS(out[0], MoveOnly(4)); - ASSERT_EQUALS(out[1], MoveOnly(5)); + ASSERT_EQUALS(spent, 12ul); + ASSERT_EQUALS(out.size(), 3ul); + ASSERT_EQUALS(out[0], MoveOnly(3)); + ASSERT_EQUALS(out[1], MoveOnly(4)); + ASSERT_EQUALS(out[2], MoveOnly(5)); + } + + { + std::deque out; + size_t spent; + std::tie(out, spent) = pcq.popManyUpTo(5, opCtx); + + ASSERT_EQUALS(spent, 0ul); + ASSERT_EQUALS(out.size(), 0ul); } }); auto producer = helper.runThread("Producer", [&](OperationContext* opCtx) { std::vector vec; - for (int i = 1; i < 6; ++i) { + for (int i = 1; i <= 6; ++i) { vec.emplace_back(MoveOnly(i)); } @@ -613,7 +617,7 @@ PRODUCER_CONSUMER_QUEUE_TEST(popManyUpToPopWithBlockingWithSpecialCost, consumer.join(); producer.join(); - ASSERT_EQUALS(pcq.getStats().queueDepth, 0ul); + ASSERT_EQUALS(pcq.getStats().queueDepth, 6ul); } PRODUCER_CONSUMER_QUEUE_TEST(singleProducerMultiConsumer, runPermutations) { @@ -818,9 +822,8 @@ PRODUCER_CONSUMER_QUEUE_TEST(pipeCompiles, runPermutations) { ASSERT(producer.tryPush(MoveOnly(1))); ASSERT_EQUALS(consumer.pop(), MoveOnly(1)); - std::vector out; - ASSERT_EQUALS(consumer.popManyUpTo(1ul, std::back_inserter(out)).first, 1ul); - ASSERT_EQUALS(consumer.popMany(std::back_inserter(out)).first, 1ul); + ASSERT_EQUALS(consumer.popManyUpTo(1ul).second, 1ul); + ASSERT_EQUALS(consumer.popMany().second, 1ul); ASSERT_FALSE(consumer.tryPop()); producer.close(); -- cgit v1.2.1