/** * Copyright (C) 2018 MongoDB Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects * for all of the code used other than as permitted herein. If you modify * file(s) with this exception, you may extend this exception to your * version of the file(s), but you are not obligated to do so. If you do not * wish to do so, delete this exception statement from your version. If you * delete this exception statement from all source files in the program, * then also delete it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/unittest/unittest.h" #include "mongo/util/producer_consumer_queue.h" #include "mongo/db/service_context_noop.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/memory.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/thread.h" #include "mongo/util/assert_util.h" namespace mongo { namespace { class ProducerConsumerQueueTest : public unittest::Test { public: ProducerConsumerQueueTest() : _serviceCtx(stdx::make_unique()) {} template stdx::thread runThread(StringData name, Callback&& cb) { return stdx::thread([this, name, cb] { cb(); }); } private: std::unique_ptr _serviceCtx; }; class MoveOnly { public: struct CostFunc { CostFunc() = default; explicit CostFunc(size_t val) : val(val) {} size_t operator()(const MoveOnly& mo) const { return val + *mo._val; } const size_t val = 0; }; explicit MoveOnly(int i) : _val(i) {} MoveOnly(const MoveOnly&) = delete; MoveOnly& operator=(const MoveOnly&) = delete; MoveOnly(MoveOnly&& other) : _val(other._val) { other._val.reset(); } MoveOnly& operator=(MoveOnly&& other) { if (&other == this) { return *this; } _val = other._val; other._val.reset(); return *this; } bool movedFrom() const { return !_val; } friend bool operator==(const MoveOnly& lhs, const MoveOnly& rhs) { return *lhs._val == *rhs._val; } friend bool operator!=(const MoveOnly& lhs, const MoveOnly& rhs) { return !(lhs == rhs); } friend std::ostream& operator<<(std::ostream& os, const MoveOnly& mo) { return (os << "MoveOnly(" << *mo._val << ")"); } private: boost::optional _val; }; TEST_F(ProducerConsumerQueueTest, basicPushPop) { ProducerConsumerQueue pcq{}; runThread("Producer", [&]() { pcq.push(MoveOnly(1)); }).join(); ASSERT_EQUALS(pcq.sizeForTest(), 1ul); runThread("Consumer", [&]() { ASSERT_EQUALS(pcq.pop(), MoveOnly(1)); }).join(); ASSERT_TRUE(pcq.emptyForTest()); } TEST_F(ProducerConsumerQueueTest, closeConsumerEnd) { ProducerConsumerQueue pcq{1}; pcq.push(MoveOnly(1)); auto producer = runThread("Producer", [&]() { ASSERT_THROWS_CODE( pcq.push(MoveOnly(2)), DBException, ErrorCodes::ProducerConsumerQueueEndClosed); }); ASSERT_EQUALS(pcq.sizeForTest(), 1ul); pcq.closeConsumerEnd(); ASSERT_THROWS_CODE(pcq.pop(), DBException, ErrorCodes::ProducerConsumerQueueEndClosed); producer.join(); } TEST_F(ProducerConsumerQueueTest, closeProducerEndImmediate) { ProducerConsumerQueue pcq{}; pcq.push(MoveOnly(1)); pcq.closeProducerEnd(); runThread("Consumer", [&]() { ASSERT_EQUALS(pcq.pop(), MoveOnly(1)); ASSERT_THROWS_CODE(pcq.pop(), DBException, ErrorCodes::ProducerConsumerQueueEndClosed); }).join(); } TEST_F(ProducerConsumerQueueTest, closeProducerEndBlocking) { ProducerConsumerQueue pcq{}; auto consumer = runThread("Consumer", [&]() { ASSERT_THROWS_CODE(pcq.pop(), DBException, ErrorCodes::ProducerConsumerQueueEndClosed); }); pcq.closeProducerEnd(); consumer.join(); } TEST_F(ProducerConsumerQueueTest, popsWithTimeout) { ProducerConsumerQueue pcq{}; runThread("Consumer", [&]() { ASSERT_THROWS_CODE(pcq.pop(Milliseconds(100)), DBException, ErrorCodes::ExceededTimeLimit); std::vector vec; ASSERT_THROWS_CODE(pcq.popMany(std::back_inserter(vec), Milliseconds(100)), DBException, ErrorCodes::ExceededTimeLimit); ASSERT_THROWS_CODE(pcq.popManyUpTo(1000, std::back_inserter(vec), Milliseconds(100)), DBException, ErrorCodes::ExceededTimeLimit); }).join(); ASSERT_EQUALS(pcq.sizeForTest(), 0ul); } TEST_F(ProducerConsumerQueueTest, pushesWithTimeout) { ProducerConsumerQueue pcq{1}; { MoveOnly mo(1); pcq.push(std::move(mo)); ASSERT(mo.movedFrom()); } runThread("Consumer", [&]() { { MoveOnly mo(2); ASSERT_THROWS_CODE(pcq.push(std::move(mo), Milliseconds(100)), DBException, ErrorCodes::ExceededTimeLimit); ASSERT_EQUALS(pcq.sizeForTest(), 1ul); ASSERT(!mo.movedFrom()); ASSERT_EQUALS(mo, MoveOnly(2)); } { std::vector vec; vec.emplace_back(MoveOnly(2)); auto iter = begin(vec); ASSERT_THROWS_CODE(pcq.pushMany(iter, end(vec), Milliseconds(100)), DBException, ErrorCodes::ExceededTimeLimit); ASSERT_EQUALS(pcq.sizeForTest(), 1ul); ASSERT(!vec[0].movedFrom()); ASSERT_EQUALS(vec[0], MoveOnly(2)); } }).join(); ASSERT_EQUALS(pcq.sizeForTest(), 1ul); } TEST_F(ProducerConsumerQueueTest, basicPushPopWithBlocking) { ProducerConsumerQueue pcq{}; auto consumer = runThread("Consumer", [&]() { ASSERT_EQUALS(pcq.pop(), MoveOnly(1)); }); auto producer = runThread("Producer", [&]() { pcq.push(MoveOnly(1)); }); consumer.join(); producer.join(); ASSERT_TRUE(pcq.emptyForTest()); } TEST_F(ProducerConsumerQueueTest, multipleStepPushPopWithBlocking) { ProducerConsumerQueue pcq{1}; auto consumer = runThread("Consumer", [&]() { for (int i = 0; i < 10; ++i) { ASSERT_EQUALS(pcq.pop(), MoveOnly(i)); } }); auto producer = runThread("Producer", [&]() { for (int i = 0; i < 10; ++i) { pcq.push(MoveOnly(i)); } }); consumer.join(); producer.join(); ASSERT_TRUE(pcq.emptyForTest()); } TEST_F(ProducerConsumerQueueTest, pushTooLarge) { { ProducerConsumerQueue pcq{1}; runThread("Producer", [&]() { ASSERT_THROWS_CODE( pcq.push(MoveOnly(2)), DBException, ErrorCodes::ProducerConsumerQueueBatchTooLarge); }).join(); } { ProducerConsumerQueue pcq{4}; std::vector vec; vec.push_back(MoveOnly(3)); vec.push_back(MoveOnly(3)); runThread("Producer", [&]() { ASSERT_THROWS_CODE(pcq.pushMany(begin(vec), end(vec)), DBException, ErrorCodes::ProducerConsumerQueueBatchTooLarge); }).join(); } } TEST_F(ProducerConsumerQueueTest, pushManyPopWithoutBlocking) { ProducerConsumerQueue pcq{}; runThread("Producer", [&]() { std::vector vec; for (int i = 0; i < 10; ++i) { vec.emplace_back(MoveOnly(i)); } pcq.pushMany(begin(vec), end(vec)); }).join(); runThread("Consumer", [&]() { for (int i = 0; i < 10; ++i) { ASSERT_EQUALS(pcq.pop(), MoveOnly(i)); } }).join(); ASSERT_TRUE(pcq.emptyForTest()); } TEST_F(ProducerConsumerQueueTest, popManyPopWithBlocking) { ProducerConsumerQueue pcq{2}; auto consumer = runThread("Consumer", [&]() { for (int i = 0; i < 10; i = i + 2) { std::vector out; pcq.popMany(std::back_inserter(out)); ASSERT_EQUALS(out.size(), 2ul); ASSERT_EQUALS(out[0], MoveOnly(i)); ASSERT_EQUALS(out[1], MoveOnly(i + 1)); } }); auto producer = runThread("Producer", [&]() { std::vector vec; for (int i = 0; i < 10; ++i) { vec.emplace_back(MoveOnly(i)); } for (auto iter = begin(vec); iter != end(vec); iter += 2) { pcq.pushMany(iter, iter + 2); } }); consumer.join(); producer.join(); ASSERT_TRUE(pcq.emptyForTest()); } TEST_F(ProducerConsumerQueueTest, popManyUpToPopWithBlocking) { ProducerConsumerQueue pcq{4}; auto consumer = runThread("Consumer", [&]() { for (int i = 0; i < 10; i = i + 2) { std::vector out; size_t spent; std::tie(spent, std::ignore) = pcq.popManyUpTo(2, std::back_inserter(out)); ASSERT_EQUALS(spent, 2ul); ASSERT_EQUALS(out.size(), 2ul); ASSERT_EQUALS(out[0], MoveOnly(i)); ASSERT_EQUALS(out[1], MoveOnly(i + 1)); } }); auto producer = runThread("Producer", [&]() { std::vector vec; for (int i = 0; i < 10; ++i) { vec.emplace_back(MoveOnly(i)); } for (auto iter = begin(vec); iter != end(vec); iter += 2) { pcq.pushMany(iter, iter + 2); } }); consumer.join(); producer.join(); ASSERT_TRUE(pcq.emptyForTest()); } TEST_F(ProducerConsumerQueueTest, popManyUpToPopWithBlockingWithSpecialCost) { ProducerConsumerQueue pcq{}; auto consumer = runThread("Consumer", [&]() { { std::vector out; size_t spent; std::tie(spent, std::ignore) = pcq.popManyUpTo(5, std::back_inserter(out)); ASSERT_EQUALS(spent, 6ul); ASSERT_EQUALS(out.size(), 3ul); ASSERT_EQUALS(out[0], MoveOnly(1)); ASSERT_EQUALS(out[1], MoveOnly(2)); ASSERT_EQUALS(out[2], MoveOnly(3)); } { std::vector out; size_t spent; std::tie(spent, std::ignore) = pcq.popManyUpTo(15, std::back_inserter(out)); ASSERT_EQUALS(spent, 9ul); ASSERT_EQUALS(out.size(), 2ul); ASSERT_EQUALS(out[0], MoveOnly(4)); ASSERT_EQUALS(out[1], MoveOnly(5)); } }); auto producer = runThread("Producer", [&]() { std::vector vec; for (int i = 1; i < 6; ++i) { vec.emplace_back(MoveOnly(i)); } pcq.pushMany(begin(vec), end(vec)); }); consumer.join(); producer.join(); ASSERT_TRUE(pcq.emptyForTest()); } TEST_F(ProducerConsumerQueueTest, singleProducerMultiConsumer) { ProducerConsumerQueue pcq{}; stdx::mutex mutex; size_t success = 0; size_t failure = 0; std::array threads; for (auto& thread : threads) { thread = runThread("Consumer", [&]() { { try { pcq.pop(); stdx::lock_guard lk(mutex); success++; } catch (const DBException& exception) { ASSERT_EQUALS(exception.getCode(), ErrorCodes::ProducerConsumerQueueEndClosed); stdx::lock_guard lk(mutex); failure++; } } }); } pcq.push(MoveOnly(1)); pcq.push(MoveOnly(2)); pcq.closeProducerEnd(); for (auto& thread : threads) { thread.join(); } ASSERT_EQUALS(success, 2ul); ASSERT_EQUALS(failure, 1ul); ASSERT_TRUE(pcq.emptyForTest()); } TEST_F(ProducerConsumerQueueTest, basicTryPop) { ProducerConsumerQueue pcq{}; ASSERT_FALSE(pcq.tryPop()); ASSERT_TRUE(pcq.tryPush(MoveOnly(1))); ASSERT_EQUALS(pcq.sizeForTest(), 1ul); auto val = pcq.tryPop(); ASSERT_FALSE(pcq.tryPop()); ASSERT_TRUE(val); ASSERT_EQUALS(*val, MoveOnly(1)); ASSERT_TRUE(pcq.emptyForTest()); } TEST_F(ProducerConsumerQueueTest, basicTryPush) { ProducerConsumerQueue pcq{1}; ASSERT_TRUE(pcq.tryPush(MoveOnly(1))); ASSERT_FALSE(pcq.tryPush(MoveOnly(2))); ASSERT_EQUALS(pcq.sizeForTest(), 1ul); auto val = pcq.tryPop(); ASSERT_FALSE(pcq.tryPop()); ASSERT_TRUE(val); ASSERT_EQUALS(*val, MoveOnly(1)); ASSERT_TRUE(pcq.emptyForTest()); } TEST_F(ProducerConsumerQueueTest, tryPushWithSpecialCost) { ProducerConsumerQueue pcq{5}; ASSERT_TRUE(pcq.tryPush(MoveOnly(1))); ASSERT_TRUE(pcq.tryPush(MoveOnly(2))); ASSERT_FALSE(pcq.tryPush(MoveOnly(3))); ASSERT_EQUALS(pcq.sizeForTest(), 3ul); auto val1 = pcq.tryPop(); ASSERT_EQUALS(pcq.sizeForTest(), 2ul); auto val2 = pcq.tryPop(); ASSERT_EQUALS(pcq.sizeForTest(), 0ul); ASSERT_FALSE(pcq.tryPop()); ASSERT_TRUE(val1); ASSERT_TRUE(val2); ASSERT_EQUALS(*val1, MoveOnly(1)); ASSERT_EQUALS(*val2, MoveOnly(2)); ASSERT_TRUE(pcq.emptyForTest()); } TEST_F(ProducerConsumerQueueTest, tryPushWithSpecialStatefulCost) { ProducerConsumerQueue pcq{5, MoveOnly::CostFunc(1)}; ASSERT_TRUE(pcq.tryPush(MoveOnly(1))); ASSERT_TRUE(pcq.tryPush(MoveOnly(2))); ASSERT_FALSE(pcq.tryPush(MoveOnly(3))); ASSERT_EQUALS(pcq.sizeForTest(), 5ul); auto val1 = pcq.tryPop(); ASSERT_EQUALS(pcq.sizeForTest(), 3ul); auto val2 = pcq.tryPop(); ASSERT_EQUALS(pcq.sizeForTest(), 0ul); ASSERT_FALSE(pcq.tryPop()); ASSERT_TRUE(val1); ASSERT_TRUE(val2); ASSERT_EQUALS(*val1, MoveOnly(1)); ASSERT_EQUALS(*val2, MoveOnly(2)); ASSERT_TRUE(pcq.emptyForTest()); } } // namespace } // namespace mongo