diff options
Diffstat (limited to 'jstests/change_streams')
-rw-r--r-- | jstests/change_streams/collation.js | 657 | ||||
-rw-r--r-- | jstests/change_streams/metadata_notifications.js | 14 | ||||
-rw-r--r-- | jstests/change_streams/resume_from_high_water_mark_token.js | 89 |
3 files changed, 391 insertions, 369 deletions
diff --git a/jstests/change_streams/collation.js b/jstests/change_streams/collation.js index 77d345dc0b1..e99f6064b60 100644 --- a/jstests/change_streams/collation.js +++ b/jstests/change_streams/collation.js @@ -1,5 +1,6 @@ /** - * Tests that a change stream can use a user-specified, or collection-default collation. + * Tests that a change stream can take a user-specified collation, does not inherit the collection's + * default collation, and uses the simple collation if none is provided. */ (function() { "use strict"; @@ -7,353 +8,339 @@ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. load("jstests/libs/change_stream_util.js"); // For 'ChangeStreamTest' and // 'runCommandChangeStreamPassthroughAware'. - load("jstests/libs/fixture_helpers.js"); // For 'isMongos'. - - if (FixtureHelpers.isMongos(db)) { - // TODO: SERVER-33944 Change streams on sharded collection with non-simple default - // collation may be erroneously invalidated if a chunk migration occurs. When this bug is - // fixed, chunk migrations should be allowed in this test, and we should remove this call - // to stopBalancer(). - sh.stopBalancer(); - } - - try { - let cst = new ChangeStreamTest(db); - - const caseInsensitive = {locale: "en_US", strength: 2}; - - let caseInsensitiveCollection = "change_stream_case_insensitive"; - assertDropCollection(db, caseInsensitiveCollection); - - // Test that you can open a change stream before the collection exists, and it will use the - // simple collation. Tag this stream as 'doNotModifyInPassthroughs', since whole-db and - // cluster-wide streams do not adhere to the same collation rules that we will be testing - // with this cursor. - const simpleCollationStream = cst.startWatchingChanges({ - pipeline: [{$changeStream: {}}], - collection: caseInsensitiveCollection, - doNotModifyInPassthroughs: true - }); - // Create the collection with a non-default collation - this should invalidate the stream we - // opened before it existed. - caseInsensitiveCollection = - assertCreateCollection(db, caseInsensitiveCollection, {collation: caseInsensitive}); - cst.assertNextChangesEqual({ - cursor: simpleCollationStream, - expectedChanges: [{operationType: "invalidate"}], - expectInvalidate: true - }); + let cst = new ChangeStreamTest(db); + + const caseInsensitive = {locale: "en_US", strength: 2}; - const caseInsensitivePipeline = [ + let caseInsensitiveCollection = "change_stream_case_insensitive"; + assertDropCollection(db, caseInsensitiveCollection); + + // Test that you can open a change stream before the collection exists, and it will use the + // simple collation. Tag this stream as 'doNotModifyInPassthroughs', since only individual + // collections have the concept of a default collation. + const simpleCollationStream = cst.startWatchingChanges({ + pipeline: [ {$changeStream: {}}, - {$match: {"fullDocument.text": "abc"}}, - {$project: {docId: "$documentKey._id"}} - ]; - - // Test that $changeStream will implicitly adopt the default collation of the collection on - // which it is run. Tag this stream as 'doNotModifyInPassthroughs', since whole-db and - // cluster-wide streams do not have default collations. - const implicitCaseInsensitiveStream = cst.startWatchingChanges({ - pipeline: caseInsensitivePipeline, - collection: caseInsensitiveCollection, - doNotModifyInPassthroughs: true - }); - // Test that a collation can be explicitly specified for the $changeStream. This does not - // need to be tagged 'doNotModifyInPassthroughs', since whole-db and cluster-wide - // changeStreams will use an explicit collation if present. - let explicitCaseInsensitiveStream = cst.startWatchingChanges({ - pipeline: caseInsensitivePipeline, - collection: caseInsensitiveCollection, - aggregateOptions: {collation: caseInsensitive} + { + $match: + {$or: [{"fullDocument._id": "INSERT_ONE"}, {"fullDocument._id": "INSERT_TWO"}]} + }, + {$project: {docId: "$fullDocument._id"}} + ], + collection: caseInsensitiveCollection, + doNotModifyInPassthroughs: true + }); + + // Create the collection with a non-default collation. The stream should continue to use the + // simple collation. + caseInsensitiveCollection = + assertCreateCollection(db, caseInsensitiveCollection, {collation: caseInsensitive}); + assert.commandWorked( + caseInsensitiveCollection.insert([{_id: "insert_one"}, {_id: "INSERT_TWO"}])); + cst.assertNextChangesEqual( + {cursor: simpleCollationStream, expectedChanges: [{docId: "INSERT_TWO"}]}); + + const caseInsensitivePipeline = [ + {$changeStream: {}}, + {$match: {"fullDocument.text": "abc"}}, + {$project: {docId: "$documentKey._id"}} + ]; + + // Test that $changeStream will not implicitly adopt the default collation of the collection on + // which it is run. Tag this stream as 'doNotModifyInPassthroughs'; whole-db and cluster-wide + // streams do not have default collations. + const didNotInheritCollationStream = cst.startWatchingChanges({ + pipeline: caseInsensitivePipeline, + collection: caseInsensitiveCollection, + doNotModifyInPassthroughs: true + }); + // Test that a collation can be explicitly specified for the $changeStream. This does not need + // to be tagged 'doNotModifyInPassthroughs', since whole-db and cluster-wide changeStreams will + // use an explicit collation if present. + let explicitCaseInsensitiveStream = cst.startWatchingChanges({ + pipeline: caseInsensitivePipeline, + collection: caseInsensitiveCollection, + aggregateOptions: {collation: caseInsensitive} + }); + + assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "aBc"})); + assert.writeOK(caseInsensitiveCollection.insert({_id: 1, text: "abc"})); + + // 'didNotInheritCollationStream' should not have inherited the collection's case-insensitive + // default collation, and should only see the second insert. 'explicitCaseInsensitiveStream' + // should see both inserts. + cst.assertNextChangesEqual( + {cursor: didNotInheritCollationStream, expectedChanges: [{docId: 1}]}); + cst.assertNextChangesEqual( + {cursor: explicitCaseInsensitiveStream, expectedChanges: [{docId: 0}, {docId: 1}]}); + + // Test that the collation does not apply to the scan over the oplog. + const similarNameCollection = assertDropAndRecreateCollection( + db, "cHaNgE_sTrEaM_cAsE_iNsEnSiTiVe", {collation: {locale: "en_US"}}); + + // We must recreate the explicitCaseInsensitiveStream and set 'doNotModifyInPassthroughs'. Whole + // db and cluster-wide streams use the simple collation while scanning the oplog, but they don't + // filter the oplog by collection name. The subsequent $match stage which we inject into the + // pipeline to filter for a specific collection will obey the pipeline's case-insensitive + // collation, meaning that 'cHaNgE_sTrEaM_cAsE_iNsEnSiTiVe' will match + // 'change_stream_case_insensitive'. + explicitCaseInsensitiveStream = cst.startWatchingChanges({ + pipeline: caseInsensitivePipeline, + collection: caseInsensitiveCollection, + aggregateOptions: {collation: caseInsensitive}, + doNotModifyInPassthroughs: true + }); + + assert.writeOK(similarNameCollection.insert({_id: 0, text: "aBc"})); + assert.writeOK(caseInsensitiveCollection.insert({_id: 2, text: "ABC"})); + + // The case-insensitive stream should not see the first insert (to the other collection), only + // the second. We do not expect to see the insert in 'didNotInheritCollationStream'. + cst.assertNextChangesEqual( + {cursor: explicitCaseInsensitiveStream, expectedChanges: [{docId: 2}]}); + + // Test that creating a collection without a collation does not invalidate any change streams + // that were opened before the collection existed. + (function() { + let noCollationCollection = "change_stream_no_collation"; + assertDropCollection(db, noCollationCollection); + + const streamCreatedBeforeNoCollationCollection = cst.startWatchingChanges({ + pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}], + collection: noCollationCollection }); - assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "aBc"})); - assert.writeOK(caseInsensitiveCollection.insert({_id: 1, text: "abc"})); + noCollationCollection = assertCreateCollection(db, noCollationCollection); + assert.writeOK(noCollationCollection.insert({_id: 0})); cst.assertNextChangesEqual( - {cursor: implicitCaseInsensitiveStream, expectedChanges: [{docId: 0}, {docId: 1}]}); + {cursor: streamCreatedBeforeNoCollationCollection, expectedChanges: [{docId: 0}]}); + }()); + + // Test that creating a collection and explicitly specifying the simple collation does not + // invalidate any change streams that were opened before the collection existed. + (function() { + let simpleCollationCollection = "change_stream_simple_collation"; + assertDropCollection(db, simpleCollationCollection); + + const streamCreatedBeforeSimpleCollationCollection = cst.startWatchingChanges({ + pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}], + collection: simpleCollationCollection + }); + + simpleCollationCollection = + assertCreateCollection(db, simpleCollationCollection, {collation: {locale: "simple"}}); + assert.writeOK(simpleCollationCollection.insert({_id: 0})); + cst.assertNextChangesEqual( - {cursor: explicitCaseInsensitiveStream, expectedChanges: [{docId: 0}, {docId: 1}]}); - - // Test that the collation does not apply to the scan over the oplog. - const similarNameCollection = assertDropAndRecreateCollection( - db, "cHaNgE_sTrEaM_cAsE_iNsEnSiTiVe", {collation: {locale: "en_US"}}); - - // We must recreate the explicitCaseInsensitiveStream and set 'doNotModifyInPassthroughs'. - // Whole-db and cluster-wide streams use the simple collation while scanning the oplog, but - // do not filter the oplog by collection name. The subsequent $match stage which we inject - // into the pipeline to filter for a specific collection will obey the pipeline's - // case-insensitive collation, meaning that 'cHaNgE_sTrEaM_cAsE_iNsEnSiTiVe' will match - // 'change_stream_case_insensitive'. - explicitCaseInsensitiveStream = cst.startWatchingChanges({ - pipeline: caseInsensitivePipeline, - collection: caseInsensitiveCollection, + {cursor: streamCreatedBeforeSimpleCollationCollection, expectedChanges: [{docId: 0}]}); + }()); + + // Test that creating a change stream with a non-default collation, then creating a collection + // with the same collation will not invalidate the change stream. + (function() { + let frenchCollection = "change_stream_french_collation"; + assertDropCollection(db, frenchCollection); + + const frenchChangeStream = cst.startWatchingChanges({ + pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}], + aggregateOptions: {collation: {locale: "fr"}}, + collection: frenchCollection + }); + + frenchCollection = + assertCreateCollection(db, frenchCollection, {collation: {locale: "fr"}}); + assert.writeOK(frenchCollection.insert({_id: 0})); + + cst.assertNextChangesEqual({cursor: frenchChangeStream, expectedChanges: [{docId: 0}]}); + }()); + + // Test that creating a change stream with a non-default collation, then creating a collection + // with *a different* collation will not invalidate the change stream. + (function() { + let germanCollection = "change_stream_german_collation"; + assertDropCollection(db, germanCollection); + + const englishCaseInsensitiveStream = cst.startWatchingChanges({ + pipeline: [ + {$changeStream: {}}, + {$match: {"fullDocument.text": "abc"}}, + {$project: {docId: "$documentKey._id"}} + ], aggregateOptions: {collation: caseInsensitive}, - doNotModifyInPassthroughs: true + collection: germanCollection }); - assert.writeOK(similarNameCollection.insert({_id: 0, text: "aBc"})); - assert.writeOK(caseInsensitiveCollection.insert({_id: 2, text: "ABC"})); + germanCollection = + assertCreateCollection(db, germanCollection, {collation: {locale: "de"}}); + assert.writeOK(germanCollection.insert({_id: 0, text: "aBc"})); - // The existing stream should not see the first insert (to the other collection), but - // should see the second. cst.assertNextChangesEqual( - {cursor: implicitCaseInsensitiveStream, expectedChanges: [{docId: 2}]}); + {cursor: englishCaseInsensitiveStream, expectedChanges: [{docId: 0}]}); + }()); + + // Test that creating a change stream with a non-default collation against a collection that has + // a non-simple default collation will use the collation specified on the operation. + (function() { + const caseInsensitiveCollection = assertDropAndRecreateCollection( + db, "change_stream_case_insensitive", {collation: caseInsensitive}); + + const englishCaseSensitiveStream = cst.startWatchingChanges({ + pipeline: [ + {$changeStream: {}}, + {$match: {"fullDocument.text": "abc"}}, + {$project: {docId: "$documentKey._id"}} + ], + aggregateOptions: {collation: {locale: "en_US"}}, + collection: caseInsensitiveCollection + }); + + assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "aBc"})); + assert.writeOK(caseInsensitiveCollection.insert({_id: 1, text: "abc"})); + cst.assertNextChangesEqual( - {cursor: explicitCaseInsensitiveStream, expectedChanges: [{docId: 2}]}); - - // Test that creating a collection without a collation does not invalidate any change - // streams that were opened before the collection existed. - (function() { - let noCollationCollection = "change_stream_no_collation"; - assertDropCollection(db, noCollationCollection); - - const streamCreatedBeforeNoCollationCollection = cst.startWatchingChanges({ - pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}], - collection: noCollationCollection - }); - - noCollationCollection = assertCreateCollection(db, noCollationCollection); - assert.writeOK(noCollationCollection.insert({_id: 0})); - - cst.assertNextChangesEqual( - {cursor: streamCreatedBeforeNoCollationCollection, expectedChanges: [{docId: 0}]}); - }()); - - // Test that creating a collection and explicitly specifying the simple collation does not - // invalidate any change streams that were opened before the collection existed. - (function() { - let simpleCollationCollection = "change_stream_simple_collation"; - assertDropCollection(db, simpleCollationCollection); - - const streamCreatedBeforeSimpleCollationCollection = cst.startWatchingChanges({ - pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}], - collection: simpleCollationCollection - }); - - simpleCollationCollection = assertCreateCollection( - db, simpleCollationCollection, {collation: {locale: "simple"}}); - assert.writeOK(simpleCollationCollection.insert({_id: 0})); - - cst.assertNextChangesEqual({ - cursor: streamCreatedBeforeSimpleCollationCollection, - expectedChanges: [{docId: 0}] - }); - }()); - - // Test that creating a change stream with a non-default collation, then creating a - // collection with the same collation will not invalidate the change stream. - (function() { - let frenchCollection = "change_stream_french_collation"; - assertDropCollection(db, frenchCollection); - - const frenchChangeStream = cst.startWatchingChanges({ - pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}], - aggregateOptions: {collation: {locale: "fr"}}, - collection: frenchCollection - }); - - frenchCollection = - assertCreateCollection(db, frenchCollection, {collation: {locale: "fr"}}); - assert.writeOK(frenchCollection.insert({_id: 0})); - - cst.assertNextChangesEqual({cursor: frenchChangeStream, expectedChanges: [{docId: 0}]}); - }()); - - // Test that creating a change stream with a non-default collation, then creating a - // collection with *a different* collation will not invalidate the change stream. - (function() { - let germanCollection = "change_stream_german_collation"; - assertDropCollection(db, germanCollection); - - const englishCaseInsensitiveStream = cst.startWatchingChanges({ - pipeline: [ - {$changeStream: {}}, - {$match: {"fullDocument.text": "abc"}}, - {$project: {docId: "$documentKey._id"}} - ], - aggregateOptions: {collation: caseInsensitive}, - collection: germanCollection - }); - - germanCollection = - assertCreateCollection(db, germanCollection, {collation: {locale: "de"}}); - assert.writeOK(germanCollection.insert({_id: 0, text: "aBc"})); - - cst.assertNextChangesEqual( - {cursor: englishCaseInsensitiveStream, expectedChanges: [{docId: 0}]}); - }()); - - // Test that creating a change stream with a non-default collation against a collection - // that has a non-simple default collation will use the collation specified on the - // operation. - (function() { - const caseInsensitiveCollection = assertDropAndRecreateCollection( - db, "change_stream_case_insensitive", {collation: caseInsensitive}); - - const englishCaseSensitiveStream = cst.startWatchingChanges({ - pipeline: [ - {$changeStream: {}}, - {$match: {"fullDocument.text": "abc"}}, - {$project: {docId: "$documentKey._id"}} - ], - aggregateOptions: {collation: {locale: "en_US"}}, - collection: caseInsensitiveCollection - }); - - assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "aBc"})); - assert.writeOK(caseInsensitiveCollection.insert({_id: 1, text: "abc"})); - - cst.assertNextChangesEqual( - {cursor: englishCaseSensitiveStream, expectedChanges: [{docId: 1}]}); - }()); - - // Test that collation is supported by the shell helper. Test that creating a change - // stream with a non-default collation against a collection that has a simple default - // collation will use the collation specified on the operation. - (function() { - const noCollationCollection = - assertDropAndRecreateCollection(db, "change_stream_no_collation"); - - const cursor = noCollationCollection.watch( - [{$match: {"fullDocument.text": "abc"}}, {$project: {docId: "$documentKey._id"}}], - {collation: caseInsensitive}); - assert(!cursor.hasNext()); - assert.writeOK(noCollationCollection.insert({_id: 0, text: "aBc"})); - assert.writeOK(noCollationCollection.insert({_id: 1, text: "abc"})); - assert.soon(() => cursor.hasNext()); - assertChangeStreamEventEq(cursor.next(), {docId: 0}); - assert.soon(() => cursor.hasNext()); - assertChangeStreamEventEq(cursor.next(), {docId: 1}); - assert(!cursor.hasNext()); - }()); - - // Test that resuming a change stream on a collection that has been dropped requires the - // user to explicitly specify the collation. This is testing that we cannot resume if we - // need to retrieve the collection metadata. - (function() { - const collName = "change_stream_case_insensitive"; - let caseInsensitiveCollection = - assertDropAndRecreateCollection(db, collName, {collation: caseInsensitive}); - - let changeStream = caseInsensitiveCollection.watch( - [{$match: {"fullDocument.text": "abc"}}], {collation: caseInsensitive}); - - assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "abc"})); - - assert.soon(() => changeStream.hasNext()); - const next = changeStream.next(); - assert.docEq(next.documentKey, {_id: 0}); - const resumeToken = next._id; - - // Insert a second document to see after resuming. - assert.writeOK(caseInsensitiveCollection.insert({_id: "dropped_coll", text: "ABC"})); - - // Drop the collection to invalidate the stream. - assertDropCollection(db, collName); - - // Test that a $changeStream is allowed to resume on the dropped collection if an - // explicit collation is provided, even if it doesn't match the original collection - // default collation. - changeStream = caseInsensitiveCollection.watch( - [{$match: {"fullDocument.text": "ABC"}}], - {resumeAfter: resumeToken, collation: {locale: "simple"}}); - - assert.soon(() => changeStream.hasNext()); - assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"}); - - // Test that a pipeline without an explicit collation is not allowed to resume the - // change stream after the collection has been dropped. Do not modify this command in - // the passthrough suite(s) since whole-db and whole-cluster change streams are allowed - // to resume without an explicit collation. - assert.commandFailedWithCode( - runCommandChangeStreamPassthroughAware( - db, - { - aggregate: collName, - pipeline: [{$changeStream: {resumeAfter: resumeToken}}], - cursor: {}, - }, - true), // doNotModifyInPassthroughs - ErrorCodes.InvalidResumeToken); - }()); - - // Test that the default collation of a new version of the collection is not applied when - // resuming a change stream from before a collection drop. - (function() { - const collName = "change_stream_case_insensitive"; - let caseInsensitiveCollection = - assertDropAndRecreateCollection(db, collName, {collation: caseInsensitive}); - - let changeStream = caseInsensitiveCollection.watch( - [{$match: {"fullDocument.text": "abc"}}], {collation: caseInsensitive}); - - assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "abc"})); - - assert.soon(() => changeStream.hasNext()); - const next = changeStream.next(); - assert.docEq(next.documentKey, {_id: 0}); - const resumeToken = next._id; - - // Insert a second document to see after resuming. - assert.writeOK(caseInsensitiveCollection.insert({_id: "dropped_coll", text: "ABC"})); - - // Recreate the collection with a different collation. - caseInsensitiveCollection = assertDropAndRecreateCollection( - db, caseInsensitiveCollection.getName(), {collation: {locale: "simple"}}); - assert.writeOK(caseInsensitiveCollection.insert({_id: "new collection", text: "abc"})); - - // Verify that the stream sees the insert before the drop and then is exhausted. We - // won't see the invalidate because the pipeline has a $match stage after the - // $changeStream. - assert.soon(() => changeStream.hasNext()); - assert.docEq(changeStream.next().fullDocument, {_id: "dropped_coll", text: "ABC"}); - // Only single-collection streams will be exhausted from the drop. Use 'next()' instead - // of 'isExhausted()' to force a getMore since the previous getMore may not include the - // collection drop, which is more likely with sharded collections on slow machines. - if (!isChangeStreamPassthrough()) { - assert.throws(() => changeStream.next()); - } - - // Test that a pipeline with an explicit collation is allowed to resume from before the - // collection is dropped and recreated. - changeStream = caseInsensitiveCollection.watch( - [{$match: {"fullDocument.text": "ABC"}}], - {resumeAfter: resumeToken, collation: {locale: "fr"}}); - - assert.soon(() => changeStream.hasNext()); - assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"}); - // Only single-collection streams will be exhausted from the drop. Use 'next()' instead - // of 'isExhausted()' to force a getMore since the previous getMore may not include the - // collection drop, which is more likely with sharded collections on slow machines. - if (!isChangeStreamPassthrough()) { - assert.throws(() => changeStream.next()); - } - - // Test that a pipeline without an explicit collation is not allowed to resume, - // even though the collection has been recreated with the same default collation as it - // had previously. Do not modify this command in the passthrough suite(s) since whole-db - // and whole-cluster change streams are allowed to resume without an explicit collation. - assert.commandFailedWithCode( - runCommandChangeStreamPassthroughAware( - db, - { - aggregate: collName, - pipeline: [{$changeStream: {resumeAfter: resumeToken}}], - cursor: {} - }, - true), // doNotModifyInPassthroughs - ErrorCodes.InvalidResumeToken); - }()); - - } finally { - if (FixtureHelpers.isMongos(db)) { - // TODO: SERVER-33944 Change streams on sharded collection with non-simple default - // collation may be erroneously invalidated if a chunk migration occurs. When this bug - // is fixed, chunk migrations should be allowed in this test, and we should remove this - // call to startBalancer() as well as the earlier call to stopBalancer(). - sh.startBalancer(); + {cursor: englishCaseSensitiveStream, expectedChanges: [{docId: 1}]}); + }()); + + // Test that collation is supported by the shell helper. Test that creating a change stream with + // a non-default collation against a collection that has a simple default collation will use the + // collation specified on the operation. + (function() { + const noCollationCollection = + assertDropAndRecreateCollection(db, "change_stream_no_collation"); + + const cursor = noCollationCollection.watch( + [{$match: {"fullDocument.text": "abc"}}, {$project: {docId: "$documentKey._id"}}], + {collation: caseInsensitive}); + assert(!cursor.hasNext()); + assert.writeOK(noCollationCollection.insert({_id: 0, text: "aBc"})); + assert.writeOK(noCollationCollection.insert({_id: 1, text: "abc"})); + assert.soon(() => cursor.hasNext()); + assertChangeStreamEventEq(cursor.next(), {docId: 0}); + assert.soon(() => cursor.hasNext()); + assertChangeStreamEventEq(cursor.next(), {docId: 1}); + assert(!cursor.hasNext()); + }()); + + // Test that we can resume a change stream on a collection that has been dropped without + // requiring the user to explicitly specify the collation. + (function() { + const collName = "change_stream_case_insensitive"; + let caseInsensitiveCollection = + assertDropAndRecreateCollection(db, collName, {collation: caseInsensitive}); + + let changeStream = caseInsensitiveCollection.watch([{$match: {"fullDocument.text": "abc"}}], + {collation: caseInsensitive}); + + assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "abc"})); + + assert.soon(() => changeStream.hasNext()); + const next = changeStream.next(); + assert.docEq(next.documentKey, {_id: 0}); + const resumeToken = next._id; + + // Insert a second document to see after resuming. + assert.writeOK(caseInsensitiveCollection.insert({_id: "dropped_coll", text: "ABC"})); + + // Drop the collection to invalidate the stream. + assertDropCollection(db, collName); + + // Test that a $changeStream is allowed to resume on the dropped collection with an explicit + // collation, even if it doesn't match the original collection's default collation. + changeStream = caseInsensitiveCollection.watch( + [{$match: {"fullDocument.text": "ABC"}}], + {resumeAfter: resumeToken, collation: {locale: "simple"}}); + + assert.soon(() => changeStream.hasNext()); + assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"}); + + // Test that a pipeline without an explicit collation is allowed to resume the change stream + // after the collection has been dropped, and it will use the simple collation. Do not + // modify this in the passthrough suite(s) since only individual collections have the + // concept of a default collation. + const doNotModifyInPassthroughs = true; + const cmdRes = assert.commandWorked(runCommandChangeStreamPassthroughAware( + db, + { + aggregate: collName, + pipeline: [{$changeStream: {resumeAfter: resumeToken}}], + cursor: {}, + }, + doNotModifyInPassthroughs)); + + changeStream = new DBCommandCursor(db, cmdRes); + assert.soon(() => changeStream.hasNext()); + assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"}); + }()); + + // Test that the default collation of a new version of the collection is not applied when + // resuming a change stream from before a collection drop. + (function() { + const collName = "change_stream_case_insensitive"; + let caseInsensitiveCollection = + assertDropAndRecreateCollection(db, collName, {collation: caseInsensitive}); + + let changeStream = caseInsensitiveCollection.watch([{$match: {"fullDocument.text": "abc"}}], + {collation: caseInsensitive}); + + assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "abc"})); + + assert.soon(() => changeStream.hasNext()); + const next = changeStream.next(); + assert.docEq(next.documentKey, {_id: 0}); + const resumeToken = next._id; + + // Insert a second document to see after resuming. + assert.writeOK(caseInsensitiveCollection.insert({_id: "dropped_coll", text: "ABC"})); + + // Recreate the collection with a different collation. + caseInsensitiveCollection = assertDropAndRecreateCollection( + db, caseInsensitiveCollection.getName(), {collation: {locale: "simple"}}); + assert.writeOK(caseInsensitiveCollection.insert({_id: "new collection", text: "abc"})); + + // Verify that the stream sees the insert before the drop and then is exhausted. We won't + // see the invalidate because the pipeline has a $match stage after the $changeStream. + assert.soon(() => changeStream.hasNext()); + assert.docEq(changeStream.next().fullDocument, {_id: "dropped_coll", text: "ABC"}); + // Only single-collection streams will be exhausted from the drop. Use 'next()' instead of + // 'isExhausted()' to force a getMore since the previous getMore may not include the + // collection drop, which is more likely with sharded collections on slow machines. + if (!isChangeStreamPassthrough()) { + assert.throws(() => changeStream.next()); } - } + + // Test that a pipeline with an explicit collation is allowed to resume from before the + // collection is dropped and recreated. + changeStream = + caseInsensitiveCollection.watch([{$match: {"fullDocument.text": "ABC"}}], + {resumeAfter: resumeToken, collation: {locale: "fr"}}); + + assert.soon(() => changeStream.hasNext()); + assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"}); + // Only single-collection streams will be exhausted from the drop. Use 'next()' instead of + // 'isExhausted()' to force a getMore since the previous getMore may not include the + // collection drop, which is more likely with sharded collections on slow machines. + if (!isChangeStreamPassthrough()) { + assert.throws(() => changeStream.next()); + } + + // Test that a pipeline without an explicit collation is allowed to resume, even though the + // collection has been recreated with the same default collation as it had previously. Do + // not modify this command in the passthrough suite(s) since only individual collections + // have the concept of a default collation. + const doNotModifyInPassthroughs = true; + const cmdRes = assert.commandWorked(runCommandChangeStreamPassthroughAware( + db, + { + aggregate: collName, + pipeline: [{$changeStream: {resumeAfter: resumeToken}}], + cursor: {} + }, + doNotModifyInPassthroughs)); + + changeStream = new DBCommandCursor(db, cmdRes); + assert.soon(() => changeStream.hasNext()); + assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"}); + }()); })(); diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js index b4073f44bef..b65bd9aaec2 100644 --- a/jstests/change_streams/metadata_notifications.js +++ b/jstests/change_streams/metadata_notifications.js @@ -77,22 +77,20 @@ const resumeTokenDrop = changes[3]._id; const resumeTokenInvalidate = changes[4]._id; - // It should not be possible to resume a change stream after a collection drop without an - // explicit collation, even if the invalidate has not been received. - assert.commandFailedWithCode(db.runCommand({ + // Verify that we can resume a stream after a collection drop without an explicit collation. + assert.commandWorked(db.runCommand({ aggregate: coll.getName(), pipeline: [{$changeStream: {resumeAfter: resumeToken}}], cursor: {} - }), - ErrorCodes.InvalidResumeToken); + })); // Recreate the collection. coll = assertCreateCollection(db, collName); assert.writeOK(coll.insert({_id: "after recreate"})); - // Test resuming the change stream from the collection drop using 'resumeAfter'. - // If running in a sharded passthrough suite, resuming from the drop will first return the drop - // from the other shard before returning an invalidate. + // Test resuming the change stream from the collection drop using 'resumeAfter'. If running in a + // sharded passthrough suite, resuming from the drop will first return the drop from the other + // shard before returning an invalidate. cursor = cst.startWatchingChanges({ collection: coll, pipeline: [{$changeStream: {resumeAfter: resumeTokenDrop}}], diff --git a/jstests/change_streams/resume_from_high_water_mark_token.js b/jstests/change_streams/resume_from_high_water_mark_token.js index a58cefb8ea5..bc63fdb33d9 100644 --- a/jstests/change_streams/resume_from_high_water_mark_token.js +++ b/jstests/change_streams/resume_from_high_water_mark_token.js @@ -37,23 +37,33 @@ for (let resumeType of["startAfter", "resumeAfter"]) { cmdResBeforeCollExists = assert.commandWorked(runExactCommand(db, { aggregate: collName, - pipeline: [{$changeStream: {[resumeType]: pbrtBeforeCollExists}}], + pipeline: [ + {$changeStream: {[resumeType]: pbrtBeforeCollExists}}, + { + $match: { + $or: [ + {"fullDocument._id": "INSERT_ONE"}, + {"fullDocument._id": "INSERT_TWO"} + ] + } + } + ], cursor: {} })); } csCursor = new DBCommandCursor(db, cmdResBeforeCollExists); - // But if the collection is created with a non-simple collation, the resumed stream invalidates. + // If the collection is then created with a case-insensitive collation, the resumed stream + // continues to use the simple collation. We see 'INSERT_TWO' but not 'insert_one'. const testCollationCollection = assertCreateCollection(db, collName, {collation: {locale: "en_US", strength: 2}}); assert.commandWorked(testCollationCollection.insert({_id: "insert_one"})); assert.commandWorked(testCollationCollection.insert({_id: "INSERT_TWO"})); assert.soon(() => csCursor.hasNext()); - const invalidate = csCursor.next(); - assert.eq(invalidate.operationType, "invalidate"); // We don't see either insert. + assert.docEq(csCursor.next().fullDocument, {_id: "INSERT_TWO"}); csCursor.close(); - // We can resume from the pre-creation high water mark if we specify an explicit collation... + // We can resume from the pre-creation high water mark if we do not specify a collation... let cmdResResumeFromBeforeCollCreated = assert.commandWorked(runExactCommand(db, { aggregate: collName, pipeline: [ @@ -63,14 +73,35 @@ {$or: [{"fullDocument._id": "INSERT_ONE"}, {"fullDocument._id": "INSERT_TWO"}]} } ], - collation: {locale: "simple"}, cursor: {} })); - // This time the stream does not invalidate. We override the default case-insensitive collation - // with the explicit simple collation we specified, so we match INSERT_TWO but not INSERT_ONE. + // ... but we will not inherit the collection's case-insensitive collation, instead defaulting + // to the simple collation. We will therefore match 'INSERT_TWO' but not 'insert_one'. + csCursor = new DBCommandCursor(db, cmdResResumeFromBeforeCollCreated); + assert.soon(() => csCursor.hasNext()); + assert.docEq(csCursor.next().fullDocument, {_id: "INSERT_TWO"}); + csCursor.close(); + + // If we do specify a non-simple collation, it will be adopted by the pipeline. + cmdResResumeFromBeforeCollCreated = assert.commandWorked(runExactCommand(db, { + aggregate: collName, + pipeline: [ + {$changeStream: {resumeAfter: pbrtBeforeCollExists}}, + { + $match: + {$or: [{"fullDocument._id": "INSERT_ONE"}, {"fullDocument._id": "INSERT_TWO"}]} + } + ], + collation: {locale: "en_US", strength: 2}, + cursor: {} + })); + + // Now we match both 'insert_one' and 'INSERT_TWO'. csCursor = new DBCommandCursor(db, cmdResResumeFromBeforeCollCreated); assert.soon(() => csCursor.hasNext()); + assert.docEq(csCursor.next().fullDocument, {_id: "insert_one"}); + assert.soon(() => csCursor.hasNext()); assert.docEq(csCursor.next().fullDocument, {_id: "INSERT_TWO"}); csCursor.close(); @@ -96,13 +127,21 @@ aggregate: collName, pipeline: [ {$changeStream: {resumeAfter: hwmFromCollWithCollation}}, - {$match: {"fullDocument._id": "insert_four"}} + { + $match: { + $or: [ + {"fullDocument._id": "INSERT_THREE"}, + {"fullDocument._id": "INSERT_FOUR"} + ] + } + } ], cursor: {} })); csCursor = new DBCommandCursor(db, cmdResResumeWithCollation); - // ... and we inherit the collection's case-insensitive collation, matching {_id:"insert_four"}. + // ... but we do not inherit the collection's case-insensitive collation, matching 'INSERT_FOUR' + // but not the preceding 'insert_three'. assert.soon(() => csCursor.hasNext()); assert.docEq(csCursor.next().fullDocument, {_id: "INSERT_FOUR"}); csCursor.close(); @@ -194,23 +233,21 @@ hwmPostCreation = csCursor.getResumeToken(); csCursor.close(); - // Because this HWM has a UUID, we cannot resume from the token if the collection is dropped... + // We can resume from the token if the collection is dropped... assertDropCollection(db, collName); - assert.commandFailedWithCode(runExactCommand(db, { - aggregate: collName, - pipeline: [{$changeStream: {resumeAfter: hwmPostCreation}}], - cursor: {} - }), - ErrorCodes.InvalidResumeToken); + assert.commandWorked(runExactCommand(db, { + aggregate: collName, + pipeline: [{$changeStream: {resumeAfter: hwmPostCreation}}], + cursor: {} + })); // ... or if the collection is recreated with a different UUID... assertCreateCollection(db, collName); - assert.commandFailedWithCode(runExactCommand(db, { - aggregate: collName, - pipeline: [{$changeStream: {resumeAfter: hwmPostCreation}}], - cursor: {} - }), - ErrorCodes.InvalidResumeToken); - // ... unless we specify an explicit collation. + assert.commandWorked(runExactCommand(db, { + aggregate: collName, + pipeline: [{$changeStream: {resumeAfter: hwmPostCreation}}], + cursor: {} + })); + // ... or if we specify an explicit collation. assert.commandWorked(runExactCommand(db, { aggregate: collName, pipeline: [{$changeStream: {resumeAfter: hwmPostCreation}}], @@ -218,13 +255,13 @@ cursor: {} })); - // But even after the collection is recreated, we can still resume from the pre-creation HWM... + // Even after the collection is recreated, we can still resume from the pre-creation HWM... cmdResResumeFromBeforeCollCreated = assert.commandWorked(runExactCommand(db, { aggregate: collName, pipeline: [{$changeStream: {resumeAfter: pbrtBeforeCollExists}}], cursor: {} })); - // ...and can still see all the events from the collection's original incarnation... + // ...and we can still see all the events from the collection's original incarnation... csCursor = new DBCommandCursor(db, cmdResResumeFromBeforeCollCreated); docCount = 0; assert.soon(() => { |