From c727aeae4dbcafd8c33b607e8d21e2d3ae14c816 Mon Sep 17 00:00:00 2001 From: Charlie Swanson Date: Mon, 27 Aug 2018 18:11:43 -0400 Subject: SERVER-36899 Accept new 'startAfter' option in watch() helpers --- jstests/change_streams/shell_helper.js | 40 ++++++++++++++++++++++++++++++++-- 1 file changed, 38 insertions(+), 2 deletions(-) (limited to 'jstests/change_streams') diff --git a/jstests/change_streams/shell_helper.js b/jstests/change_streams/shell_helper.js index e833c053013..d440603b956 100644 --- a/jstests/change_streams/shell_helper.js +++ b/jstests/change_streams/shell_helper.js @@ -85,6 +85,11 @@ coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {resumeAfter: resumeToken}); checkNextChange(changeStreamCursor, {docId: 1}); + jsTestLog("Testing watch() with pipeline and startAfter"); + changeStreamCursor = + coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {startAfter: resumeToken}); + checkNextChange(changeStreamCursor, {docId: 1}); + jsTestLog("Testing watch() with pipeline and startAtOperationTime"); changeStreamCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {startAtOperationTime: resumeTime}); @@ -150,7 +155,7 @@ }); jsTestLog("Testing the cursor gets closed when the collection gets dropped"); - changeStreamCursor = coll.watch([{$project: {_id: 0, clusterTime: 0}}]); + changeStreamCursor = coll.watch([{$project: {clusterTime: 0}}]); assert.writeOK(coll.insert({_id: 2, x: 1})); expected = { documentKey: {_id: 2}, @@ -170,5 +175,36 @@ expected = {operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}}; checkNextChange(changeStreamCursor, expected); // For single collection change streams, the drop should invalidate the stream. - assertInvalidateOp({cursor: changeStreamCursor, opType: "drop"}); + const invalidateDoc = assertInvalidateOp({cursor: changeStreamCursor, opType: "drop"}); + + if (invalidateDoc) { + jsTestLog("Testing using the 'startAfter' option from the invalidate entry"); + assert.commandWorked(coll.insert({_id: "After drop"})); + let resumedFromInvalidate = + coll.watch([], {startAfter: invalidateDoc._id, collation: {locale: "simple"}}); + + // We should see the new insert after starting over. However, in sharded cluster + // passthroughs we may see more drop and invalidate notifications before we see the insert. + let firstChangeAfterDrop; + assert.soon(() => { + if (!resumedFromInvalidate.hasNext()) { + return false; + } + const next = resumedFromInvalidate.next(); + if (next.operationType == "invalidate") { + // Start again! + resumedFromInvalidate = + coll.watch([], {startAfter: next._id, collation: {locale: "simple"}}); + return false; + } + if (next.operationType == "drop") { + return false; + } + // THIS is the change we wanted. + firstChangeAfterDrop = next; + return true; + }); + + assert.eq(firstChangeAfterDrop.documentKey._id, "After drop", tojson(change)); + } }()); -- cgit v1.2.1