summaryrefslogtreecommitdiff
path: root/src/mongo/util/producer_consumer_queue_test.cpp
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2018-02-21 12:13:58 -0500
committerJason Carey <jcarey@argv.me>2018-03-08 10:56:30 -0500
commit6cfa204de9fe5a5c0f93c0ba2e0fc8f19d307b78 (patch)
tree6719bfcb24ccdd1a63629b2e3f71c091451009fa /src/mongo/util/producer_consumer_queue_test.cpp
parent707329965e7300f409694698b64ec42dd0d85e46 (diff)
downloadmongo-6cfa204de9fe5a5c0f93c0ba2e0fc8f19d307b78.tar.gz
SERVER-33572 Add ProducerConsumerQueue to util
Add a bounded, interruptible, thread safe, single producer, multi-consumer queue to the utility directory.
Diffstat (limited to 'src/mongo/util/producer_consumer_queue_test.cpp')
-rw-r--r--src/mongo/util/producer_consumer_queue_test.cpp706
1 files changed, 706 insertions, 0 deletions
diff --git a/src/mongo/util/producer_consumer_queue_test.cpp b/src/mongo/util/producer_consumer_queue_test.cpp
new file mode 100644
index 00000000000..22577e791df
--- /dev/null
+++ b/src/mongo/util/producer_consumer_queue_test.cpp
@@ -0,0 +1,706 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/unittest/unittest.h"
+
+#include "mongo/util/producer_consumer_queue.h"
+
+#include "mongo/db/service_context_noop.h"
+#include "mongo/stdx/condition_variable.h"
+#include "mongo/stdx/mutex.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/util/assert_util.h"
+
+namespace mongo {
+
+namespace {
+
+template <typename... Args>
+class ProducerConsumerQueueTestHelper;
+
+template <>
+class ProducerConsumerQueueTestHelper<OperationContext> {
+public:
+ ProducerConsumerQueueTestHelper(ServiceContext* serviceCtx) : _serviceCtx(serviceCtx) {}
+
+ template <typename Callback>
+ stdx::thread runThread(StringData name, Callback&& cb) {
+ return stdx::thread([this, name, cb] {
+ auto client = _serviceCtx->makeClient(name.toString());
+ auto opCtx = client->makeOperationContext();
+
+ cb(opCtx.get());
+ });
+ }
+
+private:
+ ServiceContext* _serviceCtx;
+};
+
+template <typename Timeout>
+class ProducerConsumerQueueTestHelper<OperationContext, Timeout> {
+public:
+ ProducerConsumerQueueTestHelper(ServiceContext* serviceCtx, Timeout timeout)
+ : _serviceCtx(serviceCtx), _timeout(timeout) {}
+
+ template <typename Callback>
+ stdx::thread runThread(StringData name, Callback&& cb) {
+ return stdx::thread([this, name, cb] {
+ auto client = _serviceCtx->makeClient(name.toString());
+ auto opCtx = client->makeOperationContext();
+
+ cb(opCtx.get(), _timeout);
+ });
+ }
+
+private:
+ ServiceContext* _serviceCtx;
+ Timeout _timeout;
+};
+
+template <>
+class ProducerConsumerQueueTestHelper<> {
+public:
+ ProducerConsumerQueueTestHelper() = default;
+
+ template <typename Callback>
+ stdx::thread runThread(StringData name, Callback&& cb) {
+ return stdx::thread([this, name, cb] { cb(); });
+ }
+};
+
+template <typename Timeout>
+class ProducerConsumerQueueTestHelper<Timeout> {
+public:
+ ProducerConsumerQueueTestHelper(Timeout timeout) : _timeout(timeout) {}
+
+ template <typename Callback>
+ stdx::thread runThread(StringData name, Callback&& cb) {
+ return stdx::thread([this, name, cb] { cb(_timeout); });
+ }
+
+private:
+ Timeout _timeout;
+};
+
+class ProducerConsumerQueueTest : public unittest::Test {
+public:
+ ProducerConsumerQueueTest() : _serviceCtx(stdx::make_unique<ServiceContextNoop>()) {}
+
+ template <typename Callback>
+ stdx::thread runThread(StringData name, Callback&& cb) {
+ return stdx::thread([this, name, cb] {
+ auto client = _serviceCtx->makeClient(name.toString());
+ auto opCtx = client->makeOperationContext();
+
+ cb(opCtx.get());
+ });
+ }
+
+ template <typename Callback>
+ void runPermutations(Callback&& callback) {
+ const Minutes duration(30);
+
+ callback(ProducerConsumerQueueTestHelper<OperationContext>(_serviceCtx.get()));
+ callback(ProducerConsumerQueueTestHelper<OperationContext, Milliseconds>(_serviceCtx.get(),
+ duration));
+ callback(ProducerConsumerQueueTestHelper<OperationContext, Date_t>(
+ _serviceCtx.get(), _serviceCtx->getPreciseClockSource()->now() + duration));
+ callback(ProducerConsumerQueueTestHelper<>());
+ callback(ProducerConsumerQueueTestHelper<Milliseconds>(duration));
+ callback(ProducerConsumerQueueTestHelper<Date_t>(
+ _serviceCtx->getPreciseClockSource()->now() + duration));
+ }
+
+ template <typename Callback>
+ void runTimeoutPermutations(Callback&& callback) {
+ const Milliseconds duration(10);
+
+ callback(ProducerConsumerQueueTestHelper<OperationContext, Milliseconds>(_serviceCtx.get(),
+ duration));
+ callback(ProducerConsumerQueueTestHelper<OperationContext, Date_t>(
+ _serviceCtx.get(), _serviceCtx->getPreciseClockSource()->now() + duration));
+ callback(ProducerConsumerQueueTestHelper<Milliseconds>(duration));
+ callback(ProducerConsumerQueueTestHelper<Date_t>(
+ _serviceCtx->getPreciseClockSource()->now() + duration));
+ }
+
+private:
+ std::unique_ptr<ServiceContext> _serviceCtx;
+};
+
+class MoveOnly {
+public:
+ struct CostFunc {
+ CostFunc() = default;
+ explicit CostFunc(size_t val) : val(val) {}
+
+ size_t operator()(const MoveOnly& mo) const {
+ return val + *mo._val;
+ }
+
+ const size_t val = 0;
+ };
+
+ explicit MoveOnly(int i) : _val(i) {}
+
+ MoveOnly(const MoveOnly&) = delete;
+ MoveOnly& operator=(const MoveOnly&) = delete;
+
+ MoveOnly(MoveOnly&& other) : _val(other._val) {
+ other._val.reset();
+ }
+
+ MoveOnly& operator=(MoveOnly&& other) {
+ if (&other == this) {
+ return *this;
+ }
+
+ _val = other._val;
+ other._val.reset();
+
+ return *this;
+ }
+
+ bool movedFrom() const {
+ return !_val;
+ }
+
+ friend bool operator==(const MoveOnly& lhs, const MoveOnly& rhs) {
+ return *lhs._val == *rhs._val;
+ }
+
+ friend bool operator!=(const MoveOnly& lhs, const MoveOnly& rhs) {
+ return !(lhs == rhs);
+ }
+
+ friend std::ostream& operator<<(std::ostream& os, const MoveOnly& mo) {
+ return (os << "MoveOnly(" << *mo._val << ")");
+ }
+
+private:
+ boost::optional<int> _val;
+};
+
+TEST_F(ProducerConsumerQueueTest, basicPushPop) {
+ runPermutations([](auto helper) {
+ ProducerConsumerQueue<MoveOnly> pcq{};
+
+ helper
+ .runThread(
+ "Producer",
+ [&](auto... interruptionArgs) { pcq.push(MoveOnly(1), interruptionArgs...); })
+ .join();
+
+ ASSERT_EQUALS(pcq.sizeForTest(), 1ul);
+
+ helper
+ .runThread("Consumer",
+ [&](auto... interruptionArgs) {
+ ASSERT_EQUALS(pcq.pop(interruptionArgs...), MoveOnly(1));
+ })
+ .join();
+
+ ASSERT_TRUE(pcq.emptyForTest());
+ });
+}
+
+TEST_F(ProducerConsumerQueueTest, closeConsumerEnd) {
+ runPermutations([](auto helper) {
+ ProducerConsumerQueue<MoveOnly> pcq{1};
+
+ pcq.push(MoveOnly(1));
+
+ auto producer = helper.runThread("Producer", [&](auto... interruptionArgs) {
+ ASSERT_THROWS_CODE(pcq.push(MoveOnly(2), interruptionArgs...),
+ DBException,
+ ErrorCodes::ProducerConsumerQueueEndClosed);
+ });
+
+ ASSERT_EQUALS(pcq.sizeForTest(), 1ul);
+
+ pcq.closeConsumerEnd();
+
+ ASSERT_THROWS_CODE(pcq.pop(), DBException, ErrorCodes::ProducerConsumerQueueEndClosed);
+
+ producer.join();
+ });
+}
+
+TEST_F(ProducerConsumerQueueTest, closeProducerEndImmediate) {
+ runPermutations([](auto helper) {
+ ProducerConsumerQueue<MoveOnly> pcq{};
+
+ pcq.push(MoveOnly(1));
+ pcq.closeProducerEnd();
+
+ helper
+ .runThread("Consumer",
+ [&](auto... interruptionArgs) {
+ ASSERT_EQUALS(pcq.pop(interruptionArgs...), MoveOnly(1));
+
+ ASSERT_THROWS_CODE(pcq.pop(interruptionArgs...),
+ DBException,
+ ErrorCodes::ProducerConsumerQueueEndClosed);
+ })
+ .join();
+
+ });
+}
+
+TEST_F(ProducerConsumerQueueTest, closeProducerEndBlocking) {
+ runPermutations([](auto helper) {
+ ProducerConsumerQueue<MoveOnly> pcq{};
+
+ auto consumer = helper.runThread("Consumer", [&](auto... interruptionArgs) {
+ ASSERT_THROWS_CODE(pcq.pop(interruptionArgs...),
+ DBException,
+ ErrorCodes::ProducerConsumerQueueEndClosed);
+ });
+
+ pcq.closeProducerEnd();
+
+ consumer.join();
+ });
+}
+
+TEST_F(ProducerConsumerQueueTest, popsWithTimeout) {
+ runTimeoutPermutations([](auto helper) {
+ ProducerConsumerQueue<MoveOnly> pcq{};
+
+ helper
+ .runThread(
+ "Consumer",
+ [&](auto... interruptionArgs) {
+ ASSERT_THROWS_CODE(
+ pcq.pop(interruptionArgs...), DBException, ErrorCodes::ExceededTimeLimit);
+
+ std::vector<MoveOnly> vec;
+ ASSERT_THROWS_CODE(pcq.popMany(std::back_inserter(vec), interruptionArgs...),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
+
+ ASSERT_THROWS_CODE(
+ pcq.popManyUpTo(1000, std::back_inserter(vec), interruptionArgs...),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
+ })
+ .join();
+
+ ASSERT_EQUALS(pcq.sizeForTest(), 0ul);
+ });
+}
+
+TEST_F(ProducerConsumerQueueTest, pushesWithTimeout) {
+ runTimeoutPermutations([](auto helper) {
+ ProducerConsumerQueue<MoveOnly> pcq{1};
+
+ {
+ MoveOnly mo(1);
+ pcq.push(std::move(mo));
+ ASSERT(mo.movedFrom());
+ }
+
+ helper
+ .runThread("Consumer",
+ [&](auto... interruptionArgs) {
+ {
+ MoveOnly mo(2);
+ ASSERT_THROWS_CODE(pcq.push(std::move(mo), interruptionArgs...),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
+ ASSERT_EQUALS(pcq.sizeForTest(), 1ul);
+ ASSERT(!mo.movedFrom());
+ ASSERT_EQUALS(mo, MoveOnly(2));
+ }
+
+ {
+ std::vector<MoveOnly> vec;
+ vec.emplace_back(MoveOnly(2));
+
+ auto iter = begin(vec);
+ ASSERT_THROWS_CODE(pcq.pushMany(iter, end(vec), interruptionArgs...),
+ DBException,
+ ErrorCodes::ExceededTimeLimit);
+ ASSERT_EQUALS(pcq.sizeForTest(), 1ul);
+ ASSERT(!vec[0].movedFrom());
+ ASSERT_EQUALS(vec[0], MoveOnly(2));
+ }
+ })
+ .join();
+
+ ASSERT_EQUALS(pcq.sizeForTest(), 1ul);
+ });
+}
+
+TEST_F(ProducerConsumerQueueTest, basicPushPopWithBlocking) {
+ runPermutations([](auto helper) {
+ ProducerConsumerQueue<MoveOnly> pcq{};
+
+ auto consumer = helper.runThread("Consumer", [&](auto... interruptionArgs) {
+ ASSERT_EQUALS(pcq.pop(interruptionArgs...), MoveOnly(1));
+ });
+
+ auto producer = helper.runThread("Producer", [&](auto... interruptionArgs) {
+ pcq.push(MoveOnly(1), interruptionArgs...);
+ });
+
+ consumer.join();
+ producer.join();
+
+ ASSERT_TRUE(pcq.emptyForTest());
+ });
+}
+
+TEST_F(ProducerConsumerQueueTest, multipleStepPushPopWithBlocking) {
+ runPermutations([](auto helper) {
+ ProducerConsumerQueue<MoveOnly> pcq{1};
+
+ auto consumer = helper.runThread("Consumer", [&](auto... interruptionArgs) {
+ for (int i = 0; i < 10; ++i) {
+ ASSERT_EQUALS(pcq.pop(interruptionArgs...), MoveOnly(i));
+ }
+ });
+
+ auto producer = helper.runThread("Producer", [&](auto... interruptionArgs) {
+ for (int i = 0; i < 10; ++i) {
+ pcq.push(MoveOnly(i), interruptionArgs...);
+ }
+ });
+
+ consumer.join();
+ producer.join();
+
+ ASSERT_TRUE(pcq.emptyForTest());
+ });
+}
+
+
+TEST_F(ProducerConsumerQueueTest, pushTooLarge) {
+ runPermutations([](auto helper) {
+ {
+ ProducerConsumerQueue<MoveOnly, MoveOnly::CostFunc> pcq{1};
+
+ helper
+ .runThread("Producer",
+ [&](auto... interruptionArgs) {
+ ASSERT_THROWS_CODE(pcq.push(MoveOnly(2), interruptionArgs...),
+ DBException,
+ ErrorCodes::ProducerConsumerQueueBatchTooLarge);
+ })
+ .join();
+ }
+
+ {
+ ProducerConsumerQueue<MoveOnly, MoveOnly::CostFunc> pcq{4};
+
+ std::vector<MoveOnly> vec;
+ vec.push_back(MoveOnly(3));
+ vec.push_back(MoveOnly(3));
+
+ helper
+ .runThread("Producer",
+ [&](auto... interruptionArgs) {
+ ASSERT_THROWS_CODE(
+ pcq.pushMany(begin(vec), end(vec), interruptionArgs...),
+ DBException,
+ ErrorCodes::ProducerConsumerQueueBatchTooLarge);
+ })
+ .join();
+ }
+ });
+}
+
+TEST_F(ProducerConsumerQueueTest, pushManyPopWithoutBlocking) {
+ runPermutations([](auto helper) {
+ ProducerConsumerQueue<MoveOnly> pcq{};
+
+ helper
+ .runThread("Producer",
+ [&](auto... interruptionArgs) {
+ std::vector<MoveOnly> vec;
+ for (int i = 0; i < 10; ++i) {
+ vec.emplace_back(MoveOnly(i));
+ }
+
+ pcq.pushMany(begin(vec), end(vec), interruptionArgs...);
+ })
+ .join();
+
+ helper
+ .runThread("Consumer",
+ [&](auto... interruptionArgs) {
+ for (int i = 0; i < 10; ++i) {
+ ASSERT_EQUALS(pcq.pop(interruptionArgs...), MoveOnly(i));
+ }
+ })
+ .join();
+
+ ASSERT_TRUE(pcq.emptyForTest());
+ });
+}
+
+TEST_F(ProducerConsumerQueueTest, popManyPopWithBlocking) {
+ runPermutations([](auto helper) {
+ ProducerConsumerQueue<MoveOnly> pcq{2};
+
+ auto consumer = helper.runThread("Consumer", [&](auto... interruptionArgs) {
+ for (int i = 0; i < 10; i = i + 2) {
+ std::vector<MoveOnly> out;
+
+ pcq.popMany(std::back_inserter(out), interruptionArgs...);
+
+ ASSERT_EQUALS(out.size(), 2ul);
+ ASSERT_EQUALS(out[0], MoveOnly(i));
+ ASSERT_EQUALS(out[1], MoveOnly(i + 1));
+ }
+ });
+
+ auto producer = helper.runThread("Producer", [&](auto... interruptionArgs) {
+ std::vector<MoveOnly> vec;
+ for (int i = 0; i < 10; ++i) {
+ vec.emplace_back(MoveOnly(i));
+ }
+
+ for (auto iter = begin(vec); iter != end(vec); iter += 2) {
+ pcq.pushMany(iter, iter + 2);
+ }
+ });
+
+ consumer.join();
+ producer.join();
+
+ ASSERT_TRUE(pcq.emptyForTest());
+ });
+}
+
+TEST_F(ProducerConsumerQueueTest, popManyUpToPopWithBlocking) {
+ runPermutations([](auto helper) {
+ ProducerConsumerQueue<MoveOnly> pcq{4};
+
+ auto consumer = helper.runThread("Consumer", [&](auto... interruptionArgs) {
+ for (int i = 0; i < 10; i = i + 2) {
+ std::vector<MoveOnly> out;
+
+ size_t spent;
+ std::tie(spent, std::ignore) =
+ pcq.popManyUpTo(2, std::back_inserter(out), interruptionArgs...);
+
+ ASSERT_EQUALS(spent, 2ul);
+ ASSERT_EQUALS(out.size(), 2ul);
+ ASSERT_EQUALS(out[0], MoveOnly(i));
+ ASSERT_EQUALS(out[1], MoveOnly(i + 1));
+ }
+ });
+
+ auto producer = helper.runThread("Producer", [&](auto... interruptionArgs) {
+ std::vector<MoveOnly> vec;
+ for (int i = 0; i < 10; ++i) {
+ vec.emplace_back(MoveOnly(i));
+ }
+
+ for (auto iter = begin(vec); iter != end(vec); iter += 2) {
+ pcq.pushMany(iter, iter + 2);
+ }
+ });
+
+ consumer.join();
+ producer.join();
+
+ ASSERT_TRUE(pcq.emptyForTest());
+ });
+}
+
+TEST_F(ProducerConsumerQueueTest, popManyUpToPopWithBlockingWithSpecialCost) {
+ runPermutations([](auto helper) {
+ ProducerConsumerQueue<MoveOnly, MoveOnly::CostFunc> pcq{};
+
+ auto consumer = helper.runThread("Consumer", [&](auto... interruptionArgs) {
+ {
+ std::vector<MoveOnly> out;
+ size_t spent;
+ std::tie(spent, std::ignore) =
+ pcq.popManyUpTo(5, std::back_inserter(out), interruptionArgs...);
+
+ ASSERT_EQUALS(spent, 6ul);
+ ASSERT_EQUALS(out.size(), 3ul);
+ ASSERT_EQUALS(out[0], MoveOnly(1));
+ ASSERT_EQUALS(out[1], MoveOnly(2));
+ ASSERT_EQUALS(out[2], MoveOnly(3));
+ }
+
+ {
+ std::vector<MoveOnly> out;
+ size_t spent;
+ std::tie(spent, std::ignore) =
+ pcq.popManyUpTo(15, std::back_inserter(out), interruptionArgs...);
+
+ ASSERT_EQUALS(spent, 9ul);
+ ASSERT_EQUALS(out.size(), 2ul);
+ ASSERT_EQUALS(out[0], MoveOnly(4));
+ ASSERT_EQUALS(out[1], MoveOnly(5));
+ }
+ });
+
+ auto producer = helper.runThread("Producer", [&](auto... interruptionArgs) {
+ std::vector<MoveOnly> vec;
+ for (int i = 1; i < 6; ++i) {
+ vec.emplace_back(MoveOnly(i));
+ }
+
+ pcq.pushMany(begin(vec), end(vec), interruptionArgs...);
+ });
+
+ consumer.join();
+ producer.join();
+
+ ASSERT_TRUE(pcq.emptyForTest());
+ });
+}
+
+TEST_F(ProducerConsumerQueueTest, singleProducerMultiConsumer) {
+ runPermutations([](auto helper) {
+ ProducerConsumerQueue<MoveOnly> pcq{};
+
+ stdx::mutex mutex;
+ size_t success = 0;
+ size_t failure = 0;
+
+ std::array<stdx::thread, 3> threads;
+ for (auto& thread : threads) {
+ thread = helper.runThread("Consumer", [&](auto... interruptionArgs) {
+ {
+ try {
+ pcq.pop(interruptionArgs...);
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ success++;
+ } catch (const ExceptionFor<ErrorCodes::ProducerConsumerQueueEndClosed>&) {
+ stdx::lock_guard<stdx::mutex> lk(mutex);
+ failure++;
+ }
+ }
+ });
+ }
+
+ pcq.push(MoveOnly(1));
+ pcq.push(MoveOnly(2));
+
+ pcq.closeProducerEnd();
+
+ for (auto& thread : threads) {
+ thread.join();
+ }
+
+ ASSERT_EQUALS(success, 2ul);
+ ASSERT_EQUALS(failure, 1ul);
+
+ ASSERT_TRUE(pcq.emptyForTest());
+ });
+}
+
+TEST_F(ProducerConsumerQueueTest, basicTryPop) {
+ ProducerConsumerQueue<MoveOnly> pcq{};
+
+ ASSERT_FALSE(pcq.tryPop());
+ ASSERT_TRUE(pcq.tryPush(MoveOnly(1)));
+ ASSERT_EQUALS(pcq.sizeForTest(), 1ul);
+
+ auto val = pcq.tryPop();
+
+ ASSERT_FALSE(pcq.tryPop());
+ ASSERT_TRUE(val);
+ ASSERT_EQUALS(*val, MoveOnly(1));
+
+ ASSERT_TRUE(pcq.emptyForTest());
+}
+
+TEST_F(ProducerConsumerQueueTest, basicTryPush) {
+ ProducerConsumerQueue<MoveOnly> pcq{1};
+
+ ASSERT_TRUE(pcq.tryPush(MoveOnly(1)));
+ ASSERT_FALSE(pcq.tryPush(MoveOnly(2)));
+
+ ASSERT_EQUALS(pcq.sizeForTest(), 1ul);
+
+ auto val = pcq.tryPop();
+ ASSERT_FALSE(pcq.tryPop());
+ ASSERT_TRUE(val);
+ ASSERT_EQUALS(*val, MoveOnly(1));
+
+ ASSERT_TRUE(pcq.emptyForTest());
+}
+
+TEST_F(ProducerConsumerQueueTest, tryPushWithSpecialCost) {
+ ProducerConsumerQueue<MoveOnly, MoveOnly::CostFunc> pcq{5};
+
+ ASSERT_TRUE(pcq.tryPush(MoveOnly(1)));
+ ASSERT_TRUE(pcq.tryPush(MoveOnly(2)));
+ ASSERT_FALSE(pcq.tryPush(MoveOnly(3)));
+
+ ASSERT_EQUALS(pcq.sizeForTest(), 3ul);
+
+ auto val1 = pcq.tryPop();
+ ASSERT_EQUALS(pcq.sizeForTest(), 2ul);
+ auto val2 = pcq.tryPop();
+ ASSERT_EQUALS(pcq.sizeForTest(), 0ul);
+ ASSERT_FALSE(pcq.tryPop());
+ ASSERT_TRUE(val1);
+ ASSERT_TRUE(val2);
+ ASSERT_EQUALS(*val1, MoveOnly(1));
+ ASSERT_EQUALS(*val2, MoveOnly(2));
+
+ ASSERT_TRUE(pcq.emptyForTest());
+}
+
+TEST_F(ProducerConsumerQueueTest, tryPushWithSpecialStatefulCost) {
+ ProducerConsumerQueue<MoveOnly, MoveOnly::CostFunc> pcq{5, MoveOnly::CostFunc(1)};
+
+ ASSERT_TRUE(pcq.tryPush(MoveOnly(1)));
+ ASSERT_TRUE(pcq.tryPush(MoveOnly(2)));
+ ASSERT_FALSE(pcq.tryPush(MoveOnly(3)));
+
+ ASSERT_EQUALS(pcq.sizeForTest(), 5ul);
+
+ auto val1 = pcq.tryPop();
+ ASSERT_EQUALS(pcq.sizeForTest(), 3ul);
+ auto val2 = pcq.tryPop();
+ ASSERT_EQUALS(pcq.sizeForTest(), 0ul);
+ ASSERT_FALSE(pcq.tryPop());
+ ASSERT_TRUE(val1);
+ ASSERT_TRUE(val2);
+ ASSERT_EQUALS(*val1, MoveOnly(1));
+ ASSERT_EQUALS(*val2, MoveOnly(2));
+
+ ASSERT_TRUE(pcq.emptyForTest());
+}
+
+} // namespace
+
+} // namespace mongo