author     Bernard Gorman <bernard.gorman@gmail.com>    2019-03-04 17:23:04 +0000
committer  Bernard Gorman <bernard.gorman@gmail.com>    2019-03-11 20:02:07 -0400
commit     4054c2b07cb658a44fc51d145a1688483e18d666 (patch)
tree       40340abd5c396a27954ca43eafc979ecdb951c44
parent     371197e4bab715a83272a4472e118ee5c5cbbf7c (diff)
SERVER-39302 Change streams should always use the simple collation when no explicit collation is provided
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml |   2
-rw-r--r--  jstests/change_streams/collation.js                                                | 657
-rw-r--r--  jstests/change_streams/metadata_notifications.js                                   |  14
-rw-r--r--  jstests/change_streams/resume_from_high_water_mark_token.js                        |  89
-rw-r--r--  jstests/noPassthrough/change_streams_collation_chunk_migration.js                  |  64
-rw-r--r--  jstests/noPassthrough/change_streams_update_lookup_collation.js                    |   3
-rw-r--r--  jstests/sharding/change_stream_metadata_notifications.js                           |  16
-rw-r--r--  jstests/sharding/change_streams.js                                                 |   8
-rw-r--r--  src/mongo/db/commands/run_aggregate.cpp                                            |  66
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.cpp                            |  65
-rw-r--r--  src/mongo/db/pipeline/document_source_change_stream.h                              |  36
-rw-r--r--  src/mongo/db/pipeline/lite_parsed_document_source.h                                |   8
-rw-r--r--  src/mongo/db/pipeline/lite_parsed_pipeline.h                                       |  12
-rw-r--r--  src/mongo/s/query/cluster_aggregate.cpp                                            |   8
14 files changed, 511 insertions(+), 537 deletions(-)
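
In client terms, the change is: a $changeStream pipeline no longer inherits the default collation of the collection it watches, and instead uses the simple (binary) collation unless the caller supplies an explicit collation with the aggregate command or the watch() helper. A minimal mongo-shell sketch of the new contract, using an illustrative collection name rather than anything taken from the patch:

    // Create a collection whose default collation is case-insensitive.
    db.createCollection("events", {collation: {locale: "en_US", strength: 2}});

    // No collation supplied: the stream uses the simple collation, so the $match
    // below only matches documents whose 'text' is exactly "abc".
    const simpleStream = db.events.watch([{$match: {"fullDocument.text": "abc"}}]);

    // Explicit collation supplied: the stream matches case-insensitively.
    const ciStream = db.events.watch(
        [{$match: {"fullDocument.text": "abc"}}],
        {collation: {locale: "en_US", strength: 2}});

    db.events.insert([{_id: 1, text: "abc"}, {_id: 2, text: "ABC"}]);
    // simpleStream emits only the event for {_id: 1}; ciStream emits events for both.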
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 35a61e63855..e02ff3d704b 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -31,6 +31,8 @@ selector:
# Enable when 4.2 becomes last-stable.
- jstests/sharding/aggregation_internal_parameters.js
- jstests/sharding/agg_error_reports_shard_host_and_port.js
+ - jstests/sharding/change_stream_metadata_notifications.js
+ - jstests/sharding/change_streams.js
- jstests/sharding/collation_lookup.js
- jstests/sharding/collation_targeting.js
- jstests/sharding/collation_targeting_inherited.js
diff --git a/jstests/change_streams/collation.js b/jstests/change_streams/collation.js
index 77d345dc0b1..e99f6064b60 100644
--- a/jstests/change_streams/collation.js
+++ b/jstests/change_streams/collation.js
@@ -1,5 +1,6 @@
/**
- * Tests that a change stream can use a user-specified, or collection-default collation.
+ * Tests that a change stream can take a user-specified collation, does not inherit the collection's
+ * default collation, and uses the simple collation if none is provided.
*/
(function() {
"use strict";
@@ -7,353 +8,339 @@
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
load("jstests/libs/change_stream_util.js"); // For 'ChangeStreamTest' and
// 'runCommandChangeStreamPassthroughAware'.
- load("jstests/libs/fixture_helpers.js"); // For 'isMongos'.
-
- if (FixtureHelpers.isMongos(db)) {
- // TODO: SERVER-33944 Change streams on sharded collection with non-simple default
- // collation may be erroneously invalidated if a chunk migration occurs. When this bug is
- // fixed, chunk migrations should be allowed in this test, and we should remove this call
- // to stopBalancer().
- sh.stopBalancer();
- }
-
- try {
- let cst = new ChangeStreamTest(db);
-
- const caseInsensitive = {locale: "en_US", strength: 2};
-
- let caseInsensitiveCollection = "change_stream_case_insensitive";
- assertDropCollection(db, caseInsensitiveCollection);
-
- // Test that you can open a change stream before the collection exists, and it will use the
- // simple collation. Tag this stream as 'doNotModifyInPassthroughs', since whole-db and
- // cluster-wide streams do not adhere to the same collation rules that we will be testing
- // with this cursor.
- const simpleCollationStream = cst.startWatchingChanges({
- pipeline: [{$changeStream: {}}],
- collection: caseInsensitiveCollection,
- doNotModifyInPassthroughs: true
- });
- // Create the collection with a non-default collation - this should invalidate the stream we
- // opened before it existed.
- caseInsensitiveCollection =
- assertCreateCollection(db, caseInsensitiveCollection, {collation: caseInsensitive});
- cst.assertNextChangesEqual({
- cursor: simpleCollationStream,
- expectedChanges: [{operationType: "invalidate"}],
- expectInvalidate: true
- });
+ let cst = new ChangeStreamTest(db);
+
+ const caseInsensitive = {locale: "en_US", strength: 2};
- const caseInsensitivePipeline = [
+ let caseInsensitiveCollection = "change_stream_case_insensitive";
+ assertDropCollection(db, caseInsensitiveCollection);
+
+ // Test that you can open a change stream before the collection exists, and it will use the
+ // simple collation. Tag this stream as 'doNotModifyInPassthroughs', since only individual
+ // collections have the concept of a default collation.
+ const simpleCollationStream = cst.startWatchingChanges({
+ pipeline: [
{$changeStream: {}},
- {$match: {"fullDocument.text": "abc"}},
- {$project: {docId: "$documentKey._id"}}
- ];
-
- // Test that $changeStream will implicitly adopt the default collation of the collection on
- // which it is run. Tag this stream as 'doNotModifyInPassthroughs', since whole-db and
- // cluster-wide streams do not have default collations.
- const implicitCaseInsensitiveStream = cst.startWatchingChanges({
- pipeline: caseInsensitivePipeline,
- collection: caseInsensitiveCollection,
- doNotModifyInPassthroughs: true
- });
- // Test that a collation can be explicitly specified for the $changeStream. This does not
- // need to be tagged 'doNotModifyInPassthroughs', since whole-db and cluster-wide
- // changeStreams will use an explicit collation if present.
- let explicitCaseInsensitiveStream = cst.startWatchingChanges({
- pipeline: caseInsensitivePipeline,
- collection: caseInsensitiveCollection,
- aggregateOptions: {collation: caseInsensitive}
+ {
+ $match:
+ {$or: [{"fullDocument._id": "INSERT_ONE"}, {"fullDocument._id": "INSERT_TWO"}]}
+ },
+ {$project: {docId: "$fullDocument._id"}}
+ ],
+ collection: caseInsensitiveCollection,
+ doNotModifyInPassthroughs: true
+ });
+
+ // Create the collection with a non-default collation. The stream should continue to use the
+ // simple collation.
+ caseInsensitiveCollection =
+ assertCreateCollection(db, caseInsensitiveCollection, {collation: caseInsensitive});
+ assert.commandWorked(
+ caseInsensitiveCollection.insert([{_id: "insert_one"}, {_id: "INSERT_TWO"}]));
+ cst.assertNextChangesEqual(
+ {cursor: simpleCollationStream, expectedChanges: [{docId: "INSERT_TWO"}]});
+
+ const caseInsensitivePipeline = [
+ {$changeStream: {}},
+ {$match: {"fullDocument.text": "abc"}},
+ {$project: {docId: "$documentKey._id"}}
+ ];
+
+ // Test that $changeStream will not implicitly adopt the default collation of the collection on
+ // which it is run. Tag this stream as 'doNotModifyInPassthroughs'; whole-db and cluster-wide
+ // streams do not have default collations.
+ const didNotInheritCollationStream = cst.startWatchingChanges({
+ pipeline: caseInsensitivePipeline,
+ collection: caseInsensitiveCollection,
+ doNotModifyInPassthroughs: true
+ });
+ // Test that a collation can be explicitly specified for the $changeStream. This does not need
+ // to be tagged 'doNotModifyInPassthroughs', since whole-db and cluster-wide changeStreams will
+ // use an explicit collation if present.
+ let explicitCaseInsensitiveStream = cst.startWatchingChanges({
+ pipeline: caseInsensitivePipeline,
+ collection: caseInsensitiveCollection,
+ aggregateOptions: {collation: caseInsensitive}
+ });
+
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "aBc"}));
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 1, text: "abc"}));
+
+ // 'didNotInheritCollationStream' should not have inherited the collection's case-insensitive
+ // default collation, and should only see the second insert. 'explicitCaseInsensitiveStream'
+ // should see both inserts.
+ cst.assertNextChangesEqual(
+ {cursor: didNotInheritCollationStream, expectedChanges: [{docId: 1}]});
+ cst.assertNextChangesEqual(
+ {cursor: explicitCaseInsensitiveStream, expectedChanges: [{docId: 0}, {docId: 1}]});
+
+ // Test that the collation does not apply to the scan over the oplog.
+ const similarNameCollection = assertDropAndRecreateCollection(
+ db, "cHaNgE_sTrEaM_cAsE_iNsEnSiTiVe", {collation: {locale: "en_US"}});
+
+ // We must recreate the explicitCaseInsensitiveStream and set 'doNotModifyInPassthroughs'. Whole
+ // db and cluster-wide streams use the simple collation while scanning the oplog, but they don't
+ // filter the oplog by collection name. The subsequent $match stage which we inject into the
+ // pipeline to filter for a specific collection will obey the pipeline's case-insensitive
+ // collation, meaning that 'cHaNgE_sTrEaM_cAsE_iNsEnSiTiVe' will match
+ // 'change_stream_case_insensitive'.
+ explicitCaseInsensitiveStream = cst.startWatchingChanges({
+ pipeline: caseInsensitivePipeline,
+ collection: caseInsensitiveCollection,
+ aggregateOptions: {collation: caseInsensitive},
+ doNotModifyInPassthroughs: true
+ });
+
+ assert.writeOK(similarNameCollection.insert({_id: 0, text: "aBc"}));
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 2, text: "ABC"}));
+
+ // The case-insensitive stream should not see the first insert (to the other collection), only
+ // the second. We do not expect to see the insert in 'didNotInheritCollationStream'.
+ cst.assertNextChangesEqual(
+ {cursor: explicitCaseInsensitiveStream, expectedChanges: [{docId: 2}]});
+
+ // Test that creating a collection without a collation does not invalidate any change streams
+ // that were opened before the collection existed.
+ (function() {
+ let noCollationCollection = "change_stream_no_collation";
+ assertDropCollection(db, noCollationCollection);
+
+ const streamCreatedBeforeNoCollationCollection = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}],
+ collection: noCollationCollection
});
- assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "aBc"}));
- assert.writeOK(caseInsensitiveCollection.insert({_id: 1, text: "abc"}));
+ noCollationCollection = assertCreateCollection(db, noCollationCollection);
+ assert.writeOK(noCollationCollection.insert({_id: 0}));
cst.assertNextChangesEqual(
- {cursor: implicitCaseInsensitiveStream, expectedChanges: [{docId: 0}, {docId: 1}]});
+ {cursor: streamCreatedBeforeNoCollationCollection, expectedChanges: [{docId: 0}]});
+ }());
+
+ // Test that creating a collection and explicitly specifying the simple collation does not
+ // invalidate any change streams that were opened before the collection existed.
+ (function() {
+ let simpleCollationCollection = "change_stream_simple_collation";
+ assertDropCollection(db, simpleCollationCollection);
+
+ const streamCreatedBeforeSimpleCollationCollection = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}],
+ collection: simpleCollationCollection
+ });
+
+ simpleCollationCollection =
+ assertCreateCollection(db, simpleCollationCollection, {collation: {locale: "simple"}});
+ assert.writeOK(simpleCollationCollection.insert({_id: 0}));
+
cst.assertNextChangesEqual(
- {cursor: explicitCaseInsensitiveStream, expectedChanges: [{docId: 0}, {docId: 1}]});
-
- // Test that the collation does not apply to the scan over the oplog.
- const similarNameCollection = assertDropAndRecreateCollection(
- db, "cHaNgE_sTrEaM_cAsE_iNsEnSiTiVe", {collation: {locale: "en_US"}});
-
- // We must recreate the explicitCaseInsensitiveStream and set 'doNotModifyInPassthroughs'.
- // Whole-db and cluster-wide streams use the simple collation while scanning the oplog, but
- // do not filter the oplog by collection name. The subsequent $match stage which we inject
- // into the pipeline to filter for a specific collection will obey the pipeline's
- // case-insensitive collation, meaning that 'cHaNgE_sTrEaM_cAsE_iNsEnSiTiVe' will match
- // 'change_stream_case_insensitive'.
- explicitCaseInsensitiveStream = cst.startWatchingChanges({
- pipeline: caseInsensitivePipeline,
- collection: caseInsensitiveCollection,
+ {cursor: streamCreatedBeforeSimpleCollationCollection, expectedChanges: [{docId: 0}]});
+ }());
+
+ // Test that creating a change stream with a non-default collation, then creating a collection
+ // with the same collation will not invalidate the change stream.
+ (function() {
+ let frenchCollection = "change_stream_french_collation";
+ assertDropCollection(db, frenchCollection);
+
+ const frenchChangeStream = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}],
+ aggregateOptions: {collation: {locale: "fr"}},
+ collection: frenchCollection
+ });
+
+ frenchCollection =
+ assertCreateCollection(db, frenchCollection, {collation: {locale: "fr"}});
+ assert.writeOK(frenchCollection.insert({_id: 0}));
+
+ cst.assertNextChangesEqual({cursor: frenchChangeStream, expectedChanges: [{docId: 0}]});
+ }());
+
+ // Test that creating a change stream with a non-default collation, then creating a collection
+ // with *a different* collation will not invalidate the change stream.
+ (function() {
+ let germanCollection = "change_stream_german_collation";
+ assertDropCollection(db, germanCollection);
+
+ const englishCaseInsensitiveStream = cst.startWatchingChanges({
+ pipeline: [
+ {$changeStream: {}},
+ {$match: {"fullDocument.text": "abc"}},
+ {$project: {docId: "$documentKey._id"}}
+ ],
aggregateOptions: {collation: caseInsensitive},
- doNotModifyInPassthroughs: true
+ collection: germanCollection
});
- assert.writeOK(similarNameCollection.insert({_id: 0, text: "aBc"}));
- assert.writeOK(caseInsensitiveCollection.insert({_id: 2, text: "ABC"}));
+ germanCollection =
+ assertCreateCollection(db, germanCollection, {collation: {locale: "de"}});
+ assert.writeOK(germanCollection.insert({_id: 0, text: "aBc"}));
- // The existing stream should not see the first insert (to the other collection), but
- // should see the second.
cst.assertNextChangesEqual(
- {cursor: implicitCaseInsensitiveStream, expectedChanges: [{docId: 2}]});
+ {cursor: englishCaseInsensitiveStream, expectedChanges: [{docId: 0}]});
+ }());
+
+ // Test that creating a change stream with a non-default collation against a collection that has
+ // a non-simple default collation will use the collation specified on the operation.
+ (function() {
+ const caseInsensitiveCollection = assertDropAndRecreateCollection(
+ db, "change_stream_case_insensitive", {collation: caseInsensitive});
+
+ const englishCaseSensitiveStream = cst.startWatchingChanges({
+ pipeline: [
+ {$changeStream: {}},
+ {$match: {"fullDocument.text": "abc"}},
+ {$project: {docId: "$documentKey._id"}}
+ ],
+ aggregateOptions: {collation: {locale: "en_US"}},
+ collection: caseInsensitiveCollection
+ });
+
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "aBc"}));
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 1, text: "abc"}));
+
cst.assertNextChangesEqual(
- {cursor: explicitCaseInsensitiveStream, expectedChanges: [{docId: 2}]});
-
- // Test that creating a collection without a collation does not invalidate any change
- // streams that were opened before the collection existed.
- (function() {
- let noCollationCollection = "change_stream_no_collation";
- assertDropCollection(db, noCollationCollection);
-
- const streamCreatedBeforeNoCollationCollection = cst.startWatchingChanges({
- pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}],
- collection: noCollationCollection
- });
-
- noCollationCollection = assertCreateCollection(db, noCollationCollection);
- assert.writeOK(noCollationCollection.insert({_id: 0}));
-
- cst.assertNextChangesEqual(
- {cursor: streamCreatedBeforeNoCollationCollection, expectedChanges: [{docId: 0}]});
- }());
-
- // Test that creating a collection and explicitly specifying the simple collation does not
- // invalidate any change streams that were opened before the collection existed.
- (function() {
- let simpleCollationCollection = "change_stream_simple_collation";
- assertDropCollection(db, simpleCollationCollection);
-
- const streamCreatedBeforeSimpleCollationCollection = cst.startWatchingChanges({
- pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}],
- collection: simpleCollationCollection
- });
-
- simpleCollationCollection = assertCreateCollection(
- db, simpleCollationCollection, {collation: {locale: "simple"}});
- assert.writeOK(simpleCollationCollection.insert({_id: 0}));
-
- cst.assertNextChangesEqual({
- cursor: streamCreatedBeforeSimpleCollationCollection,
- expectedChanges: [{docId: 0}]
- });
- }());
-
- // Test that creating a change stream with a non-default collation, then creating a
- // collection with the same collation will not invalidate the change stream.
- (function() {
- let frenchCollection = "change_stream_french_collation";
- assertDropCollection(db, frenchCollection);
-
- const frenchChangeStream = cst.startWatchingChanges({
- pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey._id"}}],
- aggregateOptions: {collation: {locale: "fr"}},
- collection: frenchCollection
- });
-
- frenchCollection =
- assertCreateCollection(db, frenchCollection, {collation: {locale: "fr"}});
- assert.writeOK(frenchCollection.insert({_id: 0}));
-
- cst.assertNextChangesEqual({cursor: frenchChangeStream, expectedChanges: [{docId: 0}]});
- }());
-
- // Test that creating a change stream with a non-default collation, then creating a
- // collection with *a different* collation will not invalidate the change stream.
- (function() {
- let germanCollection = "change_stream_german_collation";
- assertDropCollection(db, germanCollection);
-
- const englishCaseInsensitiveStream = cst.startWatchingChanges({
- pipeline: [
- {$changeStream: {}},
- {$match: {"fullDocument.text": "abc"}},
- {$project: {docId: "$documentKey._id"}}
- ],
- aggregateOptions: {collation: caseInsensitive},
- collection: germanCollection
- });
-
- germanCollection =
- assertCreateCollection(db, germanCollection, {collation: {locale: "de"}});
- assert.writeOK(germanCollection.insert({_id: 0, text: "aBc"}));
-
- cst.assertNextChangesEqual(
- {cursor: englishCaseInsensitiveStream, expectedChanges: [{docId: 0}]});
- }());
-
- // Test that creating a change stream with a non-default collation against a collection
- // that has a non-simple default collation will use the collation specified on the
- // operation.
- (function() {
- const caseInsensitiveCollection = assertDropAndRecreateCollection(
- db, "change_stream_case_insensitive", {collation: caseInsensitive});
-
- const englishCaseSensitiveStream = cst.startWatchingChanges({
- pipeline: [
- {$changeStream: {}},
- {$match: {"fullDocument.text": "abc"}},
- {$project: {docId: "$documentKey._id"}}
- ],
- aggregateOptions: {collation: {locale: "en_US"}},
- collection: caseInsensitiveCollection
- });
-
- assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "aBc"}));
- assert.writeOK(caseInsensitiveCollection.insert({_id: 1, text: "abc"}));
-
- cst.assertNextChangesEqual(
- {cursor: englishCaseSensitiveStream, expectedChanges: [{docId: 1}]});
- }());
-
- // Test that collation is supported by the shell helper. Test that creating a change
- // stream with a non-default collation against a collection that has a simple default
- // collation will use the collation specified on the operation.
- (function() {
- const noCollationCollection =
- assertDropAndRecreateCollection(db, "change_stream_no_collation");
-
- const cursor = noCollationCollection.watch(
- [{$match: {"fullDocument.text": "abc"}}, {$project: {docId: "$documentKey._id"}}],
- {collation: caseInsensitive});
- assert(!cursor.hasNext());
- assert.writeOK(noCollationCollection.insert({_id: 0, text: "aBc"}));
- assert.writeOK(noCollationCollection.insert({_id: 1, text: "abc"}));
- assert.soon(() => cursor.hasNext());
- assertChangeStreamEventEq(cursor.next(), {docId: 0});
- assert.soon(() => cursor.hasNext());
- assertChangeStreamEventEq(cursor.next(), {docId: 1});
- assert(!cursor.hasNext());
- }());
-
- // Test that resuming a change stream on a collection that has been dropped requires the
- // user to explicitly specify the collation. This is testing that we cannot resume if we
- // need to retrieve the collection metadata.
- (function() {
- const collName = "change_stream_case_insensitive";
- let caseInsensitiveCollection =
- assertDropAndRecreateCollection(db, collName, {collation: caseInsensitive});
-
- let changeStream = caseInsensitiveCollection.watch(
- [{$match: {"fullDocument.text": "abc"}}], {collation: caseInsensitive});
-
- assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "abc"}));
-
- assert.soon(() => changeStream.hasNext());
- const next = changeStream.next();
- assert.docEq(next.documentKey, {_id: 0});
- const resumeToken = next._id;
-
- // Insert a second document to see after resuming.
- assert.writeOK(caseInsensitiveCollection.insert({_id: "dropped_coll", text: "ABC"}));
-
- // Drop the collection to invalidate the stream.
- assertDropCollection(db, collName);
-
- // Test that a $changeStream is allowed to resume on the dropped collection if an
- // explicit collation is provided, even if it doesn't match the original collection
- // default collation.
- changeStream = caseInsensitiveCollection.watch(
- [{$match: {"fullDocument.text": "ABC"}}],
- {resumeAfter: resumeToken, collation: {locale: "simple"}});
-
- assert.soon(() => changeStream.hasNext());
- assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"});
-
- // Test that a pipeline without an explicit collation is not allowed to resume the
- // change stream after the collection has been dropped. Do not modify this command in
- // the passthrough suite(s) since whole-db and whole-cluster change streams are allowed
- // to resume without an explicit collation.
- assert.commandFailedWithCode(
- runCommandChangeStreamPassthroughAware(
- db,
- {
- aggregate: collName,
- pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
- cursor: {},
- },
- true), // doNotModifyInPassthroughs
- ErrorCodes.InvalidResumeToken);
- }());
-
- // Test that the default collation of a new version of the collection is not applied when
- // resuming a change stream from before a collection drop.
- (function() {
- const collName = "change_stream_case_insensitive";
- let caseInsensitiveCollection =
- assertDropAndRecreateCollection(db, collName, {collation: caseInsensitive});
-
- let changeStream = caseInsensitiveCollection.watch(
- [{$match: {"fullDocument.text": "abc"}}], {collation: caseInsensitive});
-
- assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "abc"}));
-
- assert.soon(() => changeStream.hasNext());
- const next = changeStream.next();
- assert.docEq(next.documentKey, {_id: 0});
- const resumeToken = next._id;
-
- // Insert a second document to see after resuming.
- assert.writeOK(caseInsensitiveCollection.insert({_id: "dropped_coll", text: "ABC"}));
-
- // Recreate the collection with a different collation.
- caseInsensitiveCollection = assertDropAndRecreateCollection(
- db, caseInsensitiveCollection.getName(), {collation: {locale: "simple"}});
- assert.writeOK(caseInsensitiveCollection.insert({_id: "new collection", text: "abc"}));
-
- // Verify that the stream sees the insert before the drop and then is exhausted. We
- // won't see the invalidate because the pipeline has a $match stage after the
- // $changeStream.
- assert.soon(() => changeStream.hasNext());
- assert.docEq(changeStream.next().fullDocument, {_id: "dropped_coll", text: "ABC"});
- // Only single-collection streams will be exhausted from the drop. Use 'next()' instead
- // of 'isExhausted()' to force a getMore since the previous getMore may not include the
- // collection drop, which is more likely with sharded collections on slow machines.
- if (!isChangeStreamPassthrough()) {
- assert.throws(() => changeStream.next());
- }
-
- // Test that a pipeline with an explicit collation is allowed to resume from before the
- // collection is dropped and recreated.
- changeStream = caseInsensitiveCollection.watch(
- [{$match: {"fullDocument.text": "ABC"}}],
- {resumeAfter: resumeToken, collation: {locale: "fr"}});
-
- assert.soon(() => changeStream.hasNext());
- assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"});
- // Only single-collection streams will be exhausted from the drop. Use 'next()' instead
- // of 'isExhausted()' to force a getMore since the previous getMore may not include the
- // collection drop, which is more likely with sharded collections on slow machines.
- if (!isChangeStreamPassthrough()) {
- assert.throws(() => changeStream.next());
- }
-
- // Test that a pipeline without an explicit collation is not allowed to resume,
- // even though the collection has been recreated with the same default collation as it
- // had previously. Do not modify this command in the passthrough suite(s) since whole-db
- // and whole-cluster change streams are allowed to resume without an explicit collation.
- assert.commandFailedWithCode(
- runCommandChangeStreamPassthroughAware(
- db,
- {
- aggregate: collName,
- pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
- cursor: {}
- },
- true), // doNotModifyInPassthroughs
- ErrorCodes.InvalidResumeToken);
- }());
-
- } finally {
- if (FixtureHelpers.isMongos(db)) {
- // TODO: SERVER-33944 Change streams on sharded collection with non-simple default
- // collation may be erroneously invalidated if a chunk migration occurs. When this bug
- // is fixed, chunk migrations should be allowed in this test, and we should remove this
- // call to startBalancer() as well as the earlier call to stopBalancer().
- sh.startBalancer();
+ {cursor: englishCaseSensitiveStream, expectedChanges: [{docId: 1}]});
+ }());
+
+ // Test that collation is supported by the shell helper. Test that creating a change stream with
+ // a non-default collation against a collection that has a simple default collation will use the
+ // collation specified on the operation.
+ (function() {
+ const noCollationCollection =
+ assertDropAndRecreateCollection(db, "change_stream_no_collation");
+
+ const cursor = noCollationCollection.watch(
+ [{$match: {"fullDocument.text": "abc"}}, {$project: {docId: "$documentKey._id"}}],
+ {collation: caseInsensitive});
+ assert(!cursor.hasNext());
+ assert.writeOK(noCollationCollection.insert({_id: 0, text: "aBc"}));
+ assert.writeOK(noCollationCollection.insert({_id: 1, text: "abc"}));
+ assert.soon(() => cursor.hasNext());
+ assertChangeStreamEventEq(cursor.next(), {docId: 0});
+ assert.soon(() => cursor.hasNext());
+ assertChangeStreamEventEq(cursor.next(), {docId: 1});
+ assert(!cursor.hasNext());
+ }());
+
+ // Test that we can resume a change stream on a collection that has been dropped without
+ // requiring the user to explicitly specify the collation.
+ (function() {
+ const collName = "change_stream_case_insensitive";
+ let caseInsensitiveCollection =
+ assertDropAndRecreateCollection(db, collName, {collation: caseInsensitive});
+
+ let changeStream = caseInsensitiveCollection.watch([{$match: {"fullDocument.text": "abc"}}],
+ {collation: caseInsensitive});
+
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "abc"}));
+
+ assert.soon(() => changeStream.hasNext());
+ const next = changeStream.next();
+ assert.docEq(next.documentKey, {_id: 0});
+ const resumeToken = next._id;
+
+ // Insert a second document to see after resuming.
+ assert.writeOK(caseInsensitiveCollection.insert({_id: "dropped_coll", text: "ABC"}));
+
+ // Drop the collection to invalidate the stream.
+ assertDropCollection(db, collName);
+
+ // Test that a $changeStream is allowed to resume on the dropped collection with an explicit
+ // collation, even if it doesn't match the original collection's default collation.
+ changeStream = caseInsensitiveCollection.watch(
+ [{$match: {"fullDocument.text": "ABC"}}],
+ {resumeAfter: resumeToken, collation: {locale: "simple"}});
+
+ assert.soon(() => changeStream.hasNext());
+ assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"});
+
+ // Test that a pipeline without an explicit collation is allowed to resume the change stream
+ // after the collection has been dropped, and it will use the simple collation. Do not
+ // modify this in the passthrough suite(s) since only individual collections have the
+ // concept of a default collation.
+ const doNotModifyInPassthroughs = true;
+ const cmdRes = assert.commandWorked(runCommandChangeStreamPassthroughAware(
+ db,
+ {
+ aggregate: collName,
+ pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
+ cursor: {},
+ },
+ doNotModifyInPassthroughs));
+
+ changeStream = new DBCommandCursor(db, cmdRes);
+ assert.soon(() => changeStream.hasNext());
+ assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"});
+ }());
+
+ // Test that the default collation of a new version of the collection is not applied when
+ // resuming a change stream from before a collection drop.
+ (function() {
+ const collName = "change_stream_case_insensitive";
+ let caseInsensitiveCollection =
+ assertDropAndRecreateCollection(db, collName, {collation: caseInsensitive});
+
+ let changeStream = caseInsensitiveCollection.watch([{$match: {"fullDocument.text": "abc"}}],
+ {collation: caseInsensitive});
+
+ assert.writeOK(caseInsensitiveCollection.insert({_id: 0, text: "abc"}));
+
+ assert.soon(() => changeStream.hasNext());
+ const next = changeStream.next();
+ assert.docEq(next.documentKey, {_id: 0});
+ const resumeToken = next._id;
+
+ // Insert a second document to see after resuming.
+ assert.writeOK(caseInsensitiveCollection.insert({_id: "dropped_coll", text: "ABC"}));
+
+ // Recreate the collection with a different collation.
+ caseInsensitiveCollection = assertDropAndRecreateCollection(
+ db, caseInsensitiveCollection.getName(), {collation: {locale: "simple"}});
+ assert.writeOK(caseInsensitiveCollection.insert({_id: "new collection", text: "abc"}));
+
+ // Verify that the stream sees the insert before the drop and then is exhausted. We won't
+ // see the invalidate because the pipeline has a $match stage after the $changeStream.
+ assert.soon(() => changeStream.hasNext());
+ assert.docEq(changeStream.next().fullDocument, {_id: "dropped_coll", text: "ABC"});
+ // Only single-collection streams will be exhausted from the drop. Use 'next()' instead of
+ // 'isExhausted()' to force a getMore since the previous getMore may not include the
+ // collection drop, which is more likely with sharded collections on slow machines.
+ if (!isChangeStreamPassthrough()) {
+ assert.throws(() => changeStream.next());
}
- }
+
+ // Test that a pipeline with an explicit collation is allowed to resume from before the
+ // collection is dropped and recreated.
+ changeStream =
+ caseInsensitiveCollection.watch([{$match: {"fullDocument.text": "ABC"}}],
+ {resumeAfter: resumeToken, collation: {locale: "fr"}});
+
+ assert.soon(() => changeStream.hasNext());
+ assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"});
+ // Only single-collection streams will be exhausted from the drop. Use 'next()' instead of
+ // 'isExhausted()' to force a getMore since the previous getMore may not include the
+ // collection drop, which is more likely with sharded collections on slow machines.
+ if (!isChangeStreamPassthrough()) {
+ assert.throws(() => changeStream.next());
+ }
+
+ // Test that a pipeline without an explicit collation is allowed to resume, even though the
+ // collection has been recreated with the same default collation as it had previously. Do
+ // not modify this command in the passthrough suite(s) since only individual collections
+ // have the concept of a default collation.
+ const doNotModifyInPassthroughs = true;
+ const cmdRes = assert.commandWorked(runCommandChangeStreamPassthroughAware(
+ db,
+ {
+ aggregate: collName,
+ pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
+ cursor: {}
+ },
+ doNotModifyInPassthroughs));
+
+ changeStream = new DBCommandCursor(db, cmdRes);
+ assert.soon(() => changeStream.hasNext());
+ assert.docEq(changeStream.next().documentKey, {_id: "dropped_coll"});
+ }());
})();
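
The resume behaviour exercised at the end of this test boils down to the following at the shell level. Previously, resuming a single-collection stream after the watched collection had been dropped failed with InvalidResumeToken unless an explicit collation was supplied, because the server could no longer recover the dropped collection's default collation; with this change the resumed stream simply falls back to the simple collation. A hedged sketch with illustrative names:

    // 'events' has a case-insensitive default collation and an open stream.
    const stream = db.events.watch([], {collation: {locale: "en_US", strength: 2}});
    db.events.insert({_id: 0});
    assert.soon(() => stream.hasNext());
    const resumeToken = stream.next()._id;

    // Drop the collection, which will eventually invalidate the original stream.
    db.events.drop();

    // Before this patch: rejected with InvalidResumeToken unless {collation: ...} is given.
    // After this patch: accepted, and the resumed stream uses the simple collation.
    const resumed = db.events.watch([], {resumeAfter: resumeToken});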
diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js
index b4073f44bef..b65bd9aaec2 100644
--- a/jstests/change_streams/metadata_notifications.js
+++ b/jstests/change_streams/metadata_notifications.js
@@ -77,22 +77,20 @@
const resumeTokenDrop = changes[3]._id;
const resumeTokenInvalidate = changes[4]._id;
- // It should not be possible to resume a change stream after a collection drop without an
- // explicit collation, even if the invalidate has not been received.
- assert.commandFailedWithCode(db.runCommand({
+ // Verify that we can resume a stream after a collection drop without an explicit collation.
+ assert.commandWorked(db.runCommand({
aggregate: coll.getName(),
pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
cursor: {}
- }),
- ErrorCodes.InvalidResumeToken);
+ }));
// Recreate the collection.
coll = assertCreateCollection(db, collName);
assert.writeOK(coll.insert({_id: "after recreate"}));
- // Test resuming the change stream from the collection drop using 'resumeAfter'.
- // If running in a sharded passthrough suite, resuming from the drop will first return the drop
- // from the other shard before returning an invalidate.
+ // Test resuming the change stream from the collection drop using 'resumeAfter'. If running in a
+ // sharded passthrough suite, resuming from the drop will first return the drop from the other
+ // shard before returning an invalidate.
cursor = cst.startWatchingChanges({
collection: coll,
pipeline: [{$changeStream: {resumeAfter: resumeTokenDrop}}],
diff --git a/jstests/change_streams/resume_from_high_water_mark_token.js b/jstests/change_streams/resume_from_high_water_mark_token.js
index a58cefb8ea5..bc63fdb33d9 100644
--- a/jstests/change_streams/resume_from_high_water_mark_token.js
+++ b/jstests/change_streams/resume_from_high_water_mark_token.js
@@ -37,23 +37,33 @@
for (let resumeType of["startAfter", "resumeAfter"]) {
cmdResBeforeCollExists = assert.commandWorked(runExactCommand(db, {
aggregate: collName,
- pipeline: [{$changeStream: {[resumeType]: pbrtBeforeCollExists}}],
+ pipeline: [
+ {$changeStream: {[resumeType]: pbrtBeforeCollExists}},
+ {
+ $match: {
+ $or: [
+ {"fullDocument._id": "INSERT_ONE"},
+ {"fullDocument._id": "INSERT_TWO"}
+ ]
+ }
+ }
+ ],
cursor: {}
}));
}
csCursor = new DBCommandCursor(db, cmdResBeforeCollExists);
- // But if the collection is created with a non-simple collation, the resumed stream invalidates.
+ // If the collection is then created with a case-insensitive collation, the resumed stream
+ // continues to use the simple collation. We see 'INSERT_TWO' but not 'insert_one'.
const testCollationCollection =
assertCreateCollection(db, collName, {collation: {locale: "en_US", strength: 2}});
assert.commandWorked(testCollationCollection.insert({_id: "insert_one"}));
assert.commandWorked(testCollationCollection.insert({_id: "INSERT_TWO"}));
assert.soon(() => csCursor.hasNext());
- const invalidate = csCursor.next();
- assert.eq(invalidate.operationType, "invalidate"); // We don't see either insert.
+ assert.docEq(csCursor.next().fullDocument, {_id: "INSERT_TWO"});
csCursor.close();
- // We can resume from the pre-creation high water mark if we specify an explicit collation...
+ // We can resume from the pre-creation high water mark if we do not specify a collation...
let cmdResResumeFromBeforeCollCreated = assert.commandWorked(runExactCommand(db, {
aggregate: collName,
pipeline: [
@@ -63,14 +73,35 @@
{$or: [{"fullDocument._id": "INSERT_ONE"}, {"fullDocument._id": "INSERT_TWO"}]}
}
],
- collation: {locale: "simple"},
cursor: {}
}));
- // This time the stream does not invalidate. We override the default case-insensitive collation
- // with the explicit simple collation we specified, so we match INSERT_TWO but not INSERT_ONE.
+ // ... but we will not inherit the collection's case-insensitive collation, instead defaulting
+ // to the simple collation. We will therefore match 'INSERT_TWO' but not 'insert_one'.
+ csCursor = new DBCommandCursor(db, cmdResResumeFromBeforeCollCreated);
+ assert.soon(() => csCursor.hasNext());
+ assert.docEq(csCursor.next().fullDocument, {_id: "INSERT_TWO"});
+ csCursor.close();
+
+ // If we do specify a non-simple collation, it will be adopted by the pipeline.
+ cmdResResumeFromBeforeCollCreated = assert.commandWorked(runExactCommand(db, {
+ aggregate: collName,
+ pipeline: [
+ {$changeStream: {resumeAfter: pbrtBeforeCollExists}},
+ {
+ $match:
+ {$or: [{"fullDocument._id": "INSERT_ONE"}, {"fullDocument._id": "INSERT_TWO"}]}
+ }
+ ],
+ collation: {locale: "en_US", strength: 2},
+ cursor: {}
+ }));
+
+ // Now we match both 'insert_one' and 'INSERT_TWO'.
csCursor = new DBCommandCursor(db, cmdResResumeFromBeforeCollCreated);
assert.soon(() => csCursor.hasNext());
+ assert.docEq(csCursor.next().fullDocument, {_id: "insert_one"});
+ assert.soon(() => csCursor.hasNext());
assert.docEq(csCursor.next().fullDocument, {_id: "INSERT_TWO"});
csCursor.close();
@@ -96,13 +127,21 @@
aggregate: collName,
pipeline: [
{$changeStream: {resumeAfter: hwmFromCollWithCollation}},
- {$match: {"fullDocument._id": "insert_four"}}
+ {
+ $match: {
+ $or: [
+ {"fullDocument._id": "INSERT_THREE"},
+ {"fullDocument._id": "INSERT_FOUR"}
+ ]
+ }
+ }
],
cursor: {}
}));
csCursor = new DBCommandCursor(db, cmdResResumeWithCollation);
- // ... and we inherit the collection's case-insensitive collation, matching {_id:"insert_four"}.
+ // ... but we do not inherit the collection's case-insensitive collation, matching 'INSERT_FOUR'
+ // but not the preceding 'insert_three'.
assert.soon(() => csCursor.hasNext());
assert.docEq(csCursor.next().fullDocument, {_id: "INSERT_FOUR"});
csCursor.close();
@@ -194,23 +233,21 @@
hwmPostCreation = csCursor.getResumeToken();
csCursor.close();
- // Because this HWM has a UUID, we cannot resume from the token if the collection is dropped...
+ // We can resume from the token if the collection is dropped...
assertDropCollection(db, collName);
- assert.commandFailedWithCode(runExactCommand(db, {
- aggregate: collName,
- pipeline: [{$changeStream: {resumeAfter: hwmPostCreation}}],
- cursor: {}
- }),
- ErrorCodes.InvalidResumeToken);
+ assert.commandWorked(runExactCommand(db, {
+ aggregate: collName,
+ pipeline: [{$changeStream: {resumeAfter: hwmPostCreation}}],
+ cursor: {}
+ }));
// ... or if the collection is recreated with a different UUID...
assertCreateCollection(db, collName);
- assert.commandFailedWithCode(runExactCommand(db, {
- aggregate: collName,
- pipeline: [{$changeStream: {resumeAfter: hwmPostCreation}}],
- cursor: {}
- }),
- ErrorCodes.InvalidResumeToken);
- // ... unless we specify an explicit collation.
+ assert.commandWorked(runExactCommand(db, {
+ aggregate: collName,
+ pipeline: [{$changeStream: {resumeAfter: hwmPostCreation}}],
+ cursor: {}
+ }));
+ // ... or if we specify an explicit collation.
assert.commandWorked(runExactCommand(db, {
aggregate: collName,
pipeline: [{$changeStream: {resumeAfter: hwmPostCreation}}],
@@ -218,13 +255,13 @@
cursor: {}
}));
- // But even after the collection is recreated, we can still resume from the pre-creation HWM...
+ // Even after the collection is recreated, we can still resume from the pre-creation HWM...
cmdResResumeFromBeforeCollCreated = assert.commandWorked(runExactCommand(db, {
aggregate: collName,
pipeline: [{$changeStream: {resumeAfter: pbrtBeforeCollExists}}],
cursor: {}
}));
- // ...and can still see all the events from the collection's original incarnation...
+ // ...and we can still see all the events from the collection's original incarnation...
csCursor = new DBCommandCursor(db, cmdResResumeFromBeforeCollCreated);
docCount = 0;
assert.soon(() => {
diff --git a/jstests/noPassthrough/change_streams_collation_chunk_migration.js b/jstests/noPassthrough/change_streams_collation_chunk_migration.js
new file mode 100644
index 00000000000..4be1044d2d9
--- /dev/null
+++ b/jstests/noPassthrough/change_streams_collation_chunk_migration.js
@@ -0,0 +1,64 @@
+/**
+ * Tests that a change stream on a sharded collection with a non-simple default collation is not
+ * erroneously invalidated upon chunk migration. Reproduction script for the bug in SERVER-33944.
+ * @tags: [requires_replication, requires_journaling]
+ */
+(function() {
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+ load("jstests/libs/change_stream_util.js"); // For 'ChangeStreamTest'.
+
+ const st = new ShardingTest({
+ shards: 2,
+ mongos: 1,
+ rs: {
+ nodes: 1,
+ },
+ });
+
+ const testDB = st.s.getDB(jsTestName());
+
+ // Enable sharding on the test database and ensure that the primary is shard0.
+ assert.commandWorked(testDB.adminCommand({enableSharding: testDB.getName()}));
+ st.ensurePrimaryShard(testDB.getName(), st.shard0.shardName);
+
+ const caseInsensitiveCollectionName = "change_stream_case_insensitive";
+ const caseInsensitive = {locale: "en_US", strength: 2};
+
+ // Create the collection with a case-insensitive collation, then shard it on {shardKey: 1}.
+ const caseInsensitiveCollection = assertDropAndRecreateCollection(
+ testDB, caseInsensitiveCollectionName, {collation: caseInsensitive});
+ assert.commandWorked(
+ caseInsensitiveCollection.createIndex({shardKey: 1}, {collation: {locale: "simple"}}));
+ assert.commandWorked(testDB.adminCommand({
+ shardCollection: caseInsensitiveCollection.getFullName(),
+ key: {shardKey: 1},
+ collation: {locale: "simple"}
+ }));
+
+ // Verify that the collection does not exist on shard1.
+ assert(!st.shard1.getCollection(caseInsensitiveCollection.getFullName()).exists());
+
+ // Now open a change stream on the collection.
+ const cst = new ChangeStreamTest(testDB);
+ const csCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {}}, {$project: {docId: "$documentKey.shardKey"}}],
+ collection: caseInsensitiveCollection
+ });
+
+ // Insert some documents into the collection.
+ assert.commandWorked(caseInsensitiveCollection.insert({shardKey: 0, text: "aBc"}));
+ assert.commandWorked(caseInsensitiveCollection.insert({shardKey: 1, text: "abc"}));
+
+ // Move a chunk from shard0 to shard1. This will create the collection on shard1.
+ assert.commandWorked(testDB.adminCommand({
+ moveChunk: caseInsensitiveCollection.getFullName(),
+ find: {shardKey: 1},
+ to: st.rs1.getURL(),
+ _waitForDelete: false
+ }));
+
+ // Attempt to read from the change stream. We should see both inserts, without an invalidation.
+ cst.assertNextChangesEqual({cursor: csCursor, expectedChanges: [{docId: 0}, {docId: 1}]});
+
+ st.stop();
+})();
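
For context, the regression that the new test above reproduces (SERVER-33944) looked like this from the client's side: a stream on a sharded collection with a non-simple default collation could report a spurious invalidate when a chunk migration created the collection on another shard. With the stream pinned to the simple collation, it keeps delivering inserts across the migration. A rough shell-level illustration against an existing sharded cluster, with illustrative collection and shard names:

    // 'test.orders' is sharded on {shardKey: 1} and has a case-insensitive default collation.
    const stream = db.orders.watch([{$project: {docId: "$documentKey.shardKey"}}]);

    assert.commandWorked(db.orders.insert({shardKey: 0, text: "aBc"}));
    assert.commandWorked(db.orders.insert({shardKey: 1, text: "abc"}));

    // This migration creates the collection on the recipient shard; it used to surface
    // as a spurious invalidate in the stream, but now does not.
    sh.moveChunk("test.orders", {shardKey: 1}, "shard0001");

    assert.soon(() => stream.hasNext());
    printjson(stream.next());  // ..., docId: 0
    assert.soon(() => stream.hasNext());
    printjson(stream.next());  // ..., docId: 1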
diff --git a/jstests/noPassthrough/change_streams_update_lookup_collation.js b/jstests/noPassthrough/change_streams_update_lookup_collation.js
index b82ad384cde..97c7e4013a5 100644
--- a/jstests/noPassthrough/change_streams_update_lookup_collation.js
+++ b/jstests/noPassthrough/change_streams_update_lookup_collation.js
@@ -39,7 +39,8 @@
assert.writeOK(coll.insert({_id: "åbC", x: "AbÇ"}));
const changeStreamDefaultCollation = coll.aggregate(
- [{$changeStream: {fullDocument: "updateLookup"}}, {$match: {"fullDocument.x": "abc"}}]);
+ [{$changeStream: {fullDocument: "updateLookup"}}, {$match: {"fullDocument.x": "abc"}}],
+ {collation: caseInsensitive});
// Strength one will consider "ç" equal to "c" and "C".
const strengthOneCollation = {locale: "en_US", strength: 1};
diff --git a/jstests/sharding/change_stream_metadata_notifications.js b/jstests/sharding/change_stream_metadata_notifications.js
index b7ff3b1fae4..f535012a7b2 100644
--- a/jstests/sharding/change_stream_metadata_notifications.js
+++ b/jstests/sharding/change_stream_metadata_notifications.js
@@ -110,13 +110,12 @@
assert.eq(next.operationType, "invalidate");
assert(changeStream.isExhausted());
- // Test that we cannot resume the change stream without specifying an explicit collation.
- assert.commandFailedWithCode(mongosDB.runCommand({
+ // Test that we can resume the change stream without specifying an explicit collation.
+ assert.commandWorked(mongosDB.runCommand({
aggregate: mongosColl.getName(),
pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}],
cursor: {}
- }),
- ErrorCodes.InvalidResumeToken);
+ }));
// Recreate and shard the collection.
assert.commandWorked(mongosDB.createCollection(mongosColl.getName()));
@@ -125,14 +124,13 @@
assert.commandWorked(
mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}}));
- // Test that resuming the change stream on the recreated collection fails since the UUID has
- // changed.
- assert.commandFailedWithCode(mongosDB.runCommand({
+ // Test that resuming the change stream on the recreated collection succeeds, since we will not
+ // attempt to inherit the collection's default collation and can therefore ignore the new UUID.
+ assert.commandWorked(mongosDB.runCommand({
aggregate: mongosColl.getName(),
pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdate}}],
cursor: {}
- }),
- ErrorCodes.InvalidResumeToken);
+ }));
// Recreate the collection as unsharded and open a change stream on it.
assertDropAndRecreateCollection(mongosDB, mongosColl.getName());
diff --git a/jstests/sharding/change_streams.js b/jstests/sharding/change_streams.js
index e2ef69362c9..142b621f1fe 100644
--- a/jstests/sharding/change_streams.js
+++ b/jstests/sharding/change_streams.js
@@ -238,14 +238,12 @@
assert.soon(() => changeStream.hasNext());
assert.eq(changeStream.next().operationType, "invalidate");
- // Without an explicit collation, test that we *cannot* resume from before the collection
- // drop
- assert.commandFailedWithCode(mongosDB.runCommand({
+ // Test that we can resume from before the collection drop without an explicit collation.
+ assert.commandWorked(mongosDB.runCommand({
aggregate: mongosColl.getName(),
pipeline: [{$changeStream: {resumeAfter: resumeToken}}],
cursor: {}
- }),
- ErrorCodes.InvalidResumeToken);
+ }));
st.stop();
}
diff --git a/src/mongo/db/commands/run_aggregate.cpp b/src/mongo/db/commands/run_aggregate.cpp
index f371ef7b1a0..8b7d7fd7863 100644
--- a/src/mongo/db/commands/run_aggregate.cpp
+++ b/src/mongo/db/commands/run_aggregate.cpp
@@ -400,8 +400,7 @@ Status runAggregate(OperationContext* opCtx,
// where the collation has not yet been resolved, and where it has been resolved to nullptr.
boost::optional<std::unique_ptr<CollatorInterface>> collatorToUse;
- // The UUID of the collection for the execution namespace of this aggregation. For change
- // streams, this will be the UUID of the original namespace instead of the oplog namespace.
+ // The UUID of the collection for the execution namespace of this aggregation.
boost::optional<UUID> uuid;
std::vector<unique_ptr<PlanExecutor, PlanExecutor::Deleter>> execs;
@@ -425,28 +424,6 @@ Status runAggregate(OperationContext* opCtx,
return ex.toStatus();
}
- if (liteParsedPipeline.hasChangeStream()) {
- nss = NamespaceString::kRsOplogNamespace;
-
- // Upgrade and wait for read concern if necessary.
- _adjustChangeStreamReadConcern(opCtx);
-
- if (liteParsedPipeline.shouldResolveUUIDAndCollation()) {
- // AutoGetCollectionForReadCommand will raise an error if 'origNss' is a view.
- AutoGetCollectionForReadCommand origNssCtx(opCtx, origNss);
-
- // Resolve the collator to either the user-specified collation or the default
- // collation of the collection on which $changeStream was invoked, so that we do not
- // end up resolving the collation on the oplog.
- invariant(!collatorToUse);
- Collection* origColl = origNssCtx.getCollection();
- collatorToUse.emplace(resolveCollator(opCtx, request, origColl));
-
- // Get the collection UUID to be set on the expression context.
- uuid = origColl ? origColl->uuid() : boost::none;
- }
- }
-
const auto& pipelineInvolvedNamespaces = liteParsedPipeline.getInvolvedNamespaces();
// If emplaced, AutoGetCollectionForReadCommand will throw if the sharding version for this
@@ -458,31 +435,44 @@ Status runAggregate(OperationContext* opCtx,
// AutoStatsTracker to record CurOp and Top entries.
boost::optional<AutoStatsTracker> statsTracker;
- // If this is a collectionless aggregation with no foreign namespaces, we don't want to
- // acquire any locks. Otherwise, lock the collection or view.
- if (nss.isCollectionlessAggregateNS() && pipelineInvolvedNamespaces.empty()) {
+ // If this is a change stream, perform special checks and change the execution namespace.
+ if (liteParsedPipeline.hasChangeStream()) {
+ // Replace the execution namespace with that of the oplog.
+ nss = NamespaceString::kRsOplogNamespace;
+
+ // Upgrade and wait for read concern if necessary.
+ _adjustChangeStreamReadConcern(opCtx);
+
+ // AutoGetCollectionForReadCommand will raise an error if 'origNss' is a view. We do not
+ // need to check this if we are opening a stream on an entire db or across the cluster.
+ if (!origNss.isCollectionlessAggregateNS()) {
+ AutoGetCollectionForReadCommand origNssCtx(opCtx, origNss);
+ }
+
+ // If the user specified an explicit collation, adopt it; otherwise, use the simple
+ // collation. We do not inherit the collection's default collation or UUID, since
+ // the stream may be resuming from a point before the current UUID existed.
+ collatorToUse.emplace(resolveCollator(opCtx, request, nullptr));
+
+ // Obtain collection locks on the execution namespace; that is, the oplog.
+ ctx.emplace(opCtx, nss, AutoGetCollection::ViewMode::kViewsForbidden);
+ } else if (nss.isCollectionlessAggregateNS() && pipelineInvolvedNamespaces.empty()) {
+ // If this is a collectionless agg with no foreign namespaces, don't acquire any locks.
statsTracker.emplace(opCtx,
nss,
Top::LockType::NotLocked,
AutoStatsTracker::LogMode::kUpdateTopAndCurop,
0);
+ collatorToUse.emplace(resolveCollator(opCtx, request, nullptr));
} else {
+ // This is a regular aggregation. Lock the collection or view.
ctx.emplace(opCtx, nss, AutoGetCollection::ViewMode::kViewsPermitted);
+ collatorToUse.emplace(resolveCollator(opCtx, request, ctx->getCollection()));
+ uuid = ctx->getCollection() ? ctx->getCollection()->uuid() : boost::none;
}
Collection* collection = ctx ? ctx->getCollection() : nullptr;
- // For change streams, the UUID will already have been set for the original namespace.
- if (!liteParsedPipeline.hasChangeStream()) {
- uuid = collection ? collection->uuid() : boost::none;
- }
-
- // The collator may already have been set if this is a $changeStream pipeline. If not,
- // resolve the collator to either the user-specified collation or the collection default.
- if (!collatorToUse) {
- collatorToUse.emplace(resolveCollator(opCtx, request, collection));
- }
-
// If this is a view, resolve it by finding the underlying collection and stitching view
// pipelines and this request's pipeline together. We then release our locks before
// recursively calling runAggregate(), which will re-acquire locks on the underlying
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index b20c5d9cb9a..4984207ae4c 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -222,14 +222,6 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
// Generate 'rename' entries if the change stream is open on the source or target namespace.
relevantCommands.append(BSON("o.renameCollection" << nss.ns()));
relevantCommands.append(BSON("o.to" << nss.ns()));
- if (expCtx->collation.isEmpty()) {
- // If the user did not specify a collation, they should be using the collection's
- // default collation. So a "create" command which has any collation present would
- // invalidate the change stream, since that must mean the stream was created before the
- // collection existed and used the simple collation, which is no longer the default.
- relevantCommands.append(
- BSON("o.create" << nss.coll() << "o.collation" << BSON("$exists" << true)));
- }
} else {
// For change streams on an entire database, include notifications for drops and renames of
// non-system collections which will not invalidate the stream. Also include the
@@ -287,52 +279,6 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
namespace {
-/**
- * Throws an assertion if this pipeline might need to use a collation but it can't figure out what
- * the collation should be. Specifically, it is only safe to resume if at least one of the following
- * is true:
- * * The request has an explicit collation set, so we don't need to know if there was a default
- * collation on the collection.
- * * The request is 'collectionless', meaning it's a change stream on a whole database or a
- * whole cluster. Unlike individual collections, there is no concept of a default collation
- * at the level of an entire database or cluster.
- * * The resume token contains a UUID and a collection with that UUID still exists, thus we can
- * figure out its default collation.
- */
-void assertResumeAllowed(const intrusive_ptr<ExpressionContext>& expCtx,
- const ResumeTokenData& tokenData) {
- if (!expCtx->collation.isEmpty()) {
- // Explicit collation has been set, it's okay to resume.
- return;
- }
-
- if (!expCtx->isSingleNamespaceAggregation()) {
- // Change stream on a whole database or cluster, do not need to worry about collation.
- return;
- }
-
- if (!tokenData.uuid && ResumeToken::isHighWaterMarkToken(tokenData)) {
- // The only time we see a single-collection high water mark with no UUID is when the stream
- // was opened on a non-existent collection. We allow this to proceed, as the resumed stream
- // will immediately invalidate itself if it observes a createCollection event in the oplog
- // with a non-simple collation.
- return;
- }
-
- const auto cannotResumeErrMsg =
- "Attempted to resume a stream on a collection which has been dropped. The change stream's "
- "pipeline may need to make comparisons which should respect the collection's default "
- "collation, which can no longer be determined. If you wish to resume this change stream "
- "you must specify a collation with the request.";
- // Verify that the UUID on the expression context matches the UUID in the resume token.
- // TODO SERVER-35254: If we're on a stale mongos, this check may incorrectly reject a valid
- // resume token since the UUID on the expression context could be for a previous version of the
- // collection.
- uassert(ErrorCodes::InvalidResumeToken,
- cannotResumeErrMsg,
- expCtx->uuid && tokenData.uuid && expCtx->uuid.get() == tokenData.uuid.get());
-}
-
list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<ExpressionContext>& expCtx,
const DocumentSourceChangeStreamSpec spec,
BSONElement elem) {
@@ -359,9 +305,14 @@ list<intrusive_ptr<DocumentSource>> buildPipeline(const intrusive_ptr<Expression
"Attempting to resume a change stream using 'resumeAfter' is not allowed from an "
"invalidate notification.",
!resumeAfter || !tokenData.fromInvalidate);
- // Verify that the requested resume attempt is possible based on the stream type, resume
- // token UUID, and collation.
- assertResumeAllowed(expCtx, tokenData);
+
+ // If we are resuming a single-collection stream, the resume token should always contain a
+ // UUID unless the token is a high water mark.
+ uassert(ErrorCodes::InvalidResumeToken,
+ "Attempted to resume a single-collection stream, but the resume token does not "
+ "include a UUID.",
+ tokenData.uuid || !expCtx->isSingleNamespaceAggregation() ||
+ ResumeToken::isHighWaterMarkToken(tokenData));
// Store the resume token as the initial postBatchResumeToken for this stream.
expCtx->initialPostBatchResumeToken = token.toDocument().toBson();
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index a6d3405a9a0..077ec7ff616 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -48,46 +48,15 @@ public:
public:
static std::unique_ptr<LiteParsed> parse(const AggregationRequest& request,
const BSONElement& spec) {
- return stdx::make_unique<LiteParsed>(request.getNamespaceString(), spec);
+ return stdx::make_unique<LiteParsed>(request.getNamespaceString());
}
- explicit LiteParsed(NamespaceString nss, BSONElement spec) : _nss(std::move(nss)) {
- // We don't do any validation here, just a minimal check for the resume token. We also
- // do not need to extract the token unless the stream is running on a single namespace.
- if (_nss.isCollectionlessAggregateNS() || spec.type() != BSONType::Object) {
- return;
- }
- // Check the 'resumeAfter' field first; if empty, check the 'startAfter' field.
- auto specObj = spec.embeddedObject();
- _resumeToken =
- specObj.getObjectField(DocumentSourceChangeStreamSpec::kResumeAfterFieldName);
- if (_resumeToken.isEmpty()) {
- _resumeToken =
- specObj.getObjectField(DocumentSourceChangeStreamSpec::kStartAfterFieldName);
- }
- }
+ explicit LiteParsed(NamespaceString nss) : _nss(std::move(nss)) {}
bool isChangeStream() const final {
return true;
}
- bool shouldResolveUUIDAndCollation() const final {
- // If this is a whole-db or whole-cluster stream, never resolve the UUID and collation.
- if (_nss.isCollectionlessAggregateNS()) {
- return false;
- }
- // If we are not resuming, always resolve the UUID and collation.
- if (_resumeToken.isEmpty()) {
- return true;
- }
- // If we are resuming a single-collection stream from a high water mark that does not
- // have a UUID, then the token was generated before the collection was created. Do not
- // attempt to resolve the collection's current UUID or collation, so that the stream
- // resumes in exactly the same condition as it was in when the token was generated.
- auto tokenData = ResumeToken::parse(_resumeToken).getData();
- return !(ResumeToken::isHighWaterMarkToken(tokenData) && !tokenData.uuid);
- }
-
bool allowedToForwardFromMongos() const final {
return false;
}
@@ -128,7 +97,6 @@ public:
private:
const NamespaceString _nss;
- BSONObj _resumeToken;
};
// The name of the field where the document key (_id and shard key, if present) will be found
diff --git a/src/mongo/db/pipeline/lite_parsed_document_source.h b/src/mongo/db/pipeline/lite_parsed_document_source.h
index 3832046bf47..7fb21cb37bf 100644
--- a/src/mongo/db/pipeline/lite_parsed_document_source.h
+++ b/src/mongo/db/pipeline/lite_parsed_document_source.h
@@ -108,14 +108,6 @@ public:
}
/**
- * Returns true if this pipeline's UUID and collation should be resolved. For the latter, this
- * means adopting the collection's default collation, unless a custom collation was specified.
- */
- virtual bool shouldResolveUUIDAndCollation() const {
- return true;
- }
-
- /**
* Returns true if this stage does not require an input source.
*/
virtual bool isInitialSource() const {
diff --git a/src/mongo/db/pipeline/lite_parsed_pipeline.h b/src/mongo/db/pipeline/lite_parsed_pipeline.h
index 5adeb3dd835..1d92064d584 100644
--- a/src/mongo/db/pipeline/lite_parsed_pipeline.h
+++ b/src/mongo/db/pipeline/lite_parsed_pipeline.h
@@ -103,18 +103,6 @@ public:
}
/**
- * Returns true if this pipeline's UUID and collation should be resolved. For the latter, this
- * means adopting the collection's default collation, unless a custom collation was specified.
- */
- bool shouldResolveUUIDAndCollation() const {
- // Collectionless aggregations do not have a UUID or default collation.
- return !_nss.isCollectionlessAggregateNS() &&
- std::all_of(_stageSpecs.begin(), _stageSpecs.end(), [](auto&& spec) {
- return spec->shouldResolveUUIDAndCollation();
- });
- }
-
- /**
* Returns false if the pipeline has any stage which must be run locally on mongos.
*/
bool allowedToForwardFromMongos() const {
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 92142c1c166..e683ae162de 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -465,12 +465,12 @@ std::pair<BSONObj, boost::optional<UUID>> getCollationAndUUID(
const bool collectionIsSharded = (routingInfo && routingInfo->cm());
const bool collectionIsNotSharded = (routingInfo && !routingInfo->cm());
- // If the LiteParsedPipeline reports that we should not attempt to resolve the namespace's UUID
- // and collation, we immediately return the user-defined collation if one exists, or an empty
- // BSONObj otherwise. For instance, because collectionless aggregations generally run against
+ // If this is a change stream or a collectionless aggregation, we immediately return the user-
+ // defined collation if one exists, or an empty BSONObj otherwise. Change streams never inherit
+ // the collection's default collation, and since collectionless aggregations generally run on
// the 'admin' database, the standard logic would attempt to resolve its non-existent UUID and
// collation by sending a specious 'listCollections' command to the config servers.
- if (!litePipe.shouldResolveUUIDAndCollation()) {
+ if (litePipe.hasChangeStream() || nss.isCollectionlessAggregateNS()) {
return {request.getCollation(), boost::none};
}