summaryrefslogtreecommitdiff
path: root/jstests/change_streams/shell_helper.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/change_streams/shell_helper.js')
-rw-r--r--jstests/change_streams/shell_helper.js394
1 files changed, 197 insertions, 197 deletions
diff --git a/jstests/change_streams/shell_helper.js b/jstests/change_streams/shell_helper.js
index a044ba76e50..29330a433e9 100644
--- a/jstests/change_streams/shell_helper.js
+++ b/jstests/change_streams/shell_helper.js
@@ -7,215 +7,215 @@
// based on the commit oplog entry, which would cause this test to fail.
// @tags: [change_stream_does_not_expect_txns]
(function() {
- "use strict";
+"use strict";
- load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
- load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
- load("jstests/libs/change_stream_util.js"); // For assertInvalidateOp.
+load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
+load("jstests/libs/change_stream_util.js"); // For assertInvalidateOp.
- const coll = assertDropAndRecreateCollection(db, "change_stream_shell_helper");
+const coll = assertDropAndRecreateCollection(db, "change_stream_shell_helper");
- function checkNextChange(cursor, expected) {
- assert.soon(() => cursor.hasNext());
- const nextObj = cursor.next();
- assertChangeStreamEventEq(nextObj, expected);
- return nextObj;
- }
-
- function testCommandIsCalled(testFunc, checkFunc) {
- const mongoRunCommandOriginal = Mongo.prototype.runCommand;
+function checkNextChange(cursor, expected) {
+ assert.soon(() => cursor.hasNext());
+ const nextObj = cursor.next();
+ assertChangeStreamEventEq(nextObj, expected);
+ return nextObj;
+}
- const sentinel = {};
- let cmdObjSeen = sentinel;
+function testCommandIsCalled(testFunc, checkFunc) {
+ const mongoRunCommandOriginal = Mongo.prototype.runCommand;
- Mongo.prototype.runCommand = function runCommandSpy(dbName, cmdObj, options) {
- cmdObjSeen = cmdObj;
- return mongoRunCommandOriginal.apply(this, arguments);
- };
+ const sentinel = {};
+ let cmdObjSeen = sentinel;
- try {
- assert.doesNotThrow(testFunc);
- } finally {
- Mongo.prototype.runCommand = mongoRunCommandOriginal;
- }
-
- if (cmdObjSeen === sentinel) {
- throw new Error("Mongo.prototype.runCommand() was never called: " +
- testFunc.toString());
- }
+ Mongo.prototype.runCommand = function runCommandSpy(dbName, cmdObj, options) {
+ cmdObjSeen = cmdObj;
+ return mongoRunCommandOriginal.apply(this, arguments);
+ };
- checkFunc(cmdObjSeen);
+ try {
+ assert.doesNotThrow(testFunc);
+ } finally {
+ Mongo.prototype.runCommand = mongoRunCommandOriginal;
}
- jsTestLog("Testing watch() without options");
- let changeStreamCursor = coll.watch();
-
- assert(!changeStreamCursor.hasNext());
-
- // Write the first document into the collection. We will save the resume token from this change.
- assert.writeOK(coll.insert({_id: 0, x: 1}));
- let resumeToken;
-
- // Test that each of the change stream cursors picks up the change.
- assert.soon(() => changeStreamCursor.hasNext());
- let change = changeStreamCursor.next();
- assert(!changeStreamCursor.hasNext());
- let expected = {
- documentKey: {_id: 0},
- fullDocument: {_id: 0, x: 1},
- ns: {db: "test", coll: coll.getName()},
- operationType: "insert",
- };
- assert("_id" in change, "Got unexpected change: " + tojson(change));
- // Remember the _id of the first op to resume the stream.
- resumeToken = change._id;
- // Remove the fields we cannot predict, then test that the change is as expected.
- delete change._id;
- delete change.clusterTime;
- assert.docEq(change, expected);
-
- jsTestLog("Testing watch() with pipeline");
- changeStreamCursor = coll.watch([{$project: {clusterTime: 1, docId: "$documentKey._id"}}]);
-
- // Store the cluster time of the insert as the timestamp to start from.
- const resumeTime =
- assert.commandWorked(db.runCommand({insert: coll.getName(), documents: [{_id: 1, x: 1}]}))
- .operationTime;
- jsTestLog("Insert of document with _id 1 got operationTime " + tojson(resumeTime));
-
- const changeForInsert = checkNextChange(changeStreamCursor, {docId: 1});
- jsTestLog("Change stream event for document with _id 1 reports clusterTime " +
- tojson(changeForInsert.clusterTime));
-
- // We expect the clusterTime returned by the change stream event and the operationTime returned
- // by the insert to be the same.
- assert.eq(changeForInsert.clusterTime, resumeTime);
-
- jsTestLog("Testing watch() with pipeline and resumeAfter");
- changeStreamCursor =
- coll.watch([{$project: {docId: "$documentKey._id"}}], {resumeAfter: resumeToken});
- checkNextChange(changeStreamCursor, {docId: 1});
-
- jsTestLog("Testing watch() with pipeline and startAfter");
- changeStreamCursor =
- coll.watch([{$project: {docId: "$documentKey._id"}}], {startAfter: resumeToken});
- checkNextChange(changeStreamCursor, {docId: 1});
-
- jsTestLog("Testing watch() with pipeline and startAtOperationTime");
- changeStreamCursor =
- coll.watch([{$project: {docId: "$documentKey._id"}}], {startAtOperationTime: resumeTime});
- checkNextChange(changeStreamCursor, {docId: 1});
-
- jsTestLog("Testing watch() with updateLookup");
- changeStreamCursor = coll.watch([], {fullDocument: "updateLookup"});
-
- assert.writeOK(coll.update({_id: 0}, {$set: {x: 10}}));
- expected = {
- documentKey: {_id: 0},
- fullDocument: {_id: 0, x: 10},
- ns: {db: "test", coll: coll.getName()},
- operationType: "update",
- updateDescription: {removedFields: [], updatedFields: {x: 10}},
- };
- checkNextChange(changeStreamCursor, expected);
-
- jsTestLog("Testing watch() with batchSize");
- // Only test mongod because mongos uses batch size 0 for aggregate commands internally to
- // establish cursors quickly. GetMore on mongos doesn't respect batch size due to SERVER-31992.
- const isMongos = FixtureHelpers.isMongos(db);
- if (!isMongos) {
- // Increase a field by 5 times and verify the batch size is respected.
- for (let i = 0; i < 5; i++) {
- assert.writeOK(coll.update({_id: 1}, {$inc: {x: 1}}));
- }
+ if (cmdObjSeen === sentinel) {
+ throw new Error("Mongo.prototype.runCommand() was never called: " + testFunc.toString());
+ }
- // Only watch the "update" changes of the specific doc since the beginning.
- changeStreamCursor =
- coll.watch([{$match: {documentKey: {_id: 1}, operationType: "update"}}],
- {resumeAfter: resumeToken, batchSize: 2});
-
- // Check the first batch.
- assert.eq(changeStreamCursor.objsLeftInBatch(), 2);
- // Consume the first batch.
- assert(changeStreamCursor.hasNext());
- changeStreamCursor.next();
- assert(changeStreamCursor.hasNext());
- changeStreamCursor.next();
- // Confirm that the batch is empty.
- assert.eq(changeStreamCursor.objsLeftInBatch(), 0);
-
- // Check the batch returned by getMore.
- assert(changeStreamCursor.hasNext());
- assert.eq(changeStreamCursor.objsLeftInBatch(), 2);
- changeStreamCursor.next();
- assert(changeStreamCursor.hasNext());
- changeStreamCursor.next();
- assert.eq(changeStreamCursor.objsLeftInBatch(), 0);
- // There are more changes coming, just not in the batch.
- assert(changeStreamCursor.hasNext());
+ checkFunc(cmdObjSeen);
+}
+
+jsTestLog("Testing watch() without options");
+let changeStreamCursor = coll.watch();
+
+assert(!changeStreamCursor.hasNext());
+
+// Write the first document into the collection. We will save the resume token from this change.
+assert.writeOK(coll.insert({_id: 0, x: 1}));
+let resumeToken;
+
+// Test that each of the change stream cursors picks up the change.
+assert.soon(() => changeStreamCursor.hasNext());
+let change = changeStreamCursor.next();
+assert(!changeStreamCursor.hasNext());
+let expected = {
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0, x: 1},
+ ns: {db: "test", coll: coll.getName()},
+ operationType: "insert",
+};
+assert("_id" in change, "Got unexpected change: " + tojson(change));
+// Remember the _id of the first op to resume the stream.
+resumeToken = change._id;
+// Remove the fields we cannot predict, then test that the change is as expected.
+delete change._id;
+delete change.clusterTime;
+assert.docEq(change, expected);
+
+jsTestLog("Testing watch() with pipeline");
+changeStreamCursor = coll.watch([{$project: {clusterTime: 1, docId: "$documentKey._id"}}]);
+
+// Store the cluster time of the insert as the timestamp to start from.
+const resumeTime =
+ assert.commandWorked(db.runCommand({insert: coll.getName(), documents: [{_id: 1, x: 1}]}))
+ .operationTime;
+jsTestLog("Insert of document with _id 1 got operationTime " + tojson(resumeTime));
+
+const changeForInsert = checkNextChange(changeStreamCursor, {docId: 1});
+jsTestLog("Change stream event for document with _id 1 reports clusterTime " +
+ tojson(changeForInsert.clusterTime));
+
+// We expect the clusterTime returned by the change stream event and the operationTime returned
+// by the insert to be the same.
+assert.eq(changeForInsert.clusterTime, resumeTime);
+
+jsTestLog("Testing watch() with pipeline and resumeAfter");
+changeStreamCursor =
+ coll.watch([{$project: {docId: "$documentKey._id"}}], {resumeAfter: resumeToken});
+checkNextChange(changeStreamCursor, {docId: 1});
+
+jsTestLog("Testing watch() with pipeline and startAfter");
+changeStreamCursor =
+ coll.watch([{$project: {docId: "$documentKey._id"}}], {startAfter: resumeToken});
+checkNextChange(changeStreamCursor, {docId: 1});
+
+jsTestLog("Testing watch() with pipeline and startAtOperationTime");
+changeStreamCursor =
+ coll.watch([{$project: {docId: "$documentKey._id"}}], {startAtOperationTime: resumeTime});
+checkNextChange(changeStreamCursor, {docId: 1});
+
+jsTestLog("Testing watch() with updateLookup");
+changeStreamCursor = coll.watch([], {fullDocument: "updateLookup"});
+
+assert.writeOK(coll.update({_id: 0}, {$set: {x: 10}}));
+expected = {
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0, x: 10},
+ ns: {db: "test", coll: coll.getName()},
+ operationType: "update",
+ updateDescription: {removedFields: [], updatedFields: {x: 10}},
+};
+checkNextChange(changeStreamCursor, expected);
+
+jsTestLog("Testing watch() with batchSize");
+// Only test mongod because mongos uses batch size 0 for aggregate commands internally to
+// establish cursors quickly. GetMore on mongos doesn't respect batch size due to SERVER-31992.
+const isMongos = FixtureHelpers.isMongos(db);
+if (!isMongos) {
+ // Increase a field by 5 times and verify the batch size is respected.
+ for (let i = 0; i < 5; i++) {
+ assert.writeOK(coll.update({_id: 1}, {$inc: {x: 1}}));
}
- jsTestLog("Testing watch() with maxAwaitTimeMS");
- changeStreamCursor = coll.watch([], {maxAwaitTimeMS: 500});
- testCommandIsCalled(() => assert(!changeStreamCursor.hasNext()), (cmdObj) => {
- assert.eq("getMore",
- Object.keys(cmdObj)[0],
- "expected getMore command, but was: " + tojson(cmdObj));
- assert(cmdObj.hasOwnProperty("maxTimeMS"), "unexpected getMore command: " + tojson(cmdObj));
- assert.eq(500, cmdObj.maxTimeMS, "unexpected getMore command: " + tojson(cmdObj));
+ // Only watch the "update" changes of the specific doc since the beginning.
+ changeStreamCursor = coll.watch([{$match: {documentKey: {_id: 1}, operationType: "update"}}],
+ {resumeAfter: resumeToken, batchSize: 2});
+
+ // Check the first batch.
+ assert.eq(changeStreamCursor.objsLeftInBatch(), 2);
+ // Consume the first batch.
+ assert(changeStreamCursor.hasNext());
+ changeStreamCursor.next();
+ assert(changeStreamCursor.hasNext());
+ changeStreamCursor.next();
+ // Confirm that the batch is empty.
+ assert.eq(changeStreamCursor.objsLeftInBatch(), 0);
+
+ // Check the batch returned by getMore.
+ assert(changeStreamCursor.hasNext());
+ assert.eq(changeStreamCursor.objsLeftInBatch(), 2);
+ changeStreamCursor.next();
+ assert(changeStreamCursor.hasNext());
+ changeStreamCursor.next();
+ assert.eq(changeStreamCursor.objsLeftInBatch(), 0);
+ // There are more changes coming, just not in the batch.
+ assert(changeStreamCursor.hasNext());
+}
+
+jsTestLog("Testing watch() with maxAwaitTimeMS");
+changeStreamCursor = coll.watch([], {maxAwaitTimeMS: 500});
+testCommandIsCalled(() => assert(!changeStreamCursor.hasNext()), (cmdObj) => {
+ assert.eq(
+ "getMore", Object.keys(cmdObj)[0], "expected getMore command, but was: " + tojson(cmdObj));
+ assert(cmdObj.hasOwnProperty("maxTimeMS"), "unexpected getMore command: " + tojson(cmdObj));
+ assert.eq(500, cmdObj.maxTimeMS, "unexpected getMore command: " + tojson(cmdObj));
+});
+
+jsTestLog("Testing the cursor gets closed when the collection gets dropped");
+changeStreamCursor = coll.watch([{$project: {clusterTime: 0}}]);
+assert.writeOK(coll.insert({_id: 2, x: 1}));
+expected = {
+ documentKey: {_id: 2},
+ fullDocument: {_id: 2, x: 1},
+ ns: {db: "test", coll: coll.getName()},
+ operationType: "insert",
+};
+checkNextChange(changeStreamCursor, expected);
+assert(!changeStreamCursor.hasNext());
+assert(!changeStreamCursor.isClosed());
+assert(!changeStreamCursor.isExhausted());
+
+// Dropping the collection should trigger a drop notification.
+assertDropCollection(db, coll.getName());
+assert.soon(() => changeStreamCursor.hasNext());
+assert(!changeStreamCursor.isExhausted());
+expected = {
+ operationType: "drop",
+ ns: {db: db.getName(), coll: coll.getName()}
+};
+checkNextChange(changeStreamCursor, expected);
+// For single collection change streams, the drop should invalidate the stream.
+const invalidateDoc = assertInvalidateOp({cursor: changeStreamCursor, opType: "drop"});
+
+if (invalidateDoc) {
+ jsTestLog("Testing using the 'startAfter' option from the invalidate entry");
+ assert.commandWorked(coll.insert({_id: "After drop"}));
+ let resumedFromInvalidate =
+ coll.watch([], {startAfter: invalidateDoc._id, collation: {locale: "simple"}});
+
+ // We should see the new insert after starting over. However, in sharded cluster
+ // passthroughs we may see more drop and invalidate notifications before we see the insert.
+ let firstChangeAfterDrop;
+ assert.soon(() => {
+ if (!resumedFromInvalidate.hasNext()) {
+ return false;
+ }
+ const next = resumedFromInvalidate.next();
+ if (next.operationType == "invalidate") {
+ // Start again!
+ resumedFromInvalidate =
+ coll.watch([], {startAfter: next._id, collation: {locale: "simple"}});
+ return false;
+ }
+ if (next.operationType == "drop") {
+ return false;
+ }
+ // THIS is the change we wanted.
+ firstChangeAfterDrop = next;
+ return true;
});
- jsTestLog("Testing the cursor gets closed when the collection gets dropped");
- changeStreamCursor = coll.watch([{$project: {clusterTime: 0}}]);
- assert.writeOK(coll.insert({_id: 2, x: 1}));
- expected = {
- documentKey: {_id: 2},
- fullDocument: {_id: 2, x: 1},
- ns: {db: "test", coll: coll.getName()},
- operationType: "insert",
- };
- checkNextChange(changeStreamCursor, expected);
- assert(!changeStreamCursor.hasNext());
- assert(!changeStreamCursor.isClosed());
- assert(!changeStreamCursor.isExhausted());
-
- // Dropping the collection should trigger a drop notification.
- assertDropCollection(db, coll.getName());
- assert.soon(() => changeStreamCursor.hasNext());
- assert(!changeStreamCursor.isExhausted());
- expected = {operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}};
- checkNextChange(changeStreamCursor, expected);
- // For single collection change streams, the drop should invalidate the stream.
- const invalidateDoc = assertInvalidateOp({cursor: changeStreamCursor, opType: "drop"});
-
- if (invalidateDoc) {
- jsTestLog("Testing using the 'startAfter' option from the invalidate entry");
- assert.commandWorked(coll.insert({_id: "After drop"}));
- let resumedFromInvalidate =
- coll.watch([], {startAfter: invalidateDoc._id, collation: {locale: "simple"}});
-
- // We should see the new insert after starting over. However, in sharded cluster
- // passthroughs we may see more drop and invalidate notifications before we see the insert.
- let firstChangeAfterDrop;
- assert.soon(() => {
- if (!resumedFromInvalidate.hasNext()) {
- return false;
- }
- const next = resumedFromInvalidate.next();
- if (next.operationType == "invalidate") {
- // Start again!
- resumedFromInvalidate =
- coll.watch([], {startAfter: next._id, collation: {locale: "simple"}});
- return false;
- }
- if (next.operationType == "drop") {
- return false;
- }
- // THIS is the change we wanted.
- firstChangeAfterDrop = next;
- return true;
- });
-
- assert.eq(firstChangeAfterDrop.documentKey._id, "After drop", tojson(change));
- }
+ assert.eq(firstChangeAfterDrop.documentKey._id, "After drop", tojson(change));
+}
}());