diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-05-30 13:28:25 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-06-20 11:34:41 -0400 |
commit | 43f8fc8c45b854c192f39368f843352c479008f5 (patch) | |
tree | c7010e3111ddb8fcfc290d26d767fca7d7b95ec7 /jstests/change_streams/metadata_notifications.js | |
parent | b4056d66c4a563ce0d3afda7720dde3c3cd01e05 (diff) | |
download | mongo-43f8fc8c45b854c192f39368f843352c479008f5.tar.gz |
SERVER-35030: Add 'startAfter' option to the $changeStream stage
Diffstat (limited to 'jstests/change_streams/metadata_notifications.js')
-rw-r--r-- | jstests/change_streams/metadata_notifications.js | 133 |
1 files changed, 87 insertions, 46 deletions
diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js index ef17b2b4249..f1d4373606a 100644 --- a/jstests/change_streams/metadata_notifications.js +++ b/jstests/change_streams/metadata_notifications.js @@ -19,6 +19,17 @@ const collName = "test"; assertDropCollection(db, collName); + // Asserts that resuming a change stream with 'spec' and an explicit simple collation returns + // the results specified by 'expected'. + function assertResumeExpected({coll, spec, expected}) { + const cursor = cst.startWatchingChanges({ + collection: coll, + pipeline: [{$changeStream: spec}], + aggregateOptions: {collation: {locale: "simple"}} + }); + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + } + // Cursor creation succeeds, but there are no results. We do not expect to see a notification // for collection creation. let cursor = cst.startWatchingChanges( @@ -50,14 +61,10 @@ assert.writeOK(coll.remove({_id: 1})); assertDropCollection(db, coll.getName()); - // Get a valid resume token that the next aggregate command can use. - change = cst.getOneChange(cursor); - assert.eq(change.operationType, "insert"); - const resumeToken = change._id; - - // We should get 4 oplog entries of type update, delete, drop, and invalidate. The cursor should - // be closed. + // We should get oplog entries of type insert, update, delete, drop, and invalidate. The cursor + // should be closed. let expectedChanges = [ + {operationType: "insert"}, {operationType: "update"}, {operationType: "delete"}, {operationType: "drop"}, @@ -65,6 +72,9 @@ ]; const changes = cst.assertNextChangesEqual( {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true}); + const resumeToken = changes[0]._id; + const resumeTokenDrop = changes[3]._id; + const resumeTokenInvalidate = changes[4]._id; // It should not be possible to resume a change stream after a collection drop without an // explicit collation, even if the invalidate has not been received. @@ -77,31 +87,39 @@ // Recreate the collection. coll = assertCreateCollection(db, collName); - assert.writeOK(coll.insert({_id: 0})); + assert.writeOK(coll.insert({_id: "after recreate"})); // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here to - // be sure that it doesn't crash the server, but the ability to resume a change stream after an - // invalidate is a bug, not a feature. - - // Test resuming the change stream from the collection drop. - assert.doesNotThrow(function() { - const resumeTokenDrop = changes[2]._id; - const resumeCursor = - coll.watch([], {resumeAfter: resumeTokenDrop, collation: {locale: "simple"}}); - assert.soon(() => resumeCursor.hasNext()); - // Not checking the contents of the document returned, because we do not technically - // support this behavior. - resumeCursor.next(); + // be sure that it doesn't crash the server, but the ability to resume a change stream using + // 'resumeAfter' with a resume token from an invalidate is a bug, not a feature. + + // Test resuming the change stream from the collection drop using 'resumeAfter'. + expectedChanges = [{ + operationType: "insert", + ns: {db: db.getName(), coll: coll.getName()}, + fullDocument: {_id: "after recreate"}, + documentKey: {_id: "after recreate"} + }]; + assertResumeExpected( + {coll: coll.getName(), spec: {resumeAfter: resumeTokenDrop}, expected: expectedChanges}); + + // Test resuming the change stream from the invalidate after the drop using 'resumeAfter'. + assertResumeExpected({ + coll: coll.getName(), + spec: {resumeAfter: resumeTokenInvalidate}, + expected: expectedChanges }); - // Test resuming the change stream from the invalidate after the drop. - assert.doesNotThrow(function() { - const resumeTokenInvalidate = changes[3]._id; - const resumeCursor = - coll.watch([], {resumeAfter: resumeTokenInvalidate, collation: {locale: "simple"}}); - assert.soon(() => resumeCursor.hasNext()); - // Not checking the contents of the document returned, because we do not technically - // support this behavior. - resumeCursor.next(); + + // Test resuming the change stream from the collection drop using 'startAfter'. + assertResumeExpected( + {coll: coll.getName(), spec: {startAfter: resumeTokenDrop}, expected: expectedChanges}); + + // Test resuming the change stream from the 'invalidate' notification using 'startAfter'. This + // is expected to behave identical to resuming from the drop. + assertResumeExpected({ + coll: coll.getName(), + spec: {startAfter: resumeTokenInvalidate}, + expected: expectedChanges }); // Test that renaming a collection being watched generates a "rename" entry followed by an @@ -111,7 +129,7 @@ cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]}); assertDropCollection(db, "renamed_coll"); assert.writeOK(coll.renameCollection("renamed_coll")); - let expected = [ + expectedChanges = [ { operationType: "rename", ns: {db: db.getName(), coll: collName}, @@ -120,14 +138,14 @@ {operationType: "invalidate"} ]; cst.assertNextChangesEqual( - {cursor: cursor, expectedChanges: expected, expectInvalidate: true}); + {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true}); coll = db["renamed_coll"]; // Repeat the test, this time with a change stream open on the target. cursor = cst.startWatchingChanges({collection: collName, pipeline: [{$changeStream: {}}]}); assert.writeOK(coll.renameCollection(collName)); - expected = [ + expectedChanges = [ { operationType: "rename", ns: {db: db.getName(), coll: "renamed_coll"}, @@ -135,25 +153,48 @@ }, {operationType: "invalidate"} ]; - const changes = cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + const changes = + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expectedChanges}); + const resumeTokenRename = changes[0]._id; + const resumeTokenInvalidate = changes[1]._id; coll = db[collName]; + assert.writeOK(coll.insert({_id: "after rename"})); // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here // to be sure that it doesn't crash the server, but the ability to resume a change stream - // after an invalidate is a bug, not a feature. + // after an invalidate using 'resumeAfter' is a bug, not a feature. + + // Test resuming the change stream from the collection rename using 'resumeAfter'. + expectedChanges = [{ + operationType: "insert", + ns: {db: db.getName(), coll: coll.getName()}, + fullDocument: {_id: "after rename"}, + documentKey: {_id: "after rename"} + }]; + assertResumeExpected({ + coll: coll.getName(), + spec: {resumeAfter: resumeTokenRename}, + expected: expectedChanges + }); + // Test resuming the change stream from the invalidate after the rename using 'resumeAfter'. + assertResumeExpected({ + coll: coll.getName(), + spec: {resumeAfter: resumeTokenInvalidate}, + expected: expectedChanges + }); - // Test resuming the change stream from the collection rename. - assert.doesNotThrow(function() { - const resumeTokenRename = changes[0]._id; - const resumeCursor = - coll.watch([], {resumeAfter: resumeTokenRename, collation: {locale: "simple"}}); + // Test resuming the change stream from the rename using 'startAfter'. + assertResumeExpected({ + coll: coll.getName(), + spec: {startAfter: resumeTokenRename}, + expected: expectedChanges }); - // Test resuming the change stream from the invalidate after the drop. - assert.doesNotThrow(function() { - const resumeTokenInvalidate = changes[1]._id; - const resumeCursor = - coll.watch([], {resumeAfter: resumeTokenInvalidate, collation: {locale: "simple"}}); + // Test resuming the change stream from the invalidate after the rename using 'startAfter'. + assertResumeExpected({ + coll: coll.getName(), + spec: {startAfter: resumeTokenInvalidate}, + expected: expectedChanges }); assertDropAndRecreateCollection(db, "renamed_coll"); @@ -164,7 +205,7 @@ cursor = cst.startWatchingChanges({collection: "renamed_coll", pipeline: [{$changeStream: {}}]}); assert.writeOK(coll.renameCollection("renamed_coll", true /* dropTarget */)); - expected = [ + expectedChanges = [ { operationType: "rename", ns: {db: db.getName(), coll: collName}, @@ -173,7 +214,7 @@ {operationType: "invalidate"} ]; cst.assertNextChangesEqual( - {cursor: cursor, expectedChanges: expected, expectInvalidate: true}); + {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true}); coll = db["renamed_coll"]; |