diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-06-29 12:39:29 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-07-03 13:51:37 -0400 |
commit | c457b59f9599c61144ceee50b5190aec8f8b888b (patch) | |
tree | a961111959966fd10abbd6fa7a1dfa203871d81e | |
parent | f4ad4f148a0ac37957bc854aa41c9ed53deceed9 (diff) | |
download | mongo-c457b59f9599c61144ceee50b5190aec8f8b888b.tar.gz |
SERVER-35917 Blacklist resumeAfter from sharded change streams suite
6 files changed, 172 insertions, 138 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml index 827f88fe878..1c305f62970 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml @@ -20,6 +20,9 @@ selector: # "Cowardly refusing to run test with overridden write concern when it uses a command that can # only perform w=1 writes: ..." - requires_eval_command + # SERVER-32088 Resuming a change stream may not work if not all shards have chunks for the + # collection, which can happen if the initial sharding of the collection has a failed migration. + - uses_resume_after executor: archive: diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js index 59e8df872f1..7570ecca52f 100644 --- a/jstests/change_streams/change_stream.js +++ b/jstests/change_streams/change_stream.js @@ -156,52 +156,5 @@ db.getMongo().forceReadMode('commands'); } - jsTestLog("Testing resumability"); - assertDropAndRecreateCollection(db, "resume1"); - - // Note we do not project away 'id.ts' as it is part of the resume token. - let resumeCursor = cst.startWatchingChanges( - {pipeline: [{$changeStream: {}}], collection: db.resume1, includeToken: true}); - - // Insert a document and save the resulting change stream. - assert.writeOK(db.resume1.insert({_id: 1})); - const firstInsertChangeDoc = cst.getOneChange(resumeCursor); - assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1}); - - jsTestLog("Testing resume after one document."); - resumeCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}], - collection: db.resume1, - includeToken: true, - aggregateOptions: {cursor: {batchSize: 0}}, - }); - - jsTestLog("Inserting additional documents."); - assert.writeOK(db.resume1.insert({_id: 2})); - const secondInsertChangeDoc = cst.getOneChange(resumeCursor); - assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2}); - assert.writeOK(db.resume1.insert({_id: 3})); - const thirdInsertChangeDoc = cst.getOneChange(resumeCursor); - assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3}); - - jsTestLog("Testing resume after first document of three."); - resumeCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}], - collection: db.resume1, - includeToken: true, - aggregateOptions: {cursor: {batchSize: 0}}, - }); - assert.docEq(cst.getOneChange(resumeCursor), secondInsertChangeDoc); - assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc); - - jsTestLog("Testing resume after second document of three."); - resumeCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {resumeAfter: secondInsertChangeDoc._id}}], - collection: db.resume1, - includeToken: true, - aggregateOptions: {cursor: {batchSize: 0}}, - }); - assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc); - cst.cleanUp(); }()); diff --git a/jstests/change_streams/change_stream_shell_helper.js b/jstests/change_streams/change_stream_shell_helper.js index 58d20645ae1..949366fa04e 100644 --- a/jstests/change_streams/change_stream_shell_helper.js +++ b/jstests/change_streams/change_stream_shell_helper.js @@ -1,5 +1,5 @@ // Test DBCollection.watch() shell helper and its options. - +// @tags: [uses_resume_after] (function() { "use strict"; diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js index bbf2b8027c7..ce22137f09c 100644 --- a/jstests/change_streams/lookup_post_image.js +++ b/jstests/change_streams/lookup_post_image.js @@ -116,92 +116,6 @@ assert.eq(latestChange.operationType, "update"); assert(latestChange.hasOwnProperty("fullDocument")); assert.eq(latestChange.fullDocument, null); - const deleteDocResumePoint = latestChange._id; - - // Test that looking up the post image of an update after the collection has been dropped will - // result in 'fullDocument' with a value of null. This must be done using getMore because new - // cursors cannot be established after a collection drop. - assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"})); - assert.writeOK(coll.update({_id: "fullDocument is lookup 2"}, {$set: {updated: true}})); - - // Open a $changeStream cursor with batchSize 0, so that no oplog entries are retrieved yet. - cursor = cst.startWatchingChanges({ - collection: coll, - pipeline: [ - {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}}, - {$match: {operationType: {$ne: "delete"}}} - ], - aggregateOptions: {cursor: {batchSize: 0}} - }); - - // Save another stream to test post-image lookup after the collection is recreated. - let cursorBeforeDrop = cst.startWatchingChanges({ - collection: coll, - pipeline: [ - {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}}, - {$match: {operationType: {$ne: "delete"}}} - ], - aggregateOptions: {cursor: {batchSize: 0}} - }); - - // Retrieve the 'insert' operation from the latter stream. This is necessary on a sharded - // collection so that the documentKey is retrieved before the collection is recreated; - // otherwise, per SERVER-31691, a uassert will occur. - // TODO SERVER-31847: all remaining operations on the old UUID should be visible even if we have - // not retrieved the first oplog entry before the collection is recreated. - latestChange = cst.getOneChange(cursorBeforeDrop); - assert.eq(latestChange.operationType, "insert"); - assert(latestChange.hasOwnProperty("fullDocument")); - assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup 2"}); - - // Drop the collection and wait until two-phase drop finishes. - assertDropCollection(db, coll.getName()); - assert.soon(function() { - return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, coll.getName()); - }); - // If this test is running with secondary read preference, it's necessary for the drop - // to propagate to all secondary nodes and be available for majority reads before we can - // assume looking up the document will fail. - FixtureHelpers.awaitLastOpCommitted(); - - // Check the next $changeStream entry; this is the test document inserted above. - latestChange = cst.getOneChange(cursor); - assert.eq(latestChange.operationType, "insert"); - assert(latestChange.hasOwnProperty("fullDocument")); - assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup 2"}); - - // The next entry is the 'update' operation. Because the collection has been dropped, our - // attempt to look up the post-image results in a null document. - latestChange = cst.getOneChange(cursor); - assert.eq(latestChange.operationType, "update"); - assert(latestChange.hasOwnProperty("fullDocument")); - assert.eq(latestChange.fullDocument, null); - - // Test establishing new cursors with resume token on dropped collections fails. - let res = db.runCommand({ - aggregate: coll.getName(), - pipeline: [ - {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}}, - {$match: {operationType: "update"}} - ], - cursor: {batchSize: 0} - }); - assert.commandFailedWithCode(res, 40615); - - // Test that looking up the post image of an update after the collection has been dropped and - // created again will result in 'fullDocument' with a value of null. This must be done using - // getMore because new cursors cannot be established after a collection drop. - - // Insert a document with the same _id, verify the change stream won't return it due to - // different UUID. - assertCreateCollection(db, coll.getName()); - assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"})); - - // Confirm that the next entry's post-image is null since new collection has a different UUID. - latestChange = cst.getOneChange(cursorBeforeDrop); - assert.eq(latestChange.operationType, "update"); - assert(latestChange.hasOwnProperty("fullDocument")); - assert.eq(latestChange.fullDocument, null); // Test that invalidate entries don't have 'fullDocument' even if 'updateLookup' is specified. const collInvalidate = assertDropAndRecreateCollection(db, "collInvalidate"); @@ -213,10 +127,9 @@ assert.writeOK(collInvalidate.insert({_id: "testing invalidate"})); assertDropCollection(db, collInvalidate.getName()); // Wait until two-phase drop finishes. - assert.soon(function() { - return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase( - db, collInvalidate.getName()); - }); + assert.soon(() => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase( + db, collInvalidate.getName())); + latestChange = cst.getOneChange(cursor); assert.eq(latestChange.operationType, "insert"); latestChange = cst.getOneChange(cursor, true); diff --git a/jstests/change_streams/lookup_post_image_resume_after.js b/jstests/change_streams/lookup_post_image_resume_after.js new file mode 100644 index 00000000000..b2d8e345679 --- /dev/null +++ b/jstests/change_streams/lookup_post_image_resume_after.js @@ -0,0 +1,108 @@ +// Tests the behavior of using fullDocument: "updateLookup" with a resumeToken, possibly from far +// enough in the past that the document doesn't exist yet. +// @tags: [uses_resume_after] +(function() { + "use strict"; + + load("jstests/libs/change_stream_util.js"); + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + load("jstests/libs/fixture_helpers.js"); // For awaitLastOpCommitted(). + load("jstests/replsets/libs/two_phase_drops.js"); // For 'TwoPhaseDropCollectionTest'. + + let cst = new ChangeStreamTest(db); + const coll = assertDropAndRecreateCollection(db, "post_image_resume_after"); + const streamToGetResumeToken = cst.startWatchingChanges({ + collection: coll, + pipeline: [{$changeStream: {}}], + includeToken: true, + aggregateOptions: {cursor: {batchSize: 0}} + }); + assert.writeOK(coll.insert({_id: "for resuming later"})); + const resumePoint = cst.getOneChange(streamToGetResumeToken)._id; + + // Test that looking up the post image of an update after the collection has been dropped will + // result in 'fullDocument' with a value of null. This must be done using getMore because new + // cursors cannot be established after a collection drop. + assert.writeOK(coll.insert({_id: "TARGET"})); + assert.writeOK(coll.update({_id: "TARGET"}, {$set: {updated: true}})); + + // Open a $changeStream cursor with batchSize 0, so that no oplog entries are retrieved yet. + const firstResumeCursor = cst.startWatchingChanges({ + collection: coll, + pipeline: [{$changeStream: {fullDocument: "updateLookup", resumeAfter: resumePoint}}], + aggregateOptions: {cursor: {batchSize: 0}} + }); + + // Save another stream to test post-image lookup after the collection is recreated. We have to + // create this before we drop the collection because otherwise the resume token's UUID will not + // match that of the collection. + const secondResumeCursor = cst.startWatchingChanges({ + collection: coll, + pipeline: [{$changeStream: {fullDocument: "updateLookup", resumeAfter: resumePoint}}], + aggregateOptions: {cursor: {batchSize: 0}} + }); + + // Drop the collection and wait until two-phase drop finishes. + assertDropCollection(db, coll.getName()); + assert.soon( + () => !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, coll.getName())); + + // If this test is running with secondary read preference, it's necessary for the drop to + // propagate to all secondary nodes and be available for majority reads before we can assume + // looking up the document will fail. + FixtureHelpers.awaitLastOpCommitted(); + + // Check the next $changeStream entry; this is the test document inserted above. + let latestChange = cst.getOneChange(firstResumeCursor); + assert.eq(latestChange.operationType, "insert"); + assert.eq(latestChange.fullDocument, {_id: "TARGET"}); + + // The next entry is the 'update' operation. Because the collection has been dropped, our + // attempt to look up the post-image results in a null document. + latestChange = cst.getOneChange(firstResumeCursor); + assert.eq(latestChange.operationType, "update"); + assert(latestChange.hasOwnProperty("fullDocument")); + assert.eq(latestChange.fullDocument, null); + + // Test establishing new cursors with resume token on dropped collections fails. + assert.commandFailedWithCode(db.runCommand({ + aggregate: coll.getName(), + pipeline: [ + {$changeStream: {fullDocument: "updateLookup", resumeAfter: resumePoint}}, + {$match: {operationType: "update"}} + ], + cursor: {batchSize: 0} + }), + 40615); + + // Before we re-create the collection we must consume the insert notification. This is to + // prevent the change stream from throwing an assertion when it goes to look up the shard key + // for the collection and finds that it has a mismatching UUID. It should proceed without error + // if the collection doesn't exist (has no UUID). + cst.assertNextChangesEqual({ + cursor: secondResumeCursor, + expectedChanges: [{ + documentKey: {_id: "TARGET"}, + fullDocument: {_id: "TARGET"}, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "insert" + }] + }); + + // Recreate the collection, insert a document with the same _id, and test that the change stream + // won't return it because the collection will now have a different UUID. + assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"})); + + cst.assertNextChangesEqual({ + cursor: secondResumeCursor, + expectedChanges: [{ + documentKey: {_id: "TARGET"}, + fullDocument: null, + ns: {db: db.getName(), coll: coll.getName()}, + operationType: "update", + updateDescription: {updatedFields: {updated: true}, removedFields: []} + }] + }); + + cst.cleanUp(); +}()); diff --git a/jstests/change_streams/resume_after.js b/jstests/change_streams/resume_after.js new file mode 100644 index 00000000000..41a9f665969 --- /dev/null +++ b/jstests/change_streams/resume_after.js @@ -0,0 +1,57 @@ +// Tests the ability to resume a change stream at different points in the stream. +// @tags: [uses_resume_after] +(function() { + "use strict"; + + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + load("jstests/libs/change_stream_util.js"); + + let cst = new ChangeStreamTest(db); + assertDropAndRecreateCollection(db, "resume_after"); + + // Note we do not project away 'id.ts' as it is part of the resume token. + let resumeCursor = cst.startWatchingChanges( + {pipeline: [{$changeStream: {}}], collection: db.resume_after, includeToken: true}); + + // Insert a document and save the resulting change stream. + assert.writeOK(db.resume_after.insert({_id: 1})); + const firstInsertChangeDoc = cst.getOneChange(resumeCursor); + assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1}); + + jsTestLog("Testing resume after one document."); + resumeCursor = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}], + collection: db.resume_after, + includeToken: true, + aggregateOptions: {cursor: {batchSize: 0}}, + }); + + jsTestLog("Inserting additional documents."); + assert.writeOK(db.resume_after.insert({_id: 2})); + const secondInsertChangeDoc = cst.getOneChange(resumeCursor); + assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2}); + assert.writeOK(db.resume_after.insert({_id: 3})); + const thirdInsertChangeDoc = cst.getOneChange(resumeCursor); + assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3}); + + jsTestLog("Testing resume after first document of three."); + resumeCursor = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}], + collection: db.resume_after, + includeToken: true, + aggregateOptions: {cursor: {batchSize: 0}}, + }); + assert.docEq(cst.getOneChange(resumeCursor), secondInsertChangeDoc); + assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc); + + jsTestLog("Testing resume after second document of three."); + resumeCursor = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: secondInsertChangeDoc._id}}], + collection: db.resume_after, + includeToken: true, + aggregateOptions: {cursor: {batchSize: 0}}, + }); + assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc); + + cst.cleanUp(); +}()); |