diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-06-18 17:27:43 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-06-27 09:38:59 -0400 |
commit | bbe67b5bdd596f4720b19f9f4c8c38cfadb9c1dd (patch) | |
tree | 594dba7d8898c73ab488137e20112cc99a2d5dda /jstests/change_streams | |
parent | 752069dbc79f22b6ae4691073d455d76c9bbf18d (diff) | |
download | mongo-bbe67b5bdd596f4720b19f9f4c8c38cfadb9c1dd.tar.gz |
SERVER-34789: Using resume token from 'invalidate' notification for 'resumeAfter' should error
Diffstat (limited to 'jstests/change_streams')
-rw-r--r-- | jstests/change_streams/metadata_notifications.js | 78 | ||||
-rw-r--r-- | jstests/change_streams/whole_db_resumability.js | 43 |
2 files changed, 62 insertions, 59 deletions
diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js index f1d4373606a..be892c114da 100644 --- a/jstests/change_streams/metadata_notifications.js +++ b/jstests/change_streams/metadata_notifications.js @@ -70,7 +70,7 @@ {operationType: "drop"}, {operationType: "invalidate"}, ]; - const changes = cst.assertNextChangesEqual( + let changes = cst.assertNextChangesEqual( {cursor: cursor, expectedChanges: expectedChanges, expectInvalidate: true}); const resumeToken = changes[0]._id; const resumeTokenDrop = changes[3]._id; @@ -89,33 +89,36 @@ coll = assertCreateCollection(db, collName); assert.writeOK(coll.insert({_id: "after recreate"})); - // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here to - // be sure that it doesn't crash the server, but the ability to resume a change stream using - // 'resumeAfter' with a resume token from an invalidate is a bug, not a feature. - // Test resuming the change stream from the collection drop using 'resumeAfter'. - expectedChanges = [{ - operationType: "insert", - ns: {db: db.getName(), coll: coll.getName()}, - fullDocument: {_id: "after recreate"}, - documentKey: {_id: "after recreate"} - }]; - assertResumeExpected( - {coll: coll.getName(), spec: {resumeAfter: resumeTokenDrop}, expected: expectedChanges}); - - // Test resuming the change stream from the invalidate after the drop using 'resumeAfter'. assertResumeExpected({ coll: coll.getName(), - spec: {resumeAfter: resumeTokenInvalidate}, - expected: expectedChanges + spec: {resumeAfter: resumeTokenDrop}, + expected: [{operationType: "invalidate"}] }); + // Test resuming the change stream from the invalidate after the drop using 'resumeAfter'. + assert.commandFailedWithCode(db.runCommand({ + aggregate: coll.getName(), + pipeline: [{$changeStream: {resumeAfter: resumeTokenInvalidate}}], + cursor: {}, + collation: {locale: "simple"}, + }), + ErrorCodes.InvalidResumeToken); + // Test resuming the change stream from the collection drop using 'startAfter'. - assertResumeExpected( - {coll: coll.getName(), spec: {startAfter: resumeTokenDrop}, expected: expectedChanges}); + assertResumeExpected({ + coll: coll.getName(), + spec: {startAfter: resumeTokenDrop}, + expected: [{operationType: "invalidate"}] + }); - // Test resuming the change stream from the 'invalidate' notification using 'startAfter'. This - // is expected to behave identical to resuming from the drop. + // Test resuming the change stream from the 'invalidate' notification using 'startAfter'. + expectedChanges = [{ + operationType: "insert", + ns: {db: db.getName(), coll: coll.getName()}, + fullDocument: {_id: "after recreate"}, + documentKey: {_id: "after recreate"} + }]; assertResumeExpected({ coll: coll.getName(), spec: {startAfter: resumeTokenInvalidate}, @@ -161,36 +164,35 @@ coll = db[collName]; assert.writeOK(coll.insert({_id: "after rename"})); - // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here - // to be sure that it doesn't crash the server, but the ability to resume a change stream - // after an invalidate using 'resumeAfter' is a bug, not a feature. - // Test resuming the change stream from the collection rename using 'resumeAfter'. - expectedChanges = [{ - operationType: "insert", - ns: {db: db.getName(), coll: coll.getName()}, - fullDocument: {_id: "after rename"}, - documentKey: {_id: "after rename"} - }]; assertResumeExpected({ coll: coll.getName(), spec: {resumeAfter: resumeTokenRename}, - expected: expectedChanges + expected: [{operationType: "invalidate"}] }); // Test resuming the change stream from the invalidate after the rename using 'resumeAfter'. - assertResumeExpected({ - coll: coll.getName(), - spec: {resumeAfter: resumeTokenInvalidate}, - expected: expectedChanges - }); + assert.commandFailedWithCode(db.runCommand({ + aggregate: coll.getName(), + pipeline: [{$changeStream: {resumeAfter: resumeTokenInvalidate}}], + cursor: {}, + collation: {locale: "simple"}, + }), + ErrorCodes.InvalidResumeToken); // Test resuming the change stream from the rename using 'startAfter'. assertResumeExpected({ coll: coll.getName(), spec: {startAfter: resumeTokenRename}, - expected: expectedChanges + expected: [{operationType: "invalidate"}] }); + // Test resuming the change stream from the invalidate after the rename using 'startAfter'. + expectedChanges = [{ + operationType: "insert", + ns: {db: db.getName(), coll: coll.getName()}, + fullDocument: {_id: "after rename"}, + documentKey: {_id: "after rename"} + }]; assertResumeExpected({ coll: coll.getName(), spec: {startAfter: resumeTokenInvalidate}, diff --git a/jstests/change_streams/whole_db_resumability.js b/jstests/change_streams/whole_db_resumability.js index ba311400c87..c34d4067af2 100644 --- a/jstests/change_streams/whole_db_resumability.js +++ b/jstests/change_streams/whole_db_resumability.js @@ -131,6 +131,9 @@ ]; const dropDbChanges = cst.assertNextChangesEqual( {cursor: resumeCursor, expectedChanges: expectedChangesAfterFirstDrop}); + const resumeTokenDrop = dropDbChanges[0]._id; + const resumeTokenDropDb = dropDbChanges[1]._id; + const resumeTokenInvalidate = dropDbChanges[2]._id; // Resume from the first collection drop. resumeCursor = cst.startWatchingChanges({ @@ -142,7 +145,7 @@ // Resume from the second collection drop using 'resumeAfter'. resumeCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {resumeAfter: dropDbChanges[0]._id}}], + pipeline: [{$changeStream: {resumeAfter: resumeTokenDrop}}], collection: 1, }); cst.assertNextChangesEqual( @@ -150,55 +153,53 @@ // Resume from the second collection drop using 'startAfter'. resumeCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {startAfter: dropDbChanges[0]._id}}], + pipeline: [{$changeStream: {startAfter: resumeTokenDrop}}], collection: 1, }); cst.assertNextChangesEqual( {cursor: resumeCursor, expectedChanges: expectedChangesAfterFirstDrop.slice(1)}); // Recreate the test collection. - coll = assertCreateCollection(testDB, coll.getName()); assert.writeOK(coll.insert({_id: "after recreate"})); - let expectedInsert = { + let expectedInsert = [{ operationType: "insert", - ns: {db: db.getName(), coll: coll.getName()}, + ns: {db: testDB.getName(), coll: coll.getName()}, fullDocument: {_id: "after recreate"}, documentKey: {_id: "after recreate"} - }; - - // TODO SERVER-34789: The code below should throw an error. We exercise this behavior here to - // be sure that it doesn't crash the server, but the ability to resume a change stream using - // 'resumeAfter' with a resume token from an invalidate is a bug, not a feature. + }]; // Test resuming from the 'dropDatabase' entry using 'resumeAfter'. resumeCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {resumeAfter: dropDbChanges[1]._id}}], + pipeline: [{$changeStream: {resumeAfter: resumeTokenDropDb}}], collection: 1, aggregateOptions: {cursor: {batchSize: 0}}, }); - cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert}); + cst.assertNextChangesEqual( + {cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]}); // Test resuming from the 'invalidate' entry using 'resumeAfter'. - resumeCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {resumeAfter: dropDbChanges[2]._id}}], - collection: 1, - aggregateOptions: {cursor: {batchSize: 0}}, - }); - cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert}); + assert.commandFailedWithCode(db.runCommand({ + aggregate: 1, + pipeline: [{$changeStream: {resumeAfter: resumeTokenInvalidate}}], + cursor: {}, + collation: {locale: "simple"}, + }), + ErrorCodes.InvalidResumeToken); // Test resuming from the 'dropDatabase' entry using 'startAfter'. resumeCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {startAfter: dropDbChanges[1]._id}}], + pipeline: [{$changeStream: {startAfter: resumeTokenDropDb}}], collection: 1, aggregateOptions: {cursor: {batchSize: 0}}, }); - cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert}); + cst.assertNextChangesEqual( + {cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]}); // Test resuming from the 'invalidate' entry using 'startAfter' and verifies it picks up the // insert after recreating the db/collection. resumeCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {resumeAfter: dropDbChanges[2]._id}}], + pipeline: [{$changeStream: {startAfter: resumeTokenInvalidate}}], collection: 1, aggregateOptions: {cursor: {batchSize: 0}}, }); |