diff options
author | Martin Neupauer <martin.neupauer@mongodb.com> | 2018-08-09 17:09:08 -0400 |
---|---|---|
committer | Martin Neupauer <martin.neupauer@mongodb.com> | 2018-08-30 14:17:06 -0400 |
commit | 47306b9f203abee01f6fc54aa8d7ab8f8e25c8c9 (patch) | |
tree | 9d1734d0958b5f07afd6dad4adede420696fba3a /jstests/noPassthroughWithMongod | |
parent | b46de3f6c06fab5cf9b7ea0f4176b32ff544a4bf (diff) | |
download | mongo-47306b9f203abee01f6fc54aa8d7ab8f8e25c8c9.tar.gz |
SERVER-35905 Plug pieces together to perform a distributed when applicable
Diffstat (limited to 'jstests/noPassthroughWithMongod')
-rw-r--r-- | jstests/noPassthroughWithMongod/exchangeProducer.js | 156 |
1 files changed, 156 insertions, 0 deletions
diff --git a/jstests/noPassthroughWithMongod/exchangeProducer.js b/jstests/noPassthroughWithMongod/exchangeProducer.js new file mode 100644 index 00000000000..024e8d67a2e --- /dev/null +++ b/jstests/noPassthroughWithMongod/exchangeProducer.js @@ -0,0 +1,156 @@ +/** + * Basic exchange producer tests. We test various document distribution policies. + */ + +// This test runs a getMore in a parallel shell, which will not inherit the implicit session of +// the cursor establishing command. +TestData.disableImplicitSessions = true; + +(function() { + "use strict"; + + const coll = db.testCollection; + coll.drop(); + + const numDocs = 10000; + + const bulk = coll.initializeUnorderedBulkOp(); + for (let i = 0; i < numDocs; ++i) { + bulk.insert({a: i, b: 'abcdefghijklmnopqrstuvxyz'}); + } + + assert.commandWorked(bulk.execute()); + + /** + * A consumer runs in a parallel shell reading the cursor until exhausted and then asserts that + * it got the correct number of documents. + * + * @param {Object} cursor - the cursor that a consumer will read + * @param {int} count - number of expected documents + */ + function countingConsumer(cursor, count) { + let shell = startParallelShell(`{ + const dbCursor = new DBCommandCursor(db, ${tojsononeline(cursor)}); + + assert.eq(${count}, dbCursor.itcount()) + }`); + + return shell; + } + + const numConsumers = 4; + // For simplicity we assume that we can evenly distribute documents among consumers. + assert.eq(0, numDocs % numConsumers); + + /** + * RoundRobin - evenly distribute documents to consumers. + */ + (function testRoundRobin() { + let res = assert.commandWorked(db.runCommand({ + aggregate: coll.getName(), + pipeline: [], + exchange: { + policy: "roundrobin", + consumers: NumberInt(numConsumers), + bufferSize: NumberInt(1024) + }, + cursor: {batchSize: 0} + })); + assert.eq(numConsumers, res.cursors.length); + + let parallelShells = []; + + for (let i = 0; i < numConsumers; ++i) { + parallelShells.push(countingConsumer(res.cursors[i], numDocs / numConsumers)); + } + for (let i = 0; i < numConsumers; ++i) { + parallelShells[i](); + } + })(); + + /** + * Broadcast - send a document to all consumers. + */ + (function testBroadcast() { + let res = assert.commandWorked(db.runCommand({ + aggregate: coll.getName(), + pipeline: [], + exchange: { + policy: "broadcast", + consumers: NumberInt(numConsumers), + bufferSize: NumberInt(1024) + }, + cursor: {batchSize: 0} + })); + assert.eq(numConsumers, res.cursors.length); + + let parallelShells = []; + + for (let i = 0; i < numConsumers; ++i) { + parallelShells.push(countingConsumer(res.cursors[i], numDocs)); + } + for (let i = 0; i < numConsumers; ++i) { + parallelShells[i](); + } + })(); + + /** + * Range - send documents to consumer based on the range of values of the 'a' field. + */ + (function testRange() { + let res = assert.commandWorked(db.runCommand({ + aggregate: coll.getName(), + pipeline: [], + exchange: { + policy: "range", + consumers: NumberInt(numConsumers), + bufferSize: NumberInt(1024), + key: {a: 1}, + boundaries: [{a: MinKey}, {a: 2500}, {a: 5000}, {a: 7500}, {a: MaxKey}], + consumerids: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)] + }, + cursor: {batchSize: 0} + })); + assert.eq(numConsumers, res.cursors.length); + + let parallelShells = []; + + for (let i = 0; i < numConsumers; ++i) { + parallelShells.push(countingConsumer(res.cursors[i], numDocs / numConsumers)); + } + for (let i = 0; i < numConsumers; ++i) { + parallelShells[i](); + } + })(); + + /** + * Range with more complex pipeline. + */ + (function testRangeComplex() { + let res = assert.commandWorked(db.runCommand({ + aggregate: coll.getName(), + pipeline: [{$match: {a: {$gte: 5000}}}, {$sort: {a: -1}}, {$project: {_id: 0, b: 0}}], + exchange: { + policy: "range", + consumers: NumberInt(numConsumers), + bufferSize: NumberInt(1024), + key: {a: 1}, + boundaries: [{a: MinKey}, {a: 2500}, {a: 5000}, {a: 7500}, {a: MaxKey}], + consumerids: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)] + }, + cursor: {batchSize: 0} + })); + assert.eq(numConsumers, res.cursors.length); + + let parallelShells = []; + + parallelShells.push(countingConsumer(res.cursors[0], 0)); + parallelShells.push(countingConsumer(res.cursors[1], 0)); + parallelShells.push(countingConsumer(res.cursors[2], 2500)); + parallelShells.push(countingConsumer(res.cursors[3], 2500)); + + for (let i = 0; i < numConsumers; ++i) { + parallelShells[i](); + } + })(); +})(); |