diff options
Diffstat (limited to 'jstests/noPassthroughWithMongod/exchangeProducer.js')
-rw-r--r-- | jstests/noPassthroughWithMongod/exchangeProducer.js | 487 |
1 files changed, 239 insertions, 248 deletions
diff --git a/jstests/noPassthroughWithMongod/exchangeProducer.js b/jstests/noPassthroughWithMongod/exchangeProducer.js index f3f23ee4e0d..5d609e04634 100644 --- a/jstests/noPassthroughWithMongod/exchangeProducer.js +++ b/jstests/noPassthroughWithMongod/exchangeProducer.js @@ -7,138 +7,258 @@ TestData.disableImplicitSessions = true; (function() { - "use strict"; +"use strict"; - load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. +load("jstests/aggregation/extras/utils.js"); // For assertErrorCode. - const coll = db.testCollection; - coll.drop(); +const coll = db.testCollection; +coll.drop(); - const numDocs = 10000; +const numDocs = 10000; - const bulk = coll.initializeUnorderedBulkOp(); - for (let i = 0; i < numDocs; ++i) { - bulk.insert({a: i, b: 'abcdefghijklmnopqrstuvxyz', c: {d: i}, e: [0, {f: i}]}); - } +const bulk = coll.initializeUnorderedBulkOp(); +for (let i = 0; i < numDocs; ++i) { + bulk.insert({a: i, b: 'abcdefghijklmnopqrstuvxyz', c: {d: i}, e: [0, {f: i}]}); +} + +assert.commandWorked(bulk.execute()); - assert.commandWorked(bulk.execute()); - - /** - * A consumer runs in a parallel shell reading the cursor until exhausted and then asserts that - * it got the correct number of documents. - * - * @param {Object} cursor - the cursor that a consumer will read - * @param {int} count - number of expected documents - */ - function countingConsumer(cursor, count) { - let shell = startParallelShell(`{ +/** + * A consumer runs in a parallel shell reading the cursor until exhausted and then asserts that + * it got the correct number of documents. + * + * @param {Object} cursor - the cursor that a consumer will read + * @param {int} count - number of expected documents + */ +function countingConsumer(cursor, count) { + let shell = startParallelShell(`{ const dbCursor = new DBCommandCursor(db, ${tojsononeline(cursor)}); assert.eq(${count}, dbCursor.itcount()) }`); - return shell; - } + return shell; +} - /** - * A consumer runs in a parallel shell reading the cursor expecting an error. - * - * @param {Object} cursor - the cursor that a consumer will read - * @param {int} code - the expected error code - */ - function failingConsumer(cursor, code) { - let shell = startParallelShell(`{ +/** + * A consumer runs in a parallel shell reading the cursor expecting an error. + * + * @param {Object} cursor - the cursor that a consumer will read + * @param {int} code - the expected error code + */ +function failingConsumer(cursor, code) { + let shell = startParallelShell(`{ const dbCursor = new DBCommandCursor(db, ${tojsononeline(cursor)}); const cmdRes = db.runCommand({getMore: dbCursor._cursorid, collection: dbCursor._collName}); assert.commandFailedWithCode(cmdRes, ${code}); }`); - return shell; - } - - const numConsumers = 4; - // For simplicity we assume that we can evenly distribute documents among consumers. - assert.eq(0, numDocs % numConsumers); - - (function testParameterValidation() { - const tooManyConsumers = 101; - assertErrorCode(coll, [], 50950, "Expected too many consumers", { - exchange: { - policy: "roundrobin", - consumers: NumberInt(tooManyConsumers), - bufferSize: NumberInt(1024) - }, - cursor: {batchSize: 0} - }); - - const bufferTooLarge = 200 * 1024 * 1024; // 200 MB - assertErrorCode(coll, [], 50951, "Expected buffer too large", { - exchange: { - policy: "roundrobin", - consumers: NumberInt(numConsumers), - bufferSize: NumberInt(bufferTooLarge) - }, - cursor: {batchSize: 0} - }); + return shell; +} + +const numConsumers = 4; +// For simplicity we assume that we can evenly distribute documents among consumers. +assert.eq(0, numDocs % numConsumers); + +(function testParameterValidation() { + const tooManyConsumers = 101; + assertErrorCode(coll, [], 50950, "Expected too many consumers", { + exchange: { + policy: "roundrobin", + consumers: NumberInt(tooManyConsumers), + bufferSize: NumberInt(1024) + }, + cursor: {batchSize: 0} + }); + + const bufferTooLarge = 200 * 1024 * 1024; // 200 MB + assertErrorCode(coll, [], 50951, "Expected buffer too large", { + exchange: { + policy: "roundrobin", + consumers: NumberInt(numConsumers), + bufferSize: NumberInt(bufferTooLarge) + }, + cursor: {batchSize: 0} + }); +})(); - })(); +/** + * RoundRobin - evenly distribute documents to consumers. + */ +(function testRoundRobin() { + let res = assert.commandWorked(db.runCommand({ + aggregate: coll.getName(), + pipeline: [], + exchange: + {policy: "roundrobin", consumers: NumberInt(numConsumers), bufferSize: NumberInt(1024)}, + cursor: {batchSize: 0} + })); + assert.eq(numConsumers, res.cursors.length); + + let parallelShells = []; + + for (let i = 0; i < numConsumers; ++i) { + parallelShells.push(countingConsumer(res.cursors[i], numDocs / numConsumers)); + } + for (let i = 0; i < numConsumers; ++i) { + parallelShells[i](); + } +})(); - /** - * RoundRobin - evenly distribute documents to consumers. - */ - (function testRoundRobin() { - let res = assert.commandWorked(db.runCommand({ - aggregate: coll.getName(), - pipeline: [], - exchange: { - policy: "roundrobin", - consumers: NumberInt(numConsumers), - bufferSize: NumberInt(1024) - }, - cursor: {batchSize: 0} - })); - assert.eq(numConsumers, res.cursors.length); +/** + * Broadcast - send a document to all consumers. + */ +(function testBroadcast() { + let res = assert.commandWorked(db.runCommand({ + aggregate: coll.getName(), + pipeline: [], + exchange: + {policy: "broadcast", consumers: NumberInt(numConsumers), bufferSize: NumberInt(1024)}, + cursor: {batchSize: 0} + })); + assert.eq(numConsumers, res.cursors.length); + + let parallelShells = []; + + for (let i = 0; i < numConsumers; ++i) { + parallelShells.push(countingConsumer(res.cursors[i], numDocs)); + } + for (let i = 0; i < numConsumers; ++i) { + parallelShells[i](); + } +})(); - let parallelShells = []; +/** + * Range - send documents to consumer based on the range of values of the 'a' field. + */ +(function testRange() { + let res = assert.commandWorked(db.runCommand({ + aggregate: coll.getName(), + pipeline: [], + exchange: { + policy: "keyRange", + consumers: NumberInt(numConsumers), + bufferSize: NumberInt(1024), + key: {a: 1}, + boundaries: [{a: MinKey}, {a: 2500}, {a: 5000}, {a: 7500}, {a: MaxKey}], + consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)] + }, + cursor: {batchSize: 0} + })); + assert.eq(numConsumers, res.cursors.length); + + let parallelShells = []; + + for (let i = 0; i < numConsumers; ++i) { + parallelShells.push(countingConsumer(res.cursors[i], numDocs / numConsumers)); + } + for (let i = 0; i < numConsumers; ++i) { + parallelShells[i](); + } +})(); - for (let i = 0; i < numConsumers; ++i) { - parallelShells.push(countingConsumer(res.cursors[i], numDocs / numConsumers)); - } - for (let i = 0; i < numConsumers; ++i) { - parallelShells[i](); - } - })(); +/** + * Range with more complex pipeline. + */ +(function testRangeComplex() { + let res = assert.commandWorked(db.runCommand({ + aggregate: coll.getName(), + pipeline: [{$match: {a: {$gte: 5000}}}, {$sort: {a: -1}}, {$project: {_id: 0, b: 0}}], + exchange: { + policy: "keyRange", + consumers: NumberInt(numConsumers), + bufferSize: NumberInt(1024), + key: {a: 1}, + boundaries: [{a: MinKey}, {a: 2500}, {a: 5000}, {a: 7500}, {a: MaxKey}], + consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)] + }, + cursor: {batchSize: 0} + })); + assert.eq(numConsumers, res.cursors.length); + + let parallelShells = []; + + parallelShells.push(countingConsumer(res.cursors[0], 0)); + parallelShells.push(countingConsumer(res.cursors[1], 0)); + parallelShells.push(countingConsumer(res.cursors[2], 2500)); + parallelShells.push(countingConsumer(res.cursors[3], 2500)); + + for (let i = 0; i < numConsumers; ++i) { + parallelShells[i](); + } +})(); - /** - * Broadcast - send a document to all consumers. - */ - (function testBroadcast() { - let res = assert.commandWorked(db.runCommand({ - aggregate: coll.getName(), - pipeline: [], - exchange: { - policy: "broadcast", - consumers: NumberInt(numConsumers), - bufferSize: NumberInt(1024) - }, - cursor: {batchSize: 0} - })); - assert.eq(numConsumers, res.cursors.length); +/** + * Range with a dotted path. + */ +(function testRangeDottedPath() { + let res = assert.commandWorked(db.runCommand({ + aggregate: coll.getName(), + pipeline: [], + exchange: { + policy: "keyRange", + consumers: NumberInt(numConsumers), + bufferSize: NumberInt(1024), + key: {"c.d": 1}, + boundaries: + [{"c.d": MinKey}, {"c.d": 2500}, {"c.d": 5000}, {"c.d": 7500}, {"c.d": MaxKey}], + consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)] + }, + cursor: {batchSize: 0} + })); + assert.eq(numConsumers, res.cursors.length); + + let parallelShells = []; + + for (let i = 0; i < numConsumers; ++i) { + parallelShells.push(countingConsumer(res.cursors[i], numDocs / numConsumers)); + } + for (let i = 0; i < numConsumers; ++i) { + parallelShells[i](); + } +})(); - let parallelShells = []; +/** + * Range with a dotted path and array. + */ +(function testRangeDottedPath() { + let res = assert.commandWorked(db.runCommand({ + aggregate: coll.getName(), + pipeline: [], + exchange: { + policy: "keyRange", + consumers: NumberInt(numConsumers), + bufferSize: NumberInt(1024), + key: {"e.f": 1}, + boundaries: + [{"e.f": MinKey}, {"e.f": 2500}, {"e.f": 5000}, {"e.f": 7500}, {"e.f": MaxKey}], + consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)] + }, + cursor: {batchSize: 0} + })); + assert.eq(numConsumers, res.cursors.length); + + let parallelShells = []; + + // The e.f field contains an array and hence the exchange cannot compute the range. Instead + // it sends all such documents to the consumer 0 by fiat. + for (let i = 0; i < numConsumers; ++i) { + parallelShells.push(countingConsumer(res.cursors[i], i == 0 ? numDocs : 0)); + } + for (let i = 0; i < numConsumers; ++i) { + parallelShells[i](); + } +})(); - for (let i = 0; i < numConsumers; ++i) { - parallelShells.push(countingConsumer(res.cursors[i], numDocs)); - } - for (let i = 0; i < numConsumers; ++i) { - parallelShells[i](); - } - })(); +/** + * Range - simulate an exception in loading the batch. + */ +(function testRangeFailLoad() { + const kFailPointName = "exchangeFailLoadNextBatch"; + try { + assert.commandWorked( + db.adminCommand({configureFailPoint: kFailPointName, mode: "alwaysOn"})); - /** - * Range - send documents to consumer based on the range of values of the 'a' field. - */ - (function testRange() { let res = assert.commandWorked(db.runCommand({ aggregate: coll.getName(), pipeline: [], @@ -155,148 +275,19 @@ TestData.disableImplicitSessions = true; assert.eq(numConsumers, res.cursors.length); let parallelShells = []; + failingConsumer(res.cursors[0], ErrorCodes.FailPointEnabled)(); - for (let i = 0; i < numConsumers; ++i) { - parallelShells.push(countingConsumer(res.cursors[i], numDocs / numConsumers)); + // After the first consumer sees an error, each subsequent consumer should see an + // 'ExchangePassthrough' error. + for (let i = 0; i < numConsumers - 1; ++i) { + parallelShells.push( + failingConsumer(res.cursors[i + 1], ErrorCodes.ExchangePassthrough)); } - for (let i = 0; i < numConsumers; ++i) { + for (let i = 0; i < numConsumers - 1; ++i) { parallelShells[i](); } - })(); - - /** - * Range with more complex pipeline. - */ - (function testRangeComplex() { - let res = assert.commandWorked(db.runCommand({ - aggregate: coll.getName(), - pipeline: [{$match: {a: {$gte: 5000}}}, {$sort: {a: -1}}, {$project: {_id: 0, b: 0}}], - exchange: { - policy: "keyRange", - consumers: NumberInt(numConsumers), - bufferSize: NumberInt(1024), - key: {a: 1}, - boundaries: [{a: MinKey}, {a: 2500}, {a: 5000}, {a: 7500}, {a: MaxKey}], - consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)] - }, - cursor: {batchSize: 0} - })); - assert.eq(numConsumers, res.cursors.length); - - let parallelShells = []; - - parallelShells.push(countingConsumer(res.cursors[0], 0)); - parallelShells.push(countingConsumer(res.cursors[1], 0)); - parallelShells.push(countingConsumer(res.cursors[2], 2500)); - parallelShells.push(countingConsumer(res.cursors[3], 2500)); - - for (let i = 0; i < numConsumers; ++i) { - parallelShells[i](); - } - })(); - - /** - * Range with a dotted path. - */ - (function testRangeDottedPath() { - let res = assert.commandWorked(db.runCommand({ - aggregate: coll.getName(), - pipeline: [], - exchange: { - policy: "keyRange", - consumers: NumberInt(numConsumers), - bufferSize: NumberInt(1024), - key: {"c.d": 1}, - boundaries: - [{"c.d": MinKey}, {"c.d": 2500}, {"c.d": 5000}, {"c.d": 7500}, {"c.d": MaxKey}], - consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)] - }, - cursor: {batchSize: 0} - })); - assert.eq(numConsumers, res.cursors.length); - - let parallelShells = []; - - for (let i = 0; i < numConsumers; ++i) { - parallelShells.push(countingConsumer(res.cursors[i], numDocs / numConsumers)); - } - for (let i = 0; i < numConsumers; ++i) { - parallelShells[i](); - } - })(); - - /** - * Range with a dotted path and array. - */ - (function testRangeDottedPath() { - let res = assert.commandWorked(db.runCommand({ - aggregate: coll.getName(), - pipeline: [], - exchange: { - policy: "keyRange", - consumers: NumberInt(numConsumers), - bufferSize: NumberInt(1024), - key: {"e.f": 1}, - boundaries: - [{"e.f": MinKey}, {"e.f": 2500}, {"e.f": 5000}, {"e.f": 7500}, {"e.f": MaxKey}], - consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)] - }, - cursor: {batchSize: 0} - })); - assert.eq(numConsumers, res.cursors.length); - - let parallelShells = []; - - // The e.f field contains an array and hence the exchange cannot compute the range. Instead - // it sends all such documents to the consumer 0 by fiat. - for (let i = 0; i < numConsumers; ++i) { - parallelShells.push(countingConsumer(res.cursors[i], i == 0 ? numDocs : 0)); - } - for (let i = 0; i < numConsumers; ++i) { - parallelShells[i](); - } - })(); - - /** - * Range - simulate an exception in loading the batch. - */ - (function testRangeFailLoad() { - const kFailPointName = "exchangeFailLoadNextBatch"; - try { - assert.commandWorked( - db.adminCommand({configureFailPoint: kFailPointName, mode: "alwaysOn"})); - - let res = assert.commandWorked(db.runCommand({ - aggregate: coll.getName(), - pipeline: [], - exchange: { - policy: "keyRange", - consumers: NumberInt(numConsumers), - bufferSize: NumberInt(1024), - key: {a: 1}, - boundaries: [{a: MinKey}, {a: 2500}, {a: 5000}, {a: 7500}, {a: MaxKey}], - consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)] - }, - cursor: {batchSize: 0} - })); - assert.eq(numConsumers, res.cursors.length); - - let parallelShells = []; - failingConsumer(res.cursors[0], ErrorCodes.FailPointEnabled)(); - - // After the first consumer sees an error, each subsequent consumer should see an - // 'ExchangePassthrough' error. - for (let i = 0; i < numConsumers - 1; ++i) { - parallelShells.push( - failingConsumer(res.cursors[i + 1], ErrorCodes.ExchangePassthrough)); - } - for (let i = 0; i < numConsumers - 1; ++i) { - parallelShells[i](); - } - } finally { - assert.commandWorked( - db.adminCommand({configureFailPoint: kFailPointName, mode: "off"})); - } - })(); - + } finally { + assert.commandWorked(db.adminCommand({configureFailPoint: kFailPointName, mode: "off"})); + } +})(); })(); |