summaryrefslogtreecommitdiff
path: root/src/mongo/db/pipeline/document_source_exchange_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/pipeline/document_source_exchange_test.cpp')
-rw-r--r--src/mongo/db/pipeline/document_source_exchange_test.cpp329
1 files changed, 329 insertions, 0 deletions
diff --git a/src/mongo/db/pipeline/document_source_exchange_test.cpp b/src/mongo/db/pipeline/document_source_exchange_test.cpp
new file mode 100644
index 00000000000..81d25527fa4
--- /dev/null
+++ b/src/mongo/db/pipeline/document_source_exchange_test.cpp
@@ -0,0 +1,329 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/pipeline/aggregation_context_fixture.h"
+#include "mongo/db/pipeline/document_source_exchange.h"
+#include "mongo/db/pipeline/document_source_mock.h"
+#include "mongo/db/storage/key_string.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/thread_pool_task_executor.h"
+#include "mongo/platform/random.h"
+#include "mongo/unittest/temp_dir.h"
+#include "mongo/unittest/unittest.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/time_support.h"
+
+
+namespace mongo {
+
+class DocumentSourceExchangeTest : public AggregationContextFixture {
+protected:
+ std::unique_ptr<executor::TaskExecutor> _executor;
+ virtual void setUp() override {
+ auto net = executor::makeNetworkInterface("ExchangeTest");
+
+ ThreadPool::Options options;
+ auto pool = std::make_unique<ThreadPool>(options);
+
+ _executor =
+ std::make_unique<executor::ThreadPoolTaskExecutor>(std::move(pool), std::move(net));
+ _executor->startup();
+ }
+
+ virtual void tearDown() override {
+ _executor->shutdown();
+ _executor.reset();
+ }
+
+ auto getMockSource(int cnt) {
+ auto source = DocumentSourceMock::create();
+ for (int i = 0; i < cnt; ++i)
+ source->queue.emplace_back(Document{{"a", i}, {"b", "aaaaaaaaaaaaaaaaaaaaaaaaaaa"_sd}});
+
+ return source;
+ }
+
+ static auto getNewSeed() {
+ auto seed = Date_t::now().asInt64();
+ unittest::log() << "Generated new seed is " << seed;
+
+ return seed;
+ }
+
+ auto getRandomMockSource(size_t cnt, int64_t seed) {
+ PseudoRandom prng(seed);
+
+ auto source = DocumentSourceMock::create();
+ for (size_t i = 0; i < cnt; ++i)
+ source->queue.emplace_back(Document{{"a", static_cast<int>(prng.nextInt32() % cnt)},
+ {"b", "aaaaaaaaaaaaaaaaaaaaaaaaaaa"_sd}});
+
+ return source;
+ }
+};
+
+TEST_F(DocumentSourceExchangeTest, SimpleExchange1Consumer) {
+ const size_t nDocs = 500;
+
+ auto source = getMockSource(nDocs);
+
+ ExchangeSpec spec;
+ spec.setPolicy(ExchangePolicyEnum::kRoundRobin);
+ spec.setConsumers(1);
+ spec.setBufferSize(1024);
+
+ boost::intrusive_ptr<Exchange> ex = new Exchange(spec);
+
+ ex->setSource(source.get());
+
+ auto input = ex->getNext(0);
+
+ size_t docs = 0;
+ for (; input.isAdvanced(); input = ex->getNext(0)) {
+ ++docs;
+ }
+
+ ASSERT_EQ(docs, nDocs);
+}
+
+TEST_F(DocumentSourceExchangeTest, SimpleExchangeNConsumer) {
+ const size_t nDocs = 500;
+ auto source = getMockSource(500);
+
+ const size_t nConsumers = 5;
+
+ ASSERT_EQ(nDocs % nConsumers, 0u);
+
+ ExchangeSpec spec;
+ spec.setPolicy(ExchangePolicyEnum::kRoundRobin);
+ spec.setConsumers(nConsumers);
+ spec.setBufferSize(1024);
+
+ boost::intrusive_ptr<Exchange> ex = new Exchange(spec);
+
+ std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
+
+ for (size_t idx = 0; idx < nConsumers; ++idx) {
+ prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx));
+ prods.back()->setSource(source.get());
+ }
+
+ std::vector<executor::TaskExecutor::CallbackHandle> handles;
+
+ for (size_t id = 0; id < nConsumers; ++id) {
+ auto handle = _executor->scheduleWork(
+ [prods, id, nDocs, nConsumers](const executor::TaskExecutor::CallbackArgs& cb) {
+ PseudoRandom prng(getNewSeed());
+
+ auto input = prods[id]->getNext();
+
+ size_t docs = 0;
+
+ for (; input.isAdvanced(); input = prods[id]->getNext()) {
+ sleepmillis(prng.nextInt32() % 20 + 1);
+ ++docs;
+ }
+ ASSERT_EQ(docs, nDocs / nConsumers);
+ });
+
+ handles.emplace_back(std::move(handle.getValue()));
+ }
+
+ for (auto& h : handles)
+ _executor->wait(h);
+}
+
+TEST_F(DocumentSourceExchangeTest, BroadcastExchangeNConsumer) {
+ const size_t nDocs = 500;
+ auto source = getMockSource(nDocs);
+
+ const size_t nConsumers = 5;
+
+ ExchangeSpec spec;
+ spec.setPolicy(ExchangePolicyEnum::kBroadcast);
+ spec.setConsumers(nConsumers);
+ spec.setBufferSize(1024);
+
+ boost::intrusive_ptr<Exchange> ex = new Exchange(spec);
+
+ std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
+
+ for (size_t idx = 0; idx < nConsumers; ++idx) {
+ prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx));
+ prods.back()->setSource(source.get());
+ }
+
+ std::vector<executor::TaskExecutor::CallbackHandle> handles;
+
+ for (size_t id = 0; id < nConsumers; ++id) {
+ auto handle = _executor->scheduleWork(
+ [prods, id, nDocs](const executor::TaskExecutor::CallbackArgs& cb) {
+ size_t docs = 0;
+ for (auto input = prods[id]->getNext(); input.isAdvanced();
+ input = prods[id]->getNext()) {
+ ++docs;
+ }
+ ASSERT_EQ(docs, nDocs);
+ });
+
+ handles.emplace_back(std::move(handle.getValue()));
+ }
+
+ for (auto& h : handles)
+ _executor->wait(h);
+}
+
+TEST_F(DocumentSourceExchangeTest, RangeExchangeNConsumer) {
+ const size_t nDocs = 500;
+ auto source = getMockSource(nDocs);
+
+ std::vector<BSONObj> boundaries;
+ boundaries.push_back(BSON("a" << MINKEY));
+ boundaries.push_back(BSON("a" << 100));
+ boundaries.push_back(BSON("a" << 200));
+ boundaries.push_back(BSON("a" << 300));
+ boundaries.push_back(BSON("a" << 400));
+ boundaries.push_back(BSON("a" << MAXKEY));
+
+ const size_t nConsumers = boundaries.size() - 1;
+
+ ASSERT(nDocs % nConsumers == 0);
+
+ ExchangeSpec spec;
+ spec.setPolicy(ExchangePolicyEnum::kRange);
+ spec.setKey(BSON("a" << 1));
+ spec.setBoundaries(boundaries);
+ spec.setConsumers(nConsumers);
+ spec.setBufferSize(1024);
+
+ boost::intrusive_ptr<Exchange> ex = new Exchange(spec);
+
+ std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
+
+ for (size_t idx = 0; idx < nConsumers; ++idx) {
+ prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx));
+ prods.back()->setSource(source.get());
+ }
+
+ std::vector<executor::TaskExecutor::CallbackHandle> handles;
+
+ for (size_t id = 0; id < nConsumers; ++id) {
+ auto handle = _executor->scheduleWork(
+ [prods, id, nDocs, nConsumers](const executor::TaskExecutor::CallbackArgs& cb) {
+ size_t docs = 0;
+ for (auto input = prods[id]->getNext(); input.isAdvanced();
+ input = prods[id]->getNext()) {
+ size_t value = input.getDocument()["a"].getInt();
+
+ ASSERT(value >= id * 100);
+ ASSERT(value < (id + 1) * 100);
+
+ ++docs;
+ }
+
+ ASSERT_EQ(docs, nDocs / nConsumers);
+ });
+
+ handles.emplace_back(std::move(handle.getValue()));
+ }
+
+ for (auto& h : handles)
+ _executor->wait(h);
+}
+
+TEST_F(DocumentSourceExchangeTest, RangeRandomExchangeNConsumer) {
+ const size_t nDocs = 500;
+ auto source = getRandomMockSource(nDocs, getNewSeed());
+
+ std::vector<BSONObj> boundaries;
+ boundaries.push_back(BSON("a" << MINKEY));
+ boundaries.push_back(BSON("a" << 100));
+ boundaries.push_back(BSON("a" << 200));
+ boundaries.push_back(BSON("a" << 300));
+ boundaries.push_back(BSON("a" << 400));
+ boundaries.push_back(BSON("a" << MAXKEY));
+
+ const size_t nConsumers = boundaries.size() - 1;
+
+ ASSERT(nDocs % nConsumers == 0);
+
+ ExchangeSpec spec;
+ spec.setPolicy(ExchangePolicyEnum::kRange);
+ spec.setKey(BSON("a" << 1));
+ spec.setBoundaries(boundaries);
+ spec.setConsumers(nConsumers);
+ spec.setBufferSize(1024);
+
+ boost::intrusive_ptr<Exchange> ex = new Exchange(spec);
+
+ std::vector<boost::intrusive_ptr<DocumentSourceExchange>> prods;
+
+ for (size_t idx = 0; idx < nConsumers; ++idx) {
+ prods.push_back(new DocumentSourceExchange(getExpCtx(), ex, idx));
+ prods.back()->setSource(source.get());
+ }
+
+ std::vector<executor::TaskExecutor::CallbackHandle> handles;
+
+ AtomicWord<size_t> processedDocs{0};
+
+ for (size_t id = 0; id < nConsumers; ++id) {
+ auto handle = _executor->scheduleWork(
+ [prods, id, &processedDocs](const executor::TaskExecutor::CallbackArgs& cb) {
+ PseudoRandom prng(getNewSeed());
+
+ auto input = prods[id]->getNext();
+
+ size_t docs = 0;
+ for (; input.isAdvanced(); input = prods[id]->getNext()) {
+ size_t value = input.getDocument()["a"].getInt();
+
+ ASSERT(value >= id * 100);
+ ASSERT(value < (id + 1) * 100);
+
+ ++docs;
+
+ // This helps randomizing thread scheduling forcing different threads to load
+ // buffers. The sleep API is inherently imprecise so we cannot guarantee 100%
+ // reproducibility.
+ sleepmillis(prng.nextInt32() % 50 + 1);
+ }
+ processedDocs.fetchAndAdd(docs);
+ });
+
+ handles.emplace_back(std::move(handle.getValue()));
+ }
+
+ for (auto& h : handles)
+ _executor->wait(h);
+
+ ASSERT_EQ(nDocs, processedDocs.load());
+}
+} // namespace mongo