summaryrefslogtreecommitdiff
path: root/jstests/noPassthroughWithMongod/exchangeProducer.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/noPassthroughWithMongod/exchangeProducer.js')
-rw-r--r--jstests/noPassthroughWithMongod/exchangeProducer.js487
1 files changed, 239 insertions, 248 deletions
diff --git a/jstests/noPassthroughWithMongod/exchangeProducer.js b/jstests/noPassthroughWithMongod/exchangeProducer.js
index f3f23ee4e0d..5d609e04634 100644
--- a/jstests/noPassthroughWithMongod/exchangeProducer.js
+++ b/jstests/noPassthroughWithMongod/exchangeProducer.js
@@ -7,138 +7,258 @@
TestData.disableImplicitSessions = true;
(function() {
- "use strict";
+"use strict";
- load("jstests/aggregation/extras/utils.js"); // For assertErrorCode.
+load("jstests/aggregation/extras/utils.js"); // For assertErrorCode.
- const coll = db.testCollection;
- coll.drop();
+const coll = db.testCollection;
+coll.drop();
- const numDocs = 10000;
+const numDocs = 10000;
- const bulk = coll.initializeUnorderedBulkOp();
- for (let i = 0; i < numDocs; ++i) {
- bulk.insert({a: i, b: 'abcdefghijklmnopqrstuvxyz', c: {d: i}, e: [0, {f: i}]});
- }
+const bulk = coll.initializeUnorderedBulkOp();
+for (let i = 0; i < numDocs; ++i) {
+ bulk.insert({a: i, b: 'abcdefghijklmnopqrstuvxyz', c: {d: i}, e: [0, {f: i}]});
+}
+
+assert.commandWorked(bulk.execute());
- assert.commandWorked(bulk.execute());
-
- /**
- * A consumer runs in a parallel shell reading the cursor until exhausted and then asserts that
- * it got the correct number of documents.
- *
- * @param {Object} cursor - the cursor that a consumer will read
- * @param {int} count - number of expected documents
- */
- function countingConsumer(cursor, count) {
- let shell = startParallelShell(`{
+/**
+ * A consumer runs in a parallel shell reading the cursor until exhausted and then asserts that
+ * it got the correct number of documents.
+ *
+ * @param {Object} cursor - the cursor that a consumer will read
+ * @param {int} count - number of expected documents
+ */
+function countingConsumer(cursor, count) {
+ let shell = startParallelShell(`{
const dbCursor = new DBCommandCursor(db, ${tojsononeline(cursor)});
assert.eq(${count}, dbCursor.itcount())
}`);
- return shell;
- }
+ return shell;
+}
- /**
- * A consumer runs in a parallel shell reading the cursor expecting an error.
- *
- * @param {Object} cursor - the cursor that a consumer will read
- * @param {int} code - the expected error code
- */
- function failingConsumer(cursor, code) {
- let shell = startParallelShell(`{
+/**
+ * A consumer runs in a parallel shell reading the cursor expecting an error.
+ *
+ * @param {Object} cursor - the cursor that a consumer will read
+ * @param {int} code - the expected error code
+ */
+function failingConsumer(cursor, code) {
+ let shell = startParallelShell(`{
const dbCursor = new DBCommandCursor(db, ${tojsononeline(cursor)});
const cmdRes = db.runCommand({getMore: dbCursor._cursorid, collection: dbCursor._collName});
assert.commandFailedWithCode(cmdRes, ${code});
}`);
- return shell;
- }
-
- const numConsumers = 4;
- // For simplicity we assume that we can evenly distribute documents among consumers.
- assert.eq(0, numDocs % numConsumers);
-
- (function testParameterValidation() {
- const tooManyConsumers = 101;
- assertErrorCode(coll, [], 50950, "Expected too many consumers", {
- exchange: {
- policy: "roundrobin",
- consumers: NumberInt(tooManyConsumers),
- bufferSize: NumberInt(1024)
- },
- cursor: {batchSize: 0}
- });
-
- const bufferTooLarge = 200 * 1024 * 1024; // 200 MB
- assertErrorCode(coll, [], 50951, "Expected buffer too large", {
- exchange: {
- policy: "roundrobin",
- consumers: NumberInt(numConsumers),
- bufferSize: NumberInt(bufferTooLarge)
- },
- cursor: {batchSize: 0}
- });
+ return shell;
+}
+
+const numConsumers = 4;
+// For simplicity we assume that we can evenly distribute documents among consumers.
+assert.eq(0, numDocs % numConsumers);
+
+(function testParameterValidation() {
+ const tooManyConsumers = 101;
+ assertErrorCode(coll, [], 50950, "Expected too many consumers", {
+ exchange: {
+ policy: "roundrobin",
+ consumers: NumberInt(tooManyConsumers),
+ bufferSize: NumberInt(1024)
+ },
+ cursor: {batchSize: 0}
+ });
+
+ const bufferTooLarge = 200 * 1024 * 1024; // 200 MB
+ assertErrorCode(coll, [], 50951, "Expected buffer too large", {
+ exchange: {
+ policy: "roundrobin",
+ consumers: NumberInt(numConsumers),
+ bufferSize: NumberInt(bufferTooLarge)
+ },
+ cursor: {batchSize: 0}
+ });
+})();
- })();
+/**
+ * RoundRobin - evenly distribute documents to consumers.
+ */
+(function testRoundRobin() {
+ let res = assert.commandWorked(db.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [],
+ exchange:
+ {policy: "roundrobin", consumers: NumberInt(numConsumers), bufferSize: NumberInt(1024)},
+ cursor: {batchSize: 0}
+ }));
+ assert.eq(numConsumers, res.cursors.length);
+
+ let parallelShells = [];
+
+ for (let i = 0; i < numConsumers; ++i) {
+ parallelShells.push(countingConsumer(res.cursors[i], numDocs / numConsumers));
+ }
+ for (let i = 0; i < numConsumers; ++i) {
+ parallelShells[i]();
+ }
+})();
- /**
- * RoundRobin - evenly distribute documents to consumers.
- */
- (function testRoundRobin() {
- let res = assert.commandWorked(db.runCommand({
- aggregate: coll.getName(),
- pipeline: [],
- exchange: {
- policy: "roundrobin",
- consumers: NumberInt(numConsumers),
- bufferSize: NumberInt(1024)
- },
- cursor: {batchSize: 0}
- }));
- assert.eq(numConsumers, res.cursors.length);
+/**
+ * Broadcast - send a document to all consumers.
+ */
+(function testBroadcast() {
+ let res = assert.commandWorked(db.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [],
+ exchange:
+ {policy: "broadcast", consumers: NumberInt(numConsumers), bufferSize: NumberInt(1024)},
+ cursor: {batchSize: 0}
+ }));
+ assert.eq(numConsumers, res.cursors.length);
+
+ let parallelShells = [];
+
+ for (let i = 0; i < numConsumers; ++i) {
+ parallelShells.push(countingConsumer(res.cursors[i], numDocs));
+ }
+ for (let i = 0; i < numConsumers; ++i) {
+ parallelShells[i]();
+ }
+})();
- let parallelShells = [];
+/**
+ * Range - send documents to consumer based on the range of values of the 'a' field.
+ */
+(function testRange() {
+ let res = assert.commandWorked(db.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [],
+ exchange: {
+ policy: "keyRange",
+ consumers: NumberInt(numConsumers),
+ bufferSize: NumberInt(1024),
+ key: {a: 1},
+ boundaries: [{a: MinKey}, {a: 2500}, {a: 5000}, {a: 7500}, {a: MaxKey}],
+ consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)]
+ },
+ cursor: {batchSize: 0}
+ }));
+ assert.eq(numConsumers, res.cursors.length);
+
+ let parallelShells = [];
+
+ for (let i = 0; i < numConsumers; ++i) {
+ parallelShells.push(countingConsumer(res.cursors[i], numDocs / numConsumers));
+ }
+ for (let i = 0; i < numConsumers; ++i) {
+ parallelShells[i]();
+ }
+})();
- for (let i = 0; i < numConsumers; ++i) {
- parallelShells.push(countingConsumer(res.cursors[i], numDocs / numConsumers));
- }
- for (let i = 0; i < numConsumers; ++i) {
- parallelShells[i]();
- }
- })();
+/**
+ * Range with more complex pipeline.
+ */
+(function testRangeComplex() {
+ let res = assert.commandWorked(db.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [{$match: {a: {$gte: 5000}}}, {$sort: {a: -1}}, {$project: {_id: 0, b: 0}}],
+ exchange: {
+ policy: "keyRange",
+ consumers: NumberInt(numConsumers),
+ bufferSize: NumberInt(1024),
+ key: {a: 1},
+ boundaries: [{a: MinKey}, {a: 2500}, {a: 5000}, {a: 7500}, {a: MaxKey}],
+ consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)]
+ },
+ cursor: {batchSize: 0}
+ }));
+ assert.eq(numConsumers, res.cursors.length);
+
+ let parallelShells = [];
+
+ parallelShells.push(countingConsumer(res.cursors[0], 0));
+ parallelShells.push(countingConsumer(res.cursors[1], 0));
+ parallelShells.push(countingConsumer(res.cursors[2], 2500));
+ parallelShells.push(countingConsumer(res.cursors[3], 2500));
+
+ for (let i = 0; i < numConsumers; ++i) {
+ parallelShells[i]();
+ }
+})();
- /**
- * Broadcast - send a document to all consumers.
- */
- (function testBroadcast() {
- let res = assert.commandWorked(db.runCommand({
- aggregate: coll.getName(),
- pipeline: [],
- exchange: {
- policy: "broadcast",
- consumers: NumberInt(numConsumers),
- bufferSize: NumberInt(1024)
- },
- cursor: {batchSize: 0}
- }));
- assert.eq(numConsumers, res.cursors.length);
+/**
+ * Range with a dotted path.
+ */
+(function testRangeDottedPath() {
+ let res = assert.commandWorked(db.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [],
+ exchange: {
+ policy: "keyRange",
+ consumers: NumberInt(numConsumers),
+ bufferSize: NumberInt(1024),
+ key: {"c.d": 1},
+ boundaries:
+ [{"c.d": MinKey}, {"c.d": 2500}, {"c.d": 5000}, {"c.d": 7500}, {"c.d": MaxKey}],
+ consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)]
+ },
+ cursor: {batchSize: 0}
+ }));
+ assert.eq(numConsumers, res.cursors.length);
+
+ let parallelShells = [];
+
+ for (let i = 0; i < numConsumers; ++i) {
+ parallelShells.push(countingConsumer(res.cursors[i], numDocs / numConsumers));
+ }
+ for (let i = 0; i < numConsumers; ++i) {
+ parallelShells[i]();
+ }
+})();
- let parallelShells = [];
+/**
+ * Range with a dotted path and array.
+ */
+(function testRangeDottedPath() {
+ let res = assert.commandWorked(db.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [],
+ exchange: {
+ policy: "keyRange",
+ consumers: NumberInt(numConsumers),
+ bufferSize: NumberInt(1024),
+ key: {"e.f": 1},
+ boundaries:
+ [{"e.f": MinKey}, {"e.f": 2500}, {"e.f": 5000}, {"e.f": 7500}, {"e.f": MaxKey}],
+ consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)]
+ },
+ cursor: {batchSize: 0}
+ }));
+ assert.eq(numConsumers, res.cursors.length);
+
+ let parallelShells = [];
+
+ // The e.f field contains an array and hence the exchange cannot compute the range. Instead
+ // it sends all such documents to the consumer 0 by fiat.
+ for (let i = 0; i < numConsumers; ++i) {
+ parallelShells.push(countingConsumer(res.cursors[i], i == 0 ? numDocs : 0));
+ }
+ for (let i = 0; i < numConsumers; ++i) {
+ parallelShells[i]();
+ }
+})();
- for (let i = 0; i < numConsumers; ++i) {
- parallelShells.push(countingConsumer(res.cursors[i], numDocs));
- }
- for (let i = 0; i < numConsumers; ++i) {
- parallelShells[i]();
- }
- })();
+/**
+ * Range - simulate an exception in loading the batch.
+ */
+(function testRangeFailLoad() {
+ const kFailPointName = "exchangeFailLoadNextBatch";
+ try {
+ assert.commandWorked(
+ db.adminCommand({configureFailPoint: kFailPointName, mode: "alwaysOn"}));
- /**
- * Range - send documents to consumer based on the range of values of the 'a' field.
- */
- (function testRange() {
let res = assert.commandWorked(db.runCommand({
aggregate: coll.getName(),
pipeline: [],
@@ -155,148 +275,19 @@ TestData.disableImplicitSessions = true;
assert.eq(numConsumers, res.cursors.length);
let parallelShells = [];
+ failingConsumer(res.cursors[0], ErrorCodes.FailPointEnabled)();
- for (let i = 0; i < numConsumers; ++i) {
- parallelShells.push(countingConsumer(res.cursors[i], numDocs / numConsumers));
+ // After the first consumer sees an error, each subsequent consumer should see an
+ // 'ExchangePassthrough' error.
+ for (let i = 0; i < numConsumers - 1; ++i) {
+ parallelShells.push(
+ failingConsumer(res.cursors[i + 1], ErrorCodes.ExchangePassthrough));
}
- for (let i = 0; i < numConsumers; ++i) {
+ for (let i = 0; i < numConsumers - 1; ++i) {
parallelShells[i]();
}
- })();
-
- /**
- * Range with more complex pipeline.
- */
- (function testRangeComplex() {
- let res = assert.commandWorked(db.runCommand({
- aggregate: coll.getName(),
- pipeline: [{$match: {a: {$gte: 5000}}}, {$sort: {a: -1}}, {$project: {_id: 0, b: 0}}],
- exchange: {
- policy: "keyRange",
- consumers: NumberInt(numConsumers),
- bufferSize: NumberInt(1024),
- key: {a: 1},
- boundaries: [{a: MinKey}, {a: 2500}, {a: 5000}, {a: 7500}, {a: MaxKey}],
- consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)]
- },
- cursor: {batchSize: 0}
- }));
- assert.eq(numConsumers, res.cursors.length);
-
- let parallelShells = [];
-
- parallelShells.push(countingConsumer(res.cursors[0], 0));
- parallelShells.push(countingConsumer(res.cursors[1], 0));
- parallelShells.push(countingConsumer(res.cursors[2], 2500));
- parallelShells.push(countingConsumer(res.cursors[3], 2500));
-
- for (let i = 0; i < numConsumers; ++i) {
- parallelShells[i]();
- }
- })();
-
- /**
- * Range with a dotted path.
- */
- (function testRangeDottedPath() {
- let res = assert.commandWorked(db.runCommand({
- aggregate: coll.getName(),
- pipeline: [],
- exchange: {
- policy: "keyRange",
- consumers: NumberInt(numConsumers),
- bufferSize: NumberInt(1024),
- key: {"c.d": 1},
- boundaries:
- [{"c.d": MinKey}, {"c.d": 2500}, {"c.d": 5000}, {"c.d": 7500}, {"c.d": MaxKey}],
- consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)]
- },
- cursor: {batchSize: 0}
- }));
- assert.eq(numConsumers, res.cursors.length);
-
- let parallelShells = [];
-
- for (let i = 0; i < numConsumers; ++i) {
- parallelShells.push(countingConsumer(res.cursors[i], numDocs / numConsumers));
- }
- for (let i = 0; i < numConsumers; ++i) {
- parallelShells[i]();
- }
- })();
-
- /**
- * Range with a dotted path and array.
- */
- (function testRangeDottedPath() {
- let res = assert.commandWorked(db.runCommand({
- aggregate: coll.getName(),
- pipeline: [],
- exchange: {
- policy: "keyRange",
- consumers: NumberInt(numConsumers),
- bufferSize: NumberInt(1024),
- key: {"e.f": 1},
- boundaries:
- [{"e.f": MinKey}, {"e.f": 2500}, {"e.f": 5000}, {"e.f": 7500}, {"e.f": MaxKey}],
- consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)]
- },
- cursor: {batchSize: 0}
- }));
- assert.eq(numConsumers, res.cursors.length);
-
- let parallelShells = [];
-
- // The e.f field contains an array and hence the exchange cannot compute the range. Instead
- // it sends all such documents to the consumer 0 by fiat.
- for (let i = 0; i < numConsumers; ++i) {
- parallelShells.push(countingConsumer(res.cursors[i], i == 0 ? numDocs : 0));
- }
- for (let i = 0; i < numConsumers; ++i) {
- parallelShells[i]();
- }
- })();
-
- /**
- * Range - simulate an exception in loading the batch.
- */
- (function testRangeFailLoad() {
- const kFailPointName = "exchangeFailLoadNextBatch";
- try {
- assert.commandWorked(
- db.adminCommand({configureFailPoint: kFailPointName, mode: "alwaysOn"}));
-
- let res = assert.commandWorked(db.runCommand({
- aggregate: coll.getName(),
- pipeline: [],
- exchange: {
- policy: "keyRange",
- consumers: NumberInt(numConsumers),
- bufferSize: NumberInt(1024),
- key: {a: 1},
- boundaries: [{a: MinKey}, {a: 2500}, {a: 5000}, {a: 7500}, {a: MaxKey}],
- consumerIds: [NumberInt(0), NumberInt(1), NumberInt(2), NumberInt(3)]
- },
- cursor: {batchSize: 0}
- }));
- assert.eq(numConsumers, res.cursors.length);
-
- let parallelShells = [];
- failingConsumer(res.cursors[0], ErrorCodes.FailPointEnabled)();
-
- // After the first consumer sees an error, each subsequent consumer should see an
- // 'ExchangePassthrough' error.
- for (let i = 0; i < numConsumers - 1; ++i) {
- parallelShells.push(
- failingConsumer(res.cursors[i + 1], ErrorCodes.ExchangePassthrough));
- }
- for (let i = 0; i < numConsumers - 1; ++i) {
- parallelShells[i]();
- }
- } finally {
- assert.commandWorked(
- db.adminCommand({configureFailPoint: kFailPointName, mode: "off"}));
- }
- })();
-
+ } finally {
+ assert.commandWorked(db.adminCommand({configureFailPoint: kFailPointName, mode: "off"}));
+ }
+})();
})();