author     Martin Neupauer <martin.neupauer@mongodb.com>  2018-08-09 17:09:08 -0400
committer  Martin Neupauer <martin.neupauer@mongodb.com>  2018-08-30 14:17:06 -0400
commit     47306b9f203abee01f6fc54aa8d7ab8f8e25c8c9 (patch)
tree       9d1734d0958b5f07afd6dad4adede420696fba3a /jstests/noPassthroughWithMongod
parent     b46de3f6c06fab5cf9b7ea0f4176b32ff544a4bf (diff)
download   mongo-47306b9f203abee01f6fc54aa8d7ab8f8e25c8c9.tar.gz
SERVER-35905 Plug pieces together to perform a distributed exchange when applicable
Diffstat (limited to 'jstests/noPassthroughWithMongod')
-rw-r--r--  jstests/noPassthroughWithMongod/exchangeProducer.js  156
1 file changed, 156 insertions, 0 deletions
diff --git a/jstests/noPassthroughWithMongod/exchangeProducer.js b/jstests/noPassthroughWithMongod/exchangeProducer.js
new file mode 100644
index 00000000000..024e8d67a2e
--- /dev/null
+++ b/jstests/noPassthroughWithMongod/exchangeProducer.js
@@ -0,0 +1,156 @@
+/**
+ * Basic exchange producer tests. We test various document distribution policies.
+ */
+
+// This test runs a getMore in a parallel shell, which will not inherit the implicit session of
+// the cursor establishing command.
+TestData.disableImplicitSessions = true;
+
+(function() {
+ "use strict";
+
+ const coll = db.testCollection;
+ coll.drop();
+
+ const numDocs = 10000;
+
+ const bulk = coll.initializeUnorderedBulkOp();
+ for (let i = 0; i < numDocs; ++i) {
+ bulk.insert({a: i, b: 'abcdefghijklmnopqrstuvxyz'});
+ }
+
+ assert.commandWorked(bulk.execute());
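+    // Every document carries an 'a' value in [0, numDocs); the range policy tests below
+    // partition on this field.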
+
+    /**
+     * A consumer runs in a parallel shell reading the cursor until exhausted and then asserts
+     * that it got the correct number of documents.
+     *
+     * @param {Object} cursor - the cursor that a consumer will read
+     * @param {int} count - number of expected documents
+     */
+    function countingConsumer(cursor, count) {
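+        // The cursor document from the aggregate response is serialized into the parallel
+        // shell's script and rehydrated as a DBCommandCursor, which drains it with getMores.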
+        let shell = startParallelShell(`{
+            const dbCursor = new DBCommandCursor(db, ${tojsononeline(cursor)});
+
+            assert.eq(${count}, dbCursor.itcount())
+        }`);
+
+        return shell;
+    }
+
+    const numConsumers = 4;
+    // For simplicity we assume that we can evenly distribute documents among consumers.
+    assert.eq(0, numDocs % numConsumers);
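+
+    // Each aggregate command below carries an 'exchange' spec: 'policy' picks how documents are
+    // routed, 'consumers' is the number of cursors returned in the response, and 'bufferSize'
+    // caps the exchange's internal buffering. The range policy additionally takes 'key',
+    // 'boundaries' and 'consumerids'.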
+
+    /**
+     * RoundRobin - evenly distribute documents to consumers.
+     */
+    (function testRoundRobin() {
+        let res = assert.commandWorked(db.runCommand({
+            aggregate: coll.getName(),
+            pipeline: [],
+            exchange: {
+                policy: "roundrobin",
+                consumers: NumberInt(numConsumers),
+                bufferSize: NumberInt(1024)
+            },
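+            // batchSize: 0 keeps the initial batches empty; each consumer's parallel shell
+            // fetches its documents with getMore.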
+            cursor: {batchSize: 0}
+        }));
+        assert.eq(numConsumers, res.cursors.length);
+
+        let parallelShells = [];
+
+        for (let i = 0; i < numConsumers; ++i) {
+            parallelShells.push(countingConsumer(res.cursors[i], numDocs / numConsumers));
+        }
+        for (let i = 0; i < numConsumers; ++i) {
+            parallelShells[i]();
+        }
+    })();
+
+    /**
+     * Broadcast - send a document to all consumers.
+     */
+    (function testBroadcast() {
+        let res = assert.commandWorked(db.runCommand({
+            aggregate: coll.getName(),
+            pipeline: [],
+            exchange: {
+                policy: "broadcast",
+                consumers: NumberInt(numConsumers),
+                bufferSize: NumberInt(1024)
+            },
+            cursor: {batchSize: 0}
+        }));
+        assert.eq(numConsumers, res.cursors.length);
+
+        let parallelShells = [];
+
+        for (let i = 0; i < numConsumers; ++i) {
+            parallelShells.push(countingConsumer(res.cursors[i], numDocs));
+        }
+        for (let i = 0; i < numConsumers; ++i) {
+            parallelShells[i]();
+        }
+    })();
+
+    /**
+     * Range - send documents to consumers based on the range of values of the 'a' field.
+     */
+    (function testRange() {
+        let res = assert.commandWorked(db.runCommand({
+            aggregate: coll.getName(),
+            pipeline: [],
+            exchange: {
+                policy: "range",
+                consumers: NumberInt(numConsumers),
+                bufferSize: NumberInt(1024),
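+                // 'boundaries' splits the 'a' domain into four ranges at 2500, 5000 and 7500,
+                // and 'consumerids' assigns one range to each of consumers 0-3, so every
+                // consumer receives numDocs / numConsumers documents.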
+                key: {a: 1},
+                boundaries: [{a: MinKey}, {a: 2500}, {a: 5000}, {a: 7500}, {a: MaxKey}],
+                consumerids: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)]
+            },
+            cursor: {batchSize: 0}
+        }));
+        assert.eq(numConsumers, res.cursors.length);
+
+        let parallelShells = [];
+
+        for (let i = 0; i < numConsumers; ++i) {
+            parallelShells.push(countingConsumer(res.cursors[i], numDocs / numConsumers));
+        }
+        for (let i = 0; i < numConsumers; ++i) {
+            parallelShells[i]();
+        }
+    })();
+
+    /**
+     * Range with a more complex pipeline.
+     */
+    (function testRangeComplex() {
+        let res = assert.commandWorked(db.runCommand({
+            aggregate: coll.getName(),
+            pipeline: [{$match: {a: {$gte: 5000}}}, {$sort: {a: -1}}, {$project: {_id: 0, b: 0}}],
+            exchange: {
+                policy: "range",
+                consumers: NumberInt(numConsumers),
+                bufferSize: NumberInt(1024),
+                key: {a: 1},
+                boundaries: [{a: MinKey}, {a: 2500}, {a: 5000}, {a: 7500}, {a: MaxKey}],
+                consumerids: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)]
+            },
+            cursor: {batchSize: 0}
+        }));
+        assert.eq(numConsumers, res.cursors.length);
+
+        let parallelShells = [];
+
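+        // Only documents with a >= 5000 survive the $match, so the first two ranges are empty
+        // and the remaining 5000 documents split evenly between consumers 2 and 3.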
+        parallelShells.push(countingConsumer(res.cursors[0], 0));
+        parallelShells.push(countingConsumer(res.cursors[1], 0));
+        parallelShells.push(countingConsumer(res.cursors[2], 2500));
+        parallelShells.push(countingConsumer(res.cursors[3], 2500));
+
+        for (let i = 0; i < numConsumers; ++i) {
+            parallelShells[i]();
+        }
+    })();
+})();