diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-08-27 18:11:43 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-08-28 17:21:00 -0400 |
commit | c727aeae4dbcafd8c33b607e8d21e2d3ae14c816 (patch) | |
tree | 55026a914aa945e62f153c37b593a8d919f7c953 /jstests/change_streams | |
parent | 1ee551653a1ff9c154d94e59c429529eb3ea9098 (diff) | |
download | mongo-c727aeae4dbcafd8c33b607e8d21e2d3ae14c816.tar.gz |
SERVER-36899 Accept new 'startAfter' option in watch() helpers
Diffstat (limited to 'jstests/change_streams')
-rw-r--r-- | jstests/change_streams/shell_helper.js | 40 |
1 files changed, 38 insertions, 2 deletions
diff --git a/jstests/change_streams/shell_helper.js b/jstests/change_streams/shell_helper.js index e833c053013..d440603b956 100644 --- a/jstests/change_streams/shell_helper.js +++ b/jstests/change_streams/shell_helper.js @@ -85,6 +85,11 @@ coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {resumeAfter: resumeToken}); checkNextChange(changeStreamCursor, {docId: 1}); + jsTestLog("Testing watch() with pipeline and startAfter"); + changeStreamCursor = + coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {startAfter: resumeToken}); + checkNextChange(changeStreamCursor, {docId: 1}); + jsTestLog("Testing watch() with pipeline and startAtOperationTime"); changeStreamCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {startAtOperationTime: resumeTime}); @@ -150,7 +155,7 @@ }); jsTestLog("Testing the cursor gets closed when the collection gets dropped"); - changeStreamCursor = coll.watch([{$project: {_id: 0, clusterTime: 0}}]); + changeStreamCursor = coll.watch([{$project: {clusterTime: 0}}]); assert.writeOK(coll.insert({_id: 2, x: 1})); expected = { documentKey: {_id: 2}, @@ -170,5 +175,36 @@ expected = {operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}}; checkNextChange(changeStreamCursor, expected); // For single collection change streams, the drop should invalidate the stream. - assertInvalidateOp({cursor: changeStreamCursor, opType: "drop"}); + const invalidateDoc = assertInvalidateOp({cursor: changeStreamCursor, opType: "drop"}); + + if (invalidateDoc) { + jsTestLog("Testing using the 'startAfter' option from the invalidate entry"); + assert.commandWorked(coll.insert({_id: "After drop"})); + let resumedFromInvalidate = + coll.watch([], {startAfter: invalidateDoc._id, collation: {locale: "simple"}}); + + // We should see the new insert after starting over. However, in sharded cluster + // passthroughs we may see more drop and invalidate notifications before we see the insert. + let firstChangeAfterDrop; + assert.soon(() => { + if (!resumedFromInvalidate.hasNext()) { + return false; + } + const next = resumedFromInvalidate.next(); + if (next.operationType == "invalidate") { + // Start again! + resumedFromInvalidate = + coll.watch([], {startAfter: next._id, collation: {locale: "simple"}}); + return false; + } + if (next.operationType == "drop") { + return false; + } + // THIS is the change we wanted. + firstChangeAfterDrop = next; + return true; + }); + + assert.eq(firstChangeAfterDrop.documentKey._id, "After drop", tojson(change)); + } }()); |