diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-04-10 10:04:41 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-05-29 09:51:51 -0400 |
commit | a76082905d63ac8aaaae25e5c76812e6edf9bc07 (patch) | |
tree | 5882b480ff3b7378a30f27106cd8fad6e8271407 /jstests/change_streams | |
parent | 8150b50f14579b6cbd673f12968726670f6e1b78 (diff) | |
download | mongo-a76082905d63ac8aaaae25e5c76812e6edf9bc07.tar.gz |
SERVER-32088: ChangeStream resumeAfter does not work on sharded collections if not all shards have chunks for the collection
Diffstat (limited to 'jstests/change_streams')
-rw-r--r-- | jstests/change_streams/change_stream_collation.js | 112 | ||||
-rw-r--r-- | jstests/change_streams/change_stream_invalidation.js | 113 | ||||
-rw-r--r-- | jstests/change_streams/change_stream_rename_resumability.js | 7 | ||||
-rw-r--r-- | jstests/change_streams/lookup_post_image.js | 24 |
4 files changed, 211 insertions, 45 deletions
diff --git a/jstests/change_streams/change_stream_collation.js b/jstests/change_streams/change_stream_collation.js index ceb1eb463dc..4375da16843 100644 --- a/jstests/change_streams/change_stream_collation.js +++ b/jstests/change_streams/change_stream_collation.js @@ -5,7 +5,8 @@ "use strict"; load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - load("jstests/libs/change_stream_util.js"); // For 'ChangeStreamTest'. + load("jstests/libs/change_stream_util.js"); // For 'ChangeStreamTest' and + // 'runCommandChangeStreamPassthroughAware'. load("jstests/libs/fixture_helpers.js"); // For 'isMongos'. if (FixtureHelpers.isMongos(db)) { @@ -230,6 +231,115 @@ assert.docEq(cursor.next(), {docId: 1}); assert(!cursor.hasNext()); }()); + + // Test that resuming a change stream on a collection that has been dropped requires the + // user to explicitly specify the collation. This is testing that we cannot resume if we + // need to retrieve the collection metadata. + (function() { + const collName = "change_stream_case_insensitive"; + let caseInsensitiveCollection = + assertDropAndRecreateCollection(db, collName, {collation: caseInsensitive}); + + let changeStream = caseInsensitiveCollection.watch( + [{$match: {"fullDocument.text": "abc"}}], {collation: caseInsensitive}); + + assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "abc"})); + + assert.soon(() => changeStream.hasNext()); + const next = changeStream.next(); + assert.docEq(next.documentKey, {_id: 0}); + const resumeToken = next._id; + + // Insert a second document to see after resuming. + assert.writeOK(caseInsensitiveCollection.insert({_id: "dropped_coll", text: "ABC"})); + + // Drop the collection to invalidate the stream. + assertDropCollection(db, collName); + + // Test that a $changeStream is allowed to resume on the dropped collection if an + // explicit collation is provided, even if it doesn't match the original collection + // default collation. + changeStream = caseInsensitiveCollection.watch( + [{$match: {"fullDocument.text": "ABC"}}], + {resumeAfter: resumeToken, collation: {locale: "simple"}}); + + assert.soon(() => changeStream.hasNext()); + assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"}); + + // Test that a pipeline without an explicit collation is not allowed to resume the + // change stream after the collection has been dropped. Do not modify this command in + // the passthrough suite(s) since whole-db and whole-cluster change streams are allowed + // to resume without an explicit collation. + assert.commandFailedWithCode( + runCommandChangeStreamPassthroughAware( + db, + { + aggregate: collName, + pipeline: [{$changeStream: {resumeAfter: resumeToken}}], + cursor: {}, + }, + true), // doNotModifyInPassthroughs + ErrorCodes.InvalidResumeToken); + }()); + + // Test that the default collation of a new version of the collection is not applied when + // resuming a change stream from before a collection drop. + (function() { + const collName = "change_stream_case_insensitive"; + let caseInsensitiveCollection = + assertDropAndRecreateCollection(db, collName, {collation: caseInsensitive}); + + let changeStream = caseInsensitiveCollection.watch( + [{$match: {"fullDocument.text": "abc"}}], {collation: caseInsensitive}); + + assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "abc"})); + + assert.soon(() => changeStream.hasNext()); + const next = changeStream.next(); + assert.docEq(next.documentKey, {_id: 0}); + const resumeToken = next._id; + + // Insert a second document to see after resuming. + assert.writeOK(caseInsensitiveCollection.insert({_id: "dropped_coll", text: "ABC"})); + + // Recreate the collection with a different collation. + caseInsensitiveCollection = assertDropAndRecreateCollection( + db, caseInsensitiveCollection.getName(), {collation: {locale: "simple"}}); + assert.writeOK(caseInsensitiveCollection.insert({_id: "new collection", text: "abc"})); + + // Verify that the stream sees the insert before the drop and then is exhausted. We + // won't see the invalidate because the pipeline has a $match stage after the + // $changeStream. + assert.soon(() => changeStream.hasNext()); + assert.docEq(changeStream.next().fullDocument, {_id: "dropped_coll", text: "ABC"}); + assert(changeStream.isExhausted()); + + // Test that a pipeline with an explicit collation is allowed to resume from before the + // collection is dropped and recreated. + changeStream = caseInsensitiveCollection.watch( + [{$match: {"fullDocument.text": "ABC"}}], + {resumeAfter: resumeToken, collation: {locale: "fr"}}); + + assert.soon(() => changeStream.hasNext()); + assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"}); + assert(changeStream.isExhausted()); + + // Test that a pipeline without an explicit collation is not allowed to resume, + // even though the collection has been recreated with the same default collation as it + // had previously. Do not modify this command in the passthrough suite(s) since whole-db + // and whole-cluster change streams are allowed to resume without an explicit collation. + assert.commandFailedWithCode( + runCommandChangeStreamPassthroughAware( + db, + { + aggregate: collName, + pipeline: [{$changeStream: {resumeAfter: resumeToken}}], + cursor: {} + }, + true), // doNotModifyInPassthroughs + ErrorCodes.InvalidResumeToken); + }()); + } finally { if (FixtureHelpers.isMongos(db)) { // TODO: SERVER-33944 Change streams on sharded collection with non-simple default diff --git a/jstests/change_streams/change_stream_invalidation.js b/jstests/change_streams/change_stream_invalidation.js index c525ece3f16..e955af3af87 100644 --- a/jstests/change_streams/change_stream_invalidation.js +++ b/jstests/change_streams/change_stream_invalidation.js @@ -3,26 +3,18 @@ (function() { "use strict"; - load("jstests/libs/change_stream_util.js"); - load('jstests/libs/uuid_util.js'); load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - let cst = new ChangeStreamTest(db); - db.getMongo().forceReadMode('commands'); // Write a document to the collection and test that the change stream returns it // and getMore command closes the cursor afterwards. const collGetMore = assertDropAndRecreateCollection(db, "change_stream_getmore_invalidations"); // We awaited the replication of the first write, so the change stream shouldn't return it. - // Use { w: "majority" } to deal with journaling correctly, even though we only have one node. - assert.writeOK(collGetMore.insert({_id: 0, a: 1}, {writeConcern: {w: "majority"}})); - - let aggcursor = - cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collGetMore}); + assert.writeOK(collGetMore.insert({_id: 0, a: 1})); - const collGetMoreUuid = getUUIDFromListCollections(db, collGetMore.getName()); + let changeStream = collGetMore.watch(); // Drop the collection and test that we return "invalidate" entry and close the cursor. However, // we return all oplog entries preceding the drop. @@ -33,45 +25,61 @@ assert.writeOK(collGetMore.remove({_id: 1})); // Drop the collection. assert.commandWorked(db.runCommand({drop: collGetMore.getName()})); + // We should get 4 oplog entries of type insert, update, delete, and invalidate. The cursor // should be closed. - let change = cst.getOneChange(aggcursor); - assert.eq(change.operationType, "insert", tojson(change)); - change = cst.getOneChange(aggcursor); - assert.eq(change.operationType, "update", tojson(change)); - change = cst.getOneChange(aggcursor); - assert.eq(change.operationType, "delete", tojson(change)); - cst.assertNextChangesEqual({ - cursor: aggcursor, - expectedChanges: [{operationType: "invalidate"}], - expectInvalidate: true - }); + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "insert"); + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "update"); + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "delete"); + assert.soon(() => changeStream.hasNext()); + assert.eq(changeStream.next().operationType, "invalidate"); + assert(changeStream.isExhausted()); jsTestLog("Testing aggregate command closes cursor for invalidate entries"); const collAgg = assertDropAndRecreateCollection(db, "change_stream_agg_invalidations"); - const collAggUuid = getUUIDFromListCollections(db, collAgg.getName()); + // Get a valid resume token that the next aggregate command can use. - aggcursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collAgg}); + changeStream = collAgg.watch(); - assert.writeOK(collAgg.insert({_id: 1}, {writeConcern: {w: "majority"}})); + assert.writeOK(collAgg.insert({_id: 1})); - change = cst.getOneChange(aggcursor, false); + assert.soon(() => changeStream.hasNext()); + let change = changeStream.next(); + assert.eq(change.operationType, "insert"); + assert.eq(change.documentKey, {_id: 1}); const resumeToken = change._id; - // It should not possible to resume a change stream after a collection drop, even if the - // invalidate has not been received. + // Insert another document after storing the resume token. + assert.writeOK(collAgg.insert({_id: 2})); + + assert.soon(() => changeStream.hasNext()); + change = changeStream.next(); + assert.eq(change.operationType, "insert"); + assert.eq(change.documentKey, {_id: 2}); + + // Drop the collection and invalidate the change stream. assertDropCollection(db, collAgg.getName()); // Wait for two-phase drop to complete, so that the UUID no longer exists. assert.soon(function() { return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, collAgg.getName()); }); - ChangeStreamTest.assertChangeStreamThrowsCode({ - db: db, - collName: collAgg.getName(), - pipeline: [{$changeStream: {resumeAfter: resumeToken}}], - expectedCode: 40615, - doNotModifyInPassthroughs: true - }); + + // Resume the change stream after the collection drop, up to and including the invalidate. This + // is allowed if an explicit collation is provided. + changeStream = collAgg.watch([], {resumeAfter: resumeToken, collation: {locale: "simple"}}); + + assert.soon(() => changeStream.hasNext()); + change = changeStream.next(); + assert.eq(change.operationType, "insert"); + assert.eq(change.documentKey, {_id: 2}); + + assert.soon(() => changeStream.hasNext()); + change = changeStream.next(); + assert.eq(change.operationType, "invalidate"); + assert(changeStream.isExhausted()); // Test that it is possible to open a new change stream cursor on a collection that does not // exist. @@ -80,20 +88,41 @@ assertDropCollection(db, collDoesNotExistName); // Cursor creation succeeds, but there are no results. - aggcursor = cst.startWatchingChanges( - {collection: collDoesNotExistName, pipeline: [{$changeStream: {}}]}); + const cursorObj = assert + .commandWorked(db.runCommand({ + aggregate: collDoesNotExistName, + pipeline: [{$changeStream: {}}], + cursor: {batchSize: 1}, + })) + .cursor; // We explicitly test getMore, to ensure that the getMore command for a non-existent collection // does not return an error. - aggcursor = cst.getNextBatch(aggcursor); - assert.neq(aggcursor.id, 0); - assert.eq(aggcursor.nextBatch.length, 0, tojson(aggcursor.nextBatch)); + let getMoreResult = + assert + .commandWorked(db.runCommand( + {getMore: cursorObj.id, collection: collDoesNotExistName, batchSize: 1})) + .cursor; + assert.neq(getMoreResult.id, 0); + assert.eq(getMoreResult.nextBatch.length, 0, tojson(getMoreResult.nextBatch)); // After collection creation, we see oplog entries for the collection. const collNowExists = assertCreateCollection(db, collDoesNotExistName); assert.writeOK(collNowExists.insert({_id: 0})); - change = cst.getOneChange(aggcursor); - assert.eq(change.operationType, "insert", tojson(change)); - cst.cleanUp(); + assert.soon(function() { + getMoreResult = + assert + .commandWorked(db.runCommand( + {getMore: cursorObj.id, collection: collDoesNotExistName, batchSize: 1})) + .cursor; + assert.neq(getMoreResult.id, 0); + return getMoreResult.nextBatch.length > 0; + }, "Timed out waiting for another result from getMore on non-existent collection."); + assert.eq(getMoreResult.nextBatch.length, 1); + assert.eq(getMoreResult.nextBatch[0].operationType, "insert"); + assert.eq(getMoreResult.nextBatch[0].documentKey, {_id: 0}); + + assert.commandWorked( + db.runCommand({killCursors: collDoesNotExistName, cursors: [getMoreResult.id]})); }()); diff --git a/jstests/change_streams/change_stream_rename_resumability.js b/jstests/change_streams/change_stream_rename_resumability.js index 5d7e8d3c153..46a3b3575b0 100644 --- a/jstests/change_streams/change_stream_rename_resumability.js +++ b/jstests/change_streams/change_stream_rename_resumability.js @@ -5,7 +5,7 @@ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - const coll = assertDropAndRecreateCollection(db, "change_stream_invalidate_resumability"); + let coll = assertDropAndRecreateCollection(db, "change_stream_invalidate_resumability"); // Drop the collection we'll rename to _before_ starting the changeStream, so that we don't // get accidentally an invalidate when running on the whole DB or cluster. @@ -19,8 +19,11 @@ assert.commandWorked(coll.renameCollection(coll.getName() + "_renamed")); + // Update 'coll' to point to the renamed collection. + coll = db[coll.getName() + "_renamed"]; + // Insert another document after the rename. - assert.commandWorked(coll.insert({_id: 2})); + assert.writeOK(coll.insert({_id: 2})); // We should get 2 oplog entries of type insert and invalidate. assert.soon(() => cursor.hasNext()); diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js index 61e51f989d2..ec7f47d57f6 100644 --- a/jstests/change_streams/lookup_post_image.js +++ b/jstests/change_streams/lookup_post_image.js @@ -169,6 +169,30 @@ assert(latestChange.hasOwnProperty("fullDocument")); assert.eq(latestChange.fullDocument, null); + // Test that we can resume a change stream with 'fullDocument: updateLookup' after the + // collection has been dropped. This is only allowed if an explicit collation is provided. + cursor = cst.startWatchingChanges({ + collection: coll, + pipeline: [ + {$changeStream: {resumeAfter: deleteDocResumePoint, fullDocument: "updateLookup"}}, + {$match: {operationType: {$ne: "delete"}}} + ], + aggregateOptions: {cursor: {batchSize: 0}, collation: {locale: "simple"}} + }); + + // Check the next $changeStream entry; this is the test document inserted above. + latestChange = cst.getOneChange(cursor); + assert.eq(latestChange.operationType, "insert"); + assert(latestChange.hasOwnProperty("fullDocument")); + assert.eq(latestChange.fullDocument, {_id: "fullDocument is lookup 2"}); + + // The next entry is the 'update' operation. Because the collection has been dropped, our + // attempt to look up the post-image results in a null document. + latestChange = cst.getOneChange(cursor); + assert.eq(latestChange.operationType, "update"); + assert(latestChange.hasOwnProperty("fullDocument")); + assert.eq(latestChange.fullDocument, null); + // Test that looking up the post image of an update after the collection has been dropped // and created again will result in 'fullDocument' with a value of null. This must be done // using getMore because new cursors cannot be established after a collection drop. |