summaryrefslogtreecommitdiff
path: root/jstests/change_streams/shell_helper.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/change_streams/shell_helper.js')
-rw-r--r--jstests/change_streams/shell_helper.js40
1 files changed, 38 insertions, 2 deletions
diff --git a/jstests/change_streams/shell_helper.js b/jstests/change_streams/shell_helper.js
index e833c053013..d440603b956 100644
--- a/jstests/change_streams/shell_helper.js
+++ b/jstests/change_streams/shell_helper.js
@@ -85,6 +85,11 @@
coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {resumeAfter: resumeToken});
checkNextChange(changeStreamCursor, {docId: 1});
+ jsTestLog("Testing watch() with pipeline and startAfter");
+ changeStreamCursor =
+ coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {startAfter: resumeToken});
+ checkNextChange(changeStreamCursor, {docId: 1});
+
jsTestLog("Testing watch() with pipeline and startAtOperationTime");
changeStreamCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}],
{startAtOperationTime: resumeTime});
@@ -150,7 +155,7 @@
});
jsTestLog("Testing the cursor gets closed when the collection gets dropped");
- changeStreamCursor = coll.watch([{$project: {_id: 0, clusterTime: 0}}]);
+ changeStreamCursor = coll.watch([{$project: {clusterTime: 0}}]);
assert.writeOK(coll.insert({_id: 2, x: 1}));
expected = {
documentKey: {_id: 2},
@@ -170,5 +175,36 @@
expected = {operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}};
checkNextChange(changeStreamCursor, expected);
// For single collection change streams, the drop should invalidate the stream.
- assertInvalidateOp({cursor: changeStreamCursor, opType: "drop"});
+ const invalidateDoc = assertInvalidateOp({cursor: changeStreamCursor, opType: "drop"});
+
+ if (invalidateDoc) {
+ jsTestLog("Testing using the 'startAfter' option from the invalidate entry");
+ assert.commandWorked(coll.insert({_id: "After drop"}));
+ let resumedFromInvalidate =
+ coll.watch([], {startAfter: invalidateDoc._id, collation: {locale: "simple"}});
+
+ // We should see the new insert after starting over. However, in sharded cluster
+ // passthroughs we may see more drop and invalidate notifications before we see the insert.
+ let firstChangeAfterDrop;
+ assert.soon(() => {
+ if (!resumedFromInvalidate.hasNext()) {
+ return false;
+ }
+ const next = resumedFromInvalidate.next();
+ if (next.operationType == "invalidate") {
+ // Start again!
+ resumedFromInvalidate =
+ coll.watch([], {startAfter: next._id, collation: {locale: "simple"}});
+ return false;
+ }
+ if (next.operationType == "drop") {
+ return false;
+ }
+ // THIS is the change we wanted.
+ firstChangeAfterDrop = next;
+ return true;
+ });
+
+ assert.eq(firstChangeAfterDrop.documentKey._id, "After drop", tojson(change));
+ }
}());