summaryrefslogtreecommitdiff
path: root/jstests/change_streams
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/change_streams')
-rw-r--r--jstests/change_streams/collation.js657
-rw-r--r--jstests/change_streams/metadata_notifications.js14
-rw-r--r--jstests/change_streams/resume_from_high_water_mark_token.js89
3 files changed, 391 insertions, 369 deletions
diff --git a/jstests/change_streams/collation.js b/jstests/change_streams/collation.js
index 77d345dc0b1..e99f6064b60 100644
--- a/jstests/change_streams/collation.js
+++ b/jstests/change_streams/collation.js
@@ -1,5 +1,6 @@
/**
- * Tests that a change stream can use a user-specified, or collection-default collation.
+ * Tests that a change stream can take a user-specified collation, does not inherit the collection's
+ * default collation, and uses the simple collation if none is provided.
*/
(function() {
"use strict";
@@ -7,353 +8,339 @@
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
load("jstests/libs/change_stream_util.js"); // For 'ChangeStreamTest' and
// 'runCommandChangeStreamPassthroughAware'.
- load("jstests/libs/fixture_helpers.js"); // For 'isMongos'.
-
- if (FixtureHelpers.isMongos(db)) {
- // TODO: SERVER-33944 Change streams on sharded collection with non-simple default
- // collation may be erroneously invalidated if a chunk migration occurs. When this bug is
- // fixed, chunk migrations should be allowed in this test, and we should remove this call
- // to stopBalancer().
- sh.stopBalancer();
- }
-
- try {
- let cst = new ChangeStreamTest(db);
-
- const caseInsensitive = {locale: "en_US", strength: 2};
-
- let caseInsensitiveCollection = "change_stream_case_insensitive";
- assertDropCollection(db, caseInsensitiveCollection);
-
- // Test that you can open a change stream before the collection exists, and it will use the
- // simple collation. Tag this stream as 'doNotModifyInPassthroughs', since whole-db and
- // cluster-wide streams do not adhere to the same collation rules that we will be testing
- // with this cursor.
- const simpleCollationStream = cst.startWatchingChanges({
- pipeline: [{$changeStream: {}}],
- collection: caseInsensitiveCollection,
- doNotModifyInPassthroughs: true
- });
- // Create the collection with a non-default collation - this should invalidate the stream we
- // opened before it existed.
- caseInsensitiveCollection =
- assertCreateCollection(db, caseInsensitiveCollection, {collation: caseInsensitive});
- cst.assertNextChangesEqual({
- cursor: simpleCollationStream,
- expectedChanges: [{operationType: "invalidate"}],
- expectInvalidate: true
- });
+ let cst = new ChangeStreamTest(db);
+
+ const caseInsensitive = {locale: "en_US", strength: 2};
- const caseInsensitivePipeline = [
+ let caseInsensitiveCollection = "change_stream_case_insensitive";
+ assertDropCollection(db, caseInsensitiveCollection);
+
+ // Test that you can open a change stream before the collection exists, and it will use the
+ // simple collation. Tag this stream as 'doNotModifyInPassthroughs', since only individual
+ // collections have the concept of a default collation.
+ const simpleCollationStream = cst.startWatchingChanges({
+ pipeline: [
{$changeStream: {}},
- {$match: {"fullDocument.text": "abc"}},
- {$project: {docId: "$documentKey._id"}}
- ];
-
- // Test that $changeStream will implicitly adopt the default collation of the collection on
- // which it is run. Tag this stream as 'doNotModifyInPassthroughs', since whole-db and
- // cluster-wide streams do not have default collations.
- const implicitCaseInsensitiveStream = cst.startWatchingChanges({
- pipeline: caseInsensitivePipeline,
- collection: caseInsensitiveCollection,
- doNotModifyInPassthroughs: true
- });
- // Test that a collation can be explicitly specified for the $changeStream. This does not
- // need to be tagged 'doNotModifyInPassthroughs', since whole-db and cluster-wide
- // changeStreams will use an explicit collation if present.
- let explicitCaseInsensitiveStream = cst.startWatchingChanges({
- pipeline: caseInsensitivePipeline,
- collection: caseInsensitiveCollection,
- aggregateOptions: {collation: caseInsensitive}
+ {
+ $match:
+ {$or: [{"fullDocument._id": "INSERT_ONE"}, {"fullDocument._id": "INSERT_TWO"}]}
+ },
+ {$project: {docId: "$fullDocument._id"}}
+ ],
+ collection: caseInsensitiveCollection,
+ doNotModifyInPassthroughs: true
+ });
+
+ // Create the collection with a non-default collation. The stream should continue to use the
+ // simple collation.
+ caseInsensitiveCollection =
+ assertCreateCollection(db, caseInsensitiveCollection, {collation: caseInsensitive});
+ assert.commandWorked(
+ caseInsensitiveCollection.insert([{_id: "insert_one"}, {_id: "INSERT_TWO"}]));
+ cst.assertNextChangesEqual(
+ {cursor: simpleCollationStream, expectedChanges: [{docId: "INSERT_TWO"}]});
+
+ const caseInsensitivePipeline = [
+ {$changeStream: {}},
+ {$match: {"fullDocument.text": "abc"}},
+ {$project: {docId: "$documentKey._id"}}
+ ];
+
+ // Test that $changeStream will not implicitly adopt the default collation of the collection on
+ // which it is run. Tag this stream as 'doNotModifyInPassthroughs'; whole-db and cluster-wide
+ // streams do not have default collations.
+ const didNotInheritCollationStream = cst.startWatchingChanges({
+ pipeline: caseInsensitivePipeline,
+ collection: caseInsensitiveCollection,
+ doNotModifyInPassthroughs: true
+ });
+ // Test that a collation can be explicitly specified for the $changeStream. This does not need
+ // to be tagged 'doNotModifyInPassthroughs', since whole-db and cluster-wide changeStreams will
+ // use an explicit collation if present.
+ let explicitCaseInsensitiveStream = cst.startWatchingChanges({
+ pipeline: caseInsensitivePipeline,
+ collection: caseInsensitiveCollection,
+ aggregateOptions: {collation: caseInsensitive}
+ });
+
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "aBc"}));
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 1, text: "abc"}));
+
+ // 'didNotInheritCollationStream' should not have inherited the collection's case-insensitive
+ // default collation, and should only see the second insert. 'explicitCaseInsensitiveStream'
+ // should see both inserts.
+ cst.assertNextChangesEqual(
+ {cursor: didNotInheritCollationStream, expectedChanges: [{docId: 1}]});
+ cst.assertNextChangesEqual(
+ {cursor: explicitCaseInsensitiveStream, expectedChanges: [{docId: 0}, {docId: 1}]});
+
+ // Test that the collation does not apply to the scan over the oplog.
+ const similarNameCollection = assertDropAndRecreateCollection(
+ db, "cHaNgE_sTrEaM_cAsE_iNsEnSiTiVe", {collation: {locale: "en_US"}});
+
+ // We must recreate the explicitCaseInsensitiveStream and set 'doNotModifyInPassthroughs'. Whole
+ // db and cluster-wide streams use the simple collation while scanning the oplog, but they don't
+ // filter the oplog by collection name. The subsequent $match stage which we inject into the
+ // pipeline to filter for a specific collection will obey the pipeline's case-insensitive
+ // collation, meaning that 'cHaNgE_sTrEaM_cAsE_iNsEnSiTiVe' will match
+ // 'change_stream_case_insensitive'.
+ explicitCaseInsensitiveStream = cst.startWatchingChanges({
+ pipeline: caseInsensitivePipeline,
+ collection: caseInsensitiveCollection,
+ aggregateOptions: {collation: caseInsensitive},
+ doNotModifyInPassthroughs: true
+ });
+
+ assert.writeOK(similarNameCollection.insert({_id: 0, text: "aBc"}));
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 2, text: "ABC"}));
+
+ // The case-insensitive stream should not see the first insert (to the other collection), only
+ // the second. We do not expect to see the insert in 'didNotInheritCollationStream'.
+ cst.assertNextChangesEqual(
+ {cursor: explicitCaseInsensitiveStream, expectedChanges: [{docId: 2}]});
+
+ // Test that creating a collection without a collation does not invalidate any change streams
+ // that were opened before the collection existed.
+ (function() {
+ let noCollationCollection = "change_stream_no_collation";
+ assertDropCollection(db, noCollationCollection);
+
+ const streamCreatedBeforeNoCollationCollection = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}],
+ collection: noCollationCollection
});
- assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "aBc"}));
- assert.writeOK(caseInsensitiveCollection.insert({_id: 1, text: "abc"}));
+ noCollationCollection = assertCreateCollection(db, noCollationCollection);
+ assert.writeOK(noCollationCollection.insert({_id: 0}));
cst.assertNextChangesEqual(
- {cursor: implicitCaseInsensitiveStream, expectedChanges: [{docId: 0}, {docId: 1}]});
+ {cursor: streamCreatedBeforeNoCollationCollection, expectedChanges: [{docId: 0}]});
+ }());
+
+ // Test that creating a collection and explicitly specifying the simple collation does not
+ // invalidate any change streams that were opened before the collection existed.
+ (function() {
+ let simpleCollationCollection = "change_stream_simple_collation";
+ assertDropCollection(db, simpleCollationCollection);
+
+ const streamCreatedBeforeSimpleCollationCollection = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}],
+ collection: simpleCollationCollection
+ });
+
+ simpleCollationCollection =
+ assertCreateCollection(db, simpleCollationCollection, {collation: {locale: "simple"}});
+ assert.writeOK(simpleCollationCollection.insert({_id: 0}));
+
cst.assertNextChangesEqual(
- {cursor: explicitCaseInsensitiveStream, expectedChanges: [{docId: 0}, {docId: 1}]});
-
- // Test that the collation does not apply to the scan over the oplog.
- const similarNameCollection = assertDropAndRecreateCollection(
- db, "cHaNgE_sTrEaM_cAsE_iNsEnSiTiVe", {collation: {locale: "en_US"}});
-
- // We must recreate the explicitCaseInsensitiveStream and set 'doNotModifyInPassthroughs'.
- // Whole-db and cluster-wide streams use the simple collation while scanning the oplog, but
- // do not filter the oplog by collection name. The subsequent $match stage which we inject
- // into the pipeline to filter for a specific collection will obey the pipeline's
- // case-insensitive collation, meaning that 'cHaNgE_sTrEaM_cAsE_iNsEnSiTiVe' will match
- // 'change_stream_case_insensitive'.
- explicitCaseInsensitiveStream = cst.startWatchingChanges({
- pipeline: caseInsensitivePipeline,
- collection: caseInsensitiveCollection,
+ {cursor: streamCreatedBeforeSimpleCollationCollection, expectedChanges: [{docId: 0}]});
+ }());
+
+ // Test that creating a change stream with a non-default collation, then creating a collection
+ // with the same collation will not invalidate the change stream.
+ (function() {
+ let frenchCollection = "change_stream_french_collation";
+ assertDropCollection(db, frenchCollection);
+
+ const frenchChangeStream = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}],
+ aggregateOptions: {collation: {locale: "fr"}},
+ collection: frenchCollection
+ });
+
+ frenchCollection =
+ assertCreateCollection(db, frenchCollection, {collation: {locale: "fr"}});
+ assert.writeOK(frenchCollection.insert({_id: 0}));
+
+ cst.assertNextChangesEqual({cursor: frenchChangeStream, expectedChanges: [{docId: 0}]});
+ }());
+
+ // Test that creating a change stream with a non-default collation, then creating a collection
+ // with *a different* collation will not invalidate the change stream.
+ (function() {
+ let germanCollection = "change_stream_german_collation";
+ assertDropCollection(db, germanCollection);
+
+ const englishCaseInsensitiveStream = cst.startWatchingChanges({
+ pipeline: [
+ {$changeStream: {}},
+ {$match: {"fullDocument.text": "abc"}},
+ {$project: {docId: "$documentKey._id"}}
+ ],
aggregateOptions: {collation: caseInsensitive},
- doNotModifyInPassthroughs: true
+ collection: germanCollection
});
- assert.writeOK(similarNameCollection.insert({_id: 0, text: "aBc"}));
- assert.writeOK(caseInsensitiveCollection.insert({_id: 2, text: "ABC"}));
+ germanCollection =
+ assertCreateCollection(db, germanCollection, {collation: {locale: "de"}});
+ assert.writeOK(germanCollection.insert({_id: 0, text: "aBc"}));
- // The existing stream should not see the first insert (to the other collection), but
- // should see the second.
cst.assertNextChangesEqual(
- {cursor: implicitCaseInsensitiveStream, expectedChanges: [{docId: 2}]});
+ {cursor: englishCaseInsensitiveStream, expectedChanges: [{docId: 0}]});
+ }());
+
+ // Test that creating a change stream with a non-default collation against a collection that has
+ // a non-simple default collation will use the collation specified on the operation.
+ (function() {
+ const caseInsensitiveCollection = assertDropAndRecreateCollection(
+ db, "change_stream_case_insensitive", {collation: caseInsensitive});
+
+ const englishCaseSensitiveStream = cst.startWatchingChanges({
+ pipeline: [
+ {$changeStream: {}},
+ {$match: {"fullDocument.text": "abc"}},
+ {$project: {docId: "$documentKey._id"}}
+ ],
+ aggregateOptions: {collation: {locale: "en_US"}},
+ collection: caseInsensitiveCollection
+ });
+
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "aBc"}));
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 1, text: "abc"}));
+
cst.assertNextChangesEqual(
- {cursor: explicitCaseInsensitiveStream, expectedChanges: [{docId: 2}]});
-
- // Test that creating a collection without a collation does not invalidate any change
- // streams that were opened before the collection existed.
- (function() {
- let noCollationCollection = "change_stream_no_collation";
- assertDropCollection(db, noCollationCollection);
-
- const streamCreatedBeforeNoCollationCollection = cst.startWatchingChanges({
- pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}],
- collection: noCollationCollection
- });
-
- noCollationCollection = assertCreateCollection(db, noCollationCollection);
- assert.writeOK(noCollationCollection.insert({_id: 0}));
-
- cst.assertNextChangesEqual(
- {cursor: streamCreatedBeforeNoCollationCollection, expectedChanges: [{docId: 0}]});
- }());
-
- // Test that creating a collection and explicitly specifying the simple collation does not
- // invalidate any change streams that were opened before the collection existed.
- (function() {
- let simpleCollationCollection = "change_stream_simple_collation";
- assertDropCollection(db, simpleCollationCollection);
-
- const streamCreatedBeforeSimpleCollationCollection = cst.startWatchingChanges({
- pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}],
- collection: simpleCollationCollection
- });
-
- simpleCollationCollection = assertCreateCollection(
- db, simpleCollationCollection, {collation: {locale: "simple"}});
- assert.writeOK(simpleCollationCollection.insert({_id: 0}));
-
- cst.assertNextChangesEqual({
- cursor: streamCreatedBeforeSimpleCollationCollection,
- expectedChanges: [{docId: 0}]
- });
- }());
-
- // Test that creating a change stream with a non-default collation, then creating a
- // collection with the same collation will not invalidate the change stream.
- (function() {
- let frenchCollection = "change_stream_french_collation";
- assertDropCollection(db, frenchCollection);
-
- const frenchChangeStream = cst.startWatchingChanges({
- pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}],
- aggregateOptions: {collation: {locale: "fr"}},
- collection: frenchCollection
- });
-
- frenchCollection =
- assertCreateCollection(db, frenchCollection, {collation: {locale: "fr"}});
- assert.writeOK(frenchCollection.insert({_id: 0}));
-
- cst.assertNextChangesEqual({cursor: frenchChangeStream, expectedChanges: [{docId: 0}]});
- }());
-
- // Test that creating a change stream with a non-default collation, then creating a
- // collection with *a different* collation will not invalidate the change stream.
- (function() {
- let germanCollection = "change_stream_german_collation";
- assertDropCollection(db, germanCollection);
-
- const englishCaseInsensitiveStream = cst.startWatchingChanges({
- pipeline: [
- {$changeStream: {}},
- {$match: {"fullDocument.text": "abc"}},
- {$project: {docId: "$documentKey._id"}}
- ],
- aggregateOptions: {collation: caseInsensitive},
- collection: germanCollection
- });
-
- germanCollection =
- assertCreateCollection(db, germanCollection, {collation: {locale: "de"}});
- assert.writeOK(germanCollection.insert({_id: 0, text: "aBc"}));
-
- cst.assertNextChangesEqual(
- {cursor: englishCaseInsensitiveStream, expectedChanges: [{docId: 0}]});
- }());
-
- // Test that creating a change stream with a non-default collation against a collection
- // that has a non-simple default collation will use the collation specified on the
- // operation.
- (function() {
- const caseInsensitiveCollection = assertDropAndRecreateCollection(
- db, "change_stream_case_insensitive", {collation: caseInsensitive});
-
- const englishCaseSensitiveStream = cst.startWatchingChanges({
- pipeline: [
- {$changeStream: {}},
- {$match: {"fullDocument.text": "abc"}},
- {$project: {docId: "$documentKey._id"}}
- ],
- aggregateOptions: {collation: {locale: "en_US"}},
- collection: caseInsensitiveCollection
- });
-
- assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "aBc"}));
- assert.writeOK(caseInsensitiveCollection.insert({_id: 1, text: "abc"}));
-
- cst.assertNextChangesEqual(
- {cursor: englishCaseSensitiveStream, expectedChanges: [{docId: 1}]});
- }());
-
- // Test that collation is supported by the shell helper. Test that creating a change
- // stream with a non-default collation against a collection that has a simple default
- // collation will use the collation specified on the operation.
- (function() {
- const noCollationCollection =
- assertDropAndRecreateCollection(db, "change_stream_no_collation");
-
- const cursor = noCollationCollection.watch(
- [{$match: {"fullDocument.text": "abc"}}, {$project: {docId: "$documentKey._id"}}],
- {collation: caseInsensitive});
- assert(!cursor.hasNext());
- assert.writeOK(noCollationCollection.insert({_id: 0, text: "aBc"}));
- assert.writeOK(noCollationCollection.insert({_id: 1, text: "abc"}));
- assert.soon(() => cursor.hasNext());
- assertChangeStreamEventEq(cursor.next(), {docId: 0});
- assert.soon(() => cursor.hasNext());
- assertChangeStreamEventEq(cursor.next(), {docId: 1});
- assert(!cursor.hasNext());
- }());
-
- // Test that resuming a change stream on a collection that has been dropped requires the
- // user to explicitly specify the collation. This is testing that we cannot resume if we
- // need to retrieve the collection metadata.
- (function() {
- const collName = "change_stream_case_insensitive";
- let caseInsensitiveCollection =
- assertDropAndRecreateCollection(db, collName, {collation: caseInsensitive});
-
- let changeStream = caseInsensitiveCollection.watch(
- [{$match: {"fullDocument.text": "abc"}}], {collation: caseInsensitive});
-
- assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "abc"}));
-
- assert.soon(() => changeStream.hasNext());
- const next = changeStream.next();
- assert.docEq(next.documentKey, {_id: 0});
- const resumeToken = next._id;
-
- // Insert a second document to see after resuming.
- assert.writeOK(caseInsensitiveCollection.insert({_id: "dropped_coll", text: "ABC"}));
-
- // Drop the collection to invalidate the stream.
- assertDropCollection(db, collName);
-
- // Test that a $changeStream is allowed to resume on the dropped collection if an
- // explicit collation is provided, even if it doesn't match the original collection
- // default collation.
- changeStream = caseInsensitiveCollection.watch(
- [{$match: {"fullDocument.text": "ABC"}}],
- {resumeAfter: resumeToken, collation: {locale: "simple"}});
-
- assert.soon(() => changeStream.hasNext());
- assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"});
-
- // Test that a pipeline without an explicit collation is not allowed to resume the
- // change stream after the collection has been dropped. Do not modify this command in
- // the passthrough suite(s) since whole-db and whole-cluster change streams are allowed
- // to resume without an explicit collation.
- assert.commandFailedWithCode(
- runCommandChangeStreamPassthroughAware(
- db,
- {
- aggregate: collName,
- pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
- cursor: {},
- },
- true), // doNotModifyInPassthroughs
- ErrorCodes.InvalidResumeToken);
- }());
-
- // Test that the default collation of a new version of the collection is not applied when
- // resuming a change stream from before a collection drop.
- (function() {
- const collName = "change_stream_case_insensitive";
- let caseInsensitiveCollection =
- assertDropAndRecreateCollection(db, collName, {collation: caseInsensitive});
-
- let changeStream = caseInsensitiveCollection.watch(
- [{$match: {"fullDocument.text": "abc"}}], {collation: caseInsensitive});
-
- assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "abc"}));
-
- assert.soon(() => changeStream.hasNext());
- const next = changeStream.next();
- assert.docEq(next.documentKey, {_id: 0});
- const resumeToken = next._id;
-
- // Insert a second document to see after resuming.
- assert.writeOK(caseInsensitiveCollection.insert({_id: "dropped_coll", text: "ABC"}));
-
- // Recreate the collection with a different collation.
- caseInsensitiveCollection = assertDropAndRecreateCollection(
- db, caseInsensitiveCollection.getName(), {collation: {locale: "simple"}});
- assert.writeOK(caseInsensitiveCollection.insert({_id: "new collection", text: "abc"}));
-
- // Verify that the stream sees the insert before the drop and then is exhausted. We
- // won't see the invalidate because the pipeline has a $match stage after the
- // $changeStream.
- assert.soon(() => changeStream.hasNext());
- assert.docEq(changeStream.next().fullDocument, {_id: "dropped_coll", text: "ABC"});
- // Only single-collection streams will be exhausted from the drop. Use 'next()' instead
- // of 'isExhausted()' to force a getMore since the previous getMore may not include the
- // collection drop, which is more likely with sharded collections on slow machines.
- if (!isChangeStreamPassthrough()) {
- assert.throws(() => changeStream.next());
- }
-
- // Test that a pipeline with an explicit collation is allowed to resume from before the
- // collection is dropped and recreated.
- changeStream = caseInsensitiveCollection.watch(
- [{$match: {"fullDocument.text": "ABC"}}],
- {resumeAfter: resumeToken, collation: {locale: "fr"}});
-
- assert.soon(() => changeStream.hasNext());
- assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"});
- // Only single-collection streams will be exhausted from the drop. Use 'next()' instead
- // of 'isExhausted()' to force a getMore since the previous getMore may not include the
- // collection drop, which is more likely with sharded collections on slow machines.
- if (!isChangeStreamPassthrough()) {
- assert.throws(() => changeStream.next());
- }
-
- // Test that a pipeline without an explicit collation is not allowed to resume,
- // even though the collection has been recreated with the same default collation as it
- // had previously. Do not modify this command in the passthrough suite(s) since whole-db
- // and whole-cluster change streams are allowed to resume without an explicit collation.
- assert.commandFailedWithCode(
- runCommandChangeStreamPassthroughAware(
- db,
- {
- aggregate: collName,
- pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
- cursor: {}
- },
- true), // doNotModifyInPassthroughs
- ErrorCodes.InvalidResumeToken);
- }());
-
- } finally {
- if (FixtureHelpers.isMongos(db)) {
- // TODO: SERVER-33944 Change streams on sharded collection with non-simple default
- // collation may be erroneously invalidated if a chunk migration occurs. When this bug
- // is fixed, chunk migrations should be allowed in this test, and we should remove this
- // call to startBalancer() as well as the earlier call to stopBalancer().
- sh.startBalancer();
+ {cursor: englishCaseSensitiveStream, expectedChanges: [{docId: 1}]});
+ }());
+
+ // Test that collation is supported by the shell helper. Test that creating a change stream with
+ // a non-default collation against a collection that has a simple default collation will use the
+ // collation specified on the operation.
+ (function() {
+ const noCollationCollection =
+ assertDropAndRecreateCollection(db, "change_stream_no_collation");
+
+ const cursor = noCollationCollection.watch(
+ [{$match: {"fullDocument.text": "abc"}}, {$project: {docId: "$documentKey._id"}}],
+ {collation: caseInsensitive});
+ assert(!cursor.hasNext());
+ assert.writeOK(noCollationCollection.insert({_id: 0, text: "aBc"}));
+ assert.writeOK(noCollationCollection.insert({_id: 1, text: "abc"}));
+ assert.soon(() => cursor.hasNext());
+ assertChangeStreamEventEq(cursor.next(), {docId: 0});
+ assert.soon(() => cursor.hasNext());
+ assertChangeStreamEventEq(cursor.next(), {docId: 1});
+ assert(!cursor.hasNext());
+ }());
+
+ // Test that we can resume a change stream on a collection that has been dropped without
+ // requiring the user to explicitly specify the collation.
+ (function() {
+ const collName = "change_stream_case_insensitive";
+ let caseInsensitiveCollection =
+ assertDropAndRecreateCollection(db, collName, {collation: caseInsensitive});
+
+ let changeStream = caseInsensitiveCollection.watch([{$match: {"fullDocument.text": "abc"}}],
+ {collation: caseInsensitive});
+
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "abc"}));
+
+ assert.soon(() => changeStream.hasNext());
+ const next = changeStream.next();
+ assert.docEq(next.documentKey, {_id: 0});
+ const resumeToken = next._id;
+
+ // Insert a second document to see after resuming.
+ assert.writeOK(caseInsensitiveCollection.insert({_id: "dropped_coll", text: "ABC"}));
+
+ // Drop the collection to invalidate the stream.
+ assertDropCollection(db, collName);
+
+ // Test that a $changeStream is allowed to resume on the dropped collection with an explicit
+ // collation, even if it doesn't match the original collection's default collation.
+ changeStream = caseInsensitiveCollection.watch(
+ [{$match: {"fullDocument.text": "ABC"}}],
+ {resumeAfter: resumeToken, collation: {locale: "simple"}});
+
+ assert.soon(() => changeStream.hasNext());
+ assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"});
+
+ // Test that a pipeline without an explicit collation is allowed to resume the change stream
+ // after the collection has been dropped, and it will use the simple collation. Do not
+ // modify this in the passthrough suite(s) since only individual collections have the
+ // concept of a default collation.
+ const doNotModifyInPassthroughs = true;
+ const cmdRes = assert.commandWorked(runCommandChangeStreamPassthroughAware(
+ db,
+ {
+ aggregate: collName,
+ pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
+ cursor: {},
+ },
+ doNotModifyInPassthroughs));
+
+ changeStream = new DBCommandCursor(db, cmdRes);
+ assert.soon(() => changeStream.hasNext());
+ assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"});
+ }());
+
+ // Test that the default collation of a new version of the collection is not applied when
+ // resuming a change stream from before a collection drop.
+ (function() {
+ const collName = "change_stream_case_insensitive";
+ let caseInsensitiveCollection =
+ assertDropAndRecreateCollection(db, collName, {collation: caseInsensitive});
+
+ let changeStream = caseInsensitiveCollection.watch([{$match: {"fullDocument.text": "abc"}}],
+ {collation: caseInsensitive});
+
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "abc"}));
+
+ assert.soon(() => changeStream.hasNext());
+ const next = changeStream.next();
+ assert.docEq(next.documentKey, {_id: 0});
+ const resumeToken = next._id;
+
+ // Insert a second document to see after resuming.
+ assert.writeOK(caseInsensitiveCollection.insert({_id: "dropped_coll", text: "ABC"}));
+
+ // Recreate the collection with a different collation.
+ caseInsensitiveCollection = assertDropAndRecreateCollection(
+ db, caseInsensitiveCollection.getName(), {collation: {locale: "simple"}});
+ assert.writeOK(caseInsensitiveCollection.insert({_id: "new collection", text: "abc"}));
+
+ // Verify that the stream sees the insert before the drop and then is exhausted. We won't
+ // see the invalidate because the pipeline has a $match stage after the $changeStream.
+ assert.soon(() => changeStream.hasNext());
+ assert.docEq(changeStream.next().fullDocument, {_id: "dropped_coll", text: "ABC"});
+ // Only single-collection streams will be exhausted from the drop. Use 'next()' instead of
+ // 'isExhausted()' to force a getMore since the previous getMore may not include the
+ // collection drop, which is more likely with sharded collections on slow machines.
+ if (!isChangeStreamPassthrough()) {
+ assert.throws(() => changeStream.next());
}
- }
+
+ // Test that a pipeline with an explicit collation is allowed to resume from before the
+ // collection is dropped and recreated.
+ changeStream =
+ caseInsensitiveCollection.watch([{$match: {"fullDocument.text": "ABC"}}],
+ {resumeAfter: resumeToken, collation: {locale: "fr"}});
+
+ assert.soon(() => changeStream.hasNext());
+ assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"});
+ // Only single-collection streams will be exhausted from the drop. Use 'next()' instead of
+ // 'isExhausted()' to force a getMore since the previous getMore may not include the
+ // collection drop, which is more likely with sharded collections on slow machines.
+ if (!isChangeStreamPassthrough()) {
+ assert.throws(() => changeStream.next());
+ }
+
+ // Test that a pipeline without an explicit collation is allowed to resume, even though the
+ // collection has been recreated with the same default collation as it had previously. Do
+ // not modify this command in the passthrough suite(s) since only individual collections
+ // have the concept of a default collation.
+ const doNotModifyInPassthroughs = true;
+ const cmdRes = assert.commandWorked(runCommandChangeStreamPassthroughAware(
+ db,
+ {
+ aggregate: collName,
+ pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
+ cursor: {}
+ },
+ doNotModifyInPassthroughs));
+
+ changeStream = new DBCommandCursor(db, cmdRes);
+ assert.soon(() => changeStream.hasNext());
+ assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"});
+ }());
})();
diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js
index b4073f44bef..b65bd9aaec2 100644
--- a/jstests/change_streams/metadata_notifications.js
+++ b/jstests/change_streams/metadata_notifications.js
@@ -77,22 +77,20 @@
const resumeTokenDrop = changes[3]._id;
const resumeTokenInvalidate = changes[4]._id;
- // It should not be possible to resume a change stream after a collection drop without an
- // explicit collation, even if the invalidate has not been received.
- assert.commandFailedWithCode(db.runCommand({
+ // Verify that we can resume a stream after a collection drop without an explicit collation.
+ assert.commandWorked(db.runCommand({
aggregate: coll.getName(),
pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
cursor: {}
- }),
- ErrorCodes.InvalidResumeToken);
+ }));
// Recreate the collection.
coll = assertCreateCollection(db, collName);
assert.writeOK(coll.insert({_id: "after recreate"}));
- // Test resuming the change stream from the collection drop using 'resumeAfter'.
- // If running in a sharded passthrough suite, resuming from the drop will first return the drop
- // from the other shard before returning an invalidate.
+ // Test resuming the change stream from the collection drop using 'resumeAfter'. If running in a
+ // sharded passthrough suite, resuming from the drop will first return the drop from the other
+ // shard before returning an invalidate.
cursor = cst.startWatchingChanges({
collection: coll,
pipeline: [{$changeStream: {resumeAfter: resumeTokenDrop}}],
diff --git a/jstests/change_streams/resume_from_high_water_mark_token.js b/jstests/change_streams/resume_from_high_water_mark_token.js
index a58cefb8ea5..bc63fdb33d9 100644
--- a/jstests/change_streams/resume_from_high_water_mark_token.js
+++ b/jstests/change_streams/resume_from_high_water_mark_token.js
@@ -37,23 +37,33 @@
for (let resumeType of["startAfter", "resumeAfter"]) {
cmdResBeforeCollExists = assert.commandWorked(runExactCommand(db, {
aggregate: collName,
- pipeline: [{$changeStream: {[resumeType]: pbrtBeforeCollExists}}],
+ pipeline: [
+ {$changeStream: {[resumeType]: pbrtBeforeCollExists}},
+ {
+ $match: {
+ $or: [
+ {"fullDocument._id": "INSERT_ONE"},
+ {"fullDocument._id": "INSERT_TWO"}
+ ]
+ }
+ }
+ ],
cursor: {}
}));
}
csCursor = new DBCommandCursor(db, cmdResBeforeCollExists);
- // But if the collection is created with a non-simple collation, the resumed stream invalidates.
+ // If the collection is then created with a case-insensitive collation, the resumed stream
+ // continues to use the simple collation. We see 'INSERT_TWO' but not 'insert_one'.
const testCollationCollection =
assertCreateCollection(db, collName, {collation: {locale: "en_US", strength: 2}});
assert.commandWorked(testCollationCollection.insert({_id: "insert_one"}));
assert.commandWorked(testCollationCollection.insert({_id: "INSERT_TWO"}));
assert.soon(() => csCursor.hasNext());
- const invalidate = csCursor.next();
- assert.eq(invalidate.operationType, "invalidate"); // We don't see either insert.
+ assert.docEq(csCursor.next().fullDocument, {_id: "INSERT_TWO"});
csCursor.close();
- // We can resume from the pre-creation high water mark if we specify an explicit collation...
+ // We can resume from the pre-creation high water mark if we do not specify a collation...
let cmdResResumeFromBeforeCollCreated = assert.commandWorked(runExactCommand(db, {
aggregate: collName,
pipeline: [
@@ -63,14 +73,35 @@
{$or: [{"fullDocument._id": "INSERT_ONE"}, {"fullDocument._id": "INSERT_TWO"}]}
}
],
- collation: {locale: "simple"},
cursor: {}
}));
- // This time the stream does not invalidate. We override the default case-insensitive collation
- // with the explicit simple collation we specified, so we match INSERT_TWO but not INSERT_ONE.
+ // ... but we will not inherit the collection's case-insensitive collation, instead defaulting
+ // to the simple collation. We will therefore match 'INSERT_TWO' but not 'insert_one'.
+ csCursor = new DBCommandCursor(db, cmdResResumeFromBeforeCollCreated);
+ assert.soon(() => csCursor.hasNext());
+ assert.docEq(csCursor.next().fullDocument, {_id: "INSERT_TWO"});
+ csCursor.close();
+
+ // If we do specify a non-simple collation, it will be adopted by the pipeline.
+ cmdResResumeFromBeforeCollCreated = assert.commandWorked(runExactCommand(db, {
+ aggregate: collName,
+ pipeline: [
+ {$changeStream: {resumeAfter: pbrtBeforeCollExists}},
+ {
+ $match:
+ {$or: [{"fullDocument._id": "INSERT_ONE"}, {"fullDocument._id": "INSERT_TWO"}]}
+ }
+ ],
+ collation: {locale: "en_US", strength: 2},
+ cursor: {}
+ }));
+
+ // Now we match both 'insert_one' and 'INSERT_TWO'.
csCursor = new DBCommandCursor(db, cmdResResumeFromBeforeCollCreated);
assert.soon(() => csCursor.hasNext());
+ assert.docEq(csCursor.next().fullDocument, {_id: "insert_one"});
+ assert.soon(() => csCursor.hasNext());
assert.docEq(csCursor.next().fullDocument, {_id: "INSERT_TWO"});
csCursor.close();
@@ -96,13 +127,21 @@
aggregate: collName,
pipeline: [
{$changeStream: {resumeAfter: hwmFromCollWithCollation}},
- {$match: {"fullDocument._id": "insert_four"}}
+ {
+ $match: {
+ $or: [
+ {"fullDocument._id": "INSERT_THREE"},
+ {"fullDocument._id": "INSERT_FOUR"}
+ ]
+ }
+ }
],
cursor: {}
}));
csCursor = new DBCommandCursor(db, cmdResResumeWithCollation);
- // ... and we inherit the collection's case-insensitive collation, matching {_id:"insert_four"}.
+ // ... but we do not inherit the collection's case-insensitive collation, matching 'INSERT_FOUR'
+ // but not the preceding 'insert_three'.
assert.soon(() => csCursor.hasNext());
assert.docEq(csCursor.next().fullDocument, {_id: "INSERT_FOUR"});
csCursor.close();
@@ -194,23 +233,21 @@
hwmPostCreation = csCursor.getResumeToken();
csCursor.close();
- // Because this HWM has a UUID, we cannot resume from the token if the collection is dropped...
+ // We can resume from the token if the collection is dropped...
assertDropCollection(db, collName);
- assert.commandFailedWithCode(runExactCommand(db, {
- aggregate: collName,
- pipeline: [{$changeStream: {resumeAfter: hwmPostCreation}}],
- cursor: {}
- }),
- ErrorCodes.InvalidResumeToken);
+ assert.commandWorked(runExactCommand(db, {
+ aggregate: collName,
+ pipeline: [{$changeStream: {resumeAfter: hwmPostCreation}}],
+ cursor: {}
+ }));
// ... or if the collection is recreated with a different UUID...
assertCreateCollection(db, collName);
- assert.commandFailedWithCode(runExactCommand(db, {
- aggregate: collName,
- pipeline: [{$changeStream: {resumeAfter: hwmPostCreation}}],
- cursor: {}
- }),
- ErrorCodes.InvalidResumeToken);
- // ... unless we specify an explicit collation.
+ assert.commandWorked(runExactCommand(db, {
+ aggregate: collName,
+ pipeline: [{$changeStream: {resumeAfter: hwmPostCreation}}],
+ cursor: {}
+ }));
+ // ... or if we specify an explicit collation.
assert.commandWorked(runExactCommand(db, {
aggregate: collName,
pipeline: [{$changeStream: {resumeAfter: hwmPostCreation}}],
@@ -218,13 +255,13 @@
cursor: {}
}));
- // But even after the collection is recreated, we can still resume from the pre-creation HWM...
+ // Even after the collection is recreated, we can still resume from the pre-creation HWM...
cmdResResumeFromBeforeCollCreated = assert.commandWorked(runExactCommand(db, {
aggregate: collName,
pipeline: [{$changeStream: {resumeAfter: pbrtBeforeCollExists}}],
cursor: {}
}));
- // ...and can still see all the events from the collection's original incarnation...
+ // ...and we can still see all the events from the collection's original incarnation...
csCursor = new DBCommandCursor(db, cmdResResumeFromBeforeCollCreated);
docCount = 0;
assert.soon(() => {