diff options
7 files changed, 259 insertions, 30 deletions
diff --git a/jstests/change_streams/change_stream_whole_cluster.js b/jstests/change_streams/change_stream_whole_cluster.js index 0f3c9d6ec92..16500be99b8 100644 --- a/jstests/change_streams/change_stream_whole_cluster.js +++ b/jstests/change_streams/change_stream_whole_cluster.js @@ -5,6 +5,7 @@ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest and // assert[Valid|Invalid]ChangeStreamNss. + load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. const adminDB = db.getSiblingDB("admin"); const otherDB = db.getSiblingDB(`${db.getName()}_other`); @@ -49,12 +50,83 @@ }; cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + // Test that the change stream returns an inserted doc on a user-created database whose name + // includes 'admin', 'local', or 'config'. + const validUserDBs = [ + "admin1", + "1admin", + "_admin_", + "local_", + "_local", + "_local_", + "config_", + "_config", + "_config_" + ]; + validUserDBs.forEach(dbName => { + assert.writeOK(db.getSiblingDB(dbName).test.insert({_id: 0, a: 1})); + expected = [ + { + documentKey: {_id: 0}, + fullDocument: {_id: 0, a: 1}, + ns: {db: dbName, coll: "test"}, + operationType: "insert", + }, + ]; + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + }); + + // Test that the change stream returns an inserted doc on a user-created collection whose name + // includes "system" but is not considered an internal collection. + const validSystemColls = ["system", "systems.views", "ssystem.views", "test.system"]; + validSystemColls.forEach(collName => { + assert.writeOK(db.getCollection(collName).insert({_id: 0, a: 1})); + expected = [ + { + documentKey: {_id: 0}, + fullDocument: {_id: 0, a: 1}, + ns: {db: db.getName(), coll: collName}, + operationType: "insert", + }, + ]; + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + }); + + // Test that the change stream filters out operations on any collection in the 'admin', 'local', + // or 'config' databases. + const filteredDBs = ["admin", "local", "config"]; + filteredDBs.forEach(dbName => { + // Not allowed to use 'local' db through mongos. + if (FixtureHelpers.isMongos(db) && dbName == "local") + return; + + assert.writeOK(db.getSiblingDB(dbName).test.insert({_id: 0, a: 1})); + // Insert to the test collection to ensure that the change stream has something to + // return. + assert.writeOK(db.t1.insert({_id: dbName})); + expected = [ + { + documentKey: {_id: dbName}, + fullDocument: {_id: dbName}, + ns: {db: db.getName(), coll: "t1"}, + operationType: "insert", + }, + ]; + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + // Drop the test collection to avoid duplicate key errors if this test is run multiple + // times. + assertDropCollection(db.getSiblingDB(dbName), "test"); + }); + // Dropping either database should invalidate the change stream. assert.commandWorked(otherDB.dropDatabase()); expected = {operationType: "invalidate"}; cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); - // Drop the remaining database and clean up the test. + // Drop the remaining databases and clean up the test. assert.commandWorked(db.dropDatabase()); + validUserDBs.forEach(dbName => { + db.getSiblingDB(dbName).dropDatabase(); + }); cst.cleanUp(); }()); diff --git a/jstests/change_streams/change_stream_whole_cluster_invalidations.js b/jstests/change_streams/change_stream_whole_cluster_invalidations.js index d2c07cdbc81..c794a2ee186 100644 --- a/jstests/change_streams/change_stream_whole_cluster_invalidations.js +++ b/jstests/change_streams/change_stream_whole_cluster_invalidations.js @@ -117,17 +117,62 @@ expectInvalidate: true }); - // Dropping a 'system' collection should invalidate the change stream. - // Create a view to ensure that the 'system.views' collection exists. + // Drop the database in order to clear the system and non-system test collections before + // running the rename tests. + assert.commandWorked(testDB.dropDatabase()); + + // Operations on internal "system" collections should be filtered out and not included in + // the change stream. + aggCursor = cst.startWatchingAllChangesForCluster(); + // Creating a view will generate an insert entry on the "system.views" collection. assert.commandWorked( testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []})); - aggCursor = cst.startWatchingAllChangesForCluster(); + // Drop the "system.views" collection. assertDropCollection(testDB, "system.views"); + // Verify that the change stream does not report the insertion into "system.views", and is + // not invalidated by dropping the system collection. Instead, it correctly reports the next + // write to the test collection. + assert.writeOK(collToInvalidate.insert({_id: 0})); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.ns, {db: testDB.getName(), coll: collToInvalidate.getName()}); + + // Test that renaming a "system" collection *does* invalidate the stream if the target of + // the rename is a non-system collection. + assert.commandWorked( + testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []})); + assert.writeOK(testDB.system.views.renameCollection("non_system_collection")); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [{operationType: "invalidate"}], + expectInvalidate: true + }); + + // Test that renaming a "system" collection to a different "system" collection does not + // result in a notification in the change stream. + aggCursor = cst.startWatchingAllChangesForCluster(); + assert.commandWorked( + testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []})); + // Note that the target of the rename must be a valid "system" collection. + assert.writeOK(testDB.system.views.renameCollection("system.js")); + // Verify that the change stream filters out the rename, instead returning the next insert + // to the test collection. + assert.writeOK(collToInvalidate.insert({_id: 1})); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.ns, {db: testDB.getName(), coll: collToInvalidate.getName()}); + + // Test that renaming a user collection to a "system" collection *does* invalidate the + // stream. + assert.writeOK(collToInvalidate.renameCollection("system.views")); cst.assertNextChangesEqual({ cursor: aggCursor, expectedChanges: [{operationType: "invalidate"}], expectInvalidate: true }); + + // Drop the "system.views" collection to avoid view catalog errors in subsequent tests. + assertDropCollection(testDB, "system.views"); } cst.cleanUp(); diff --git a/jstests/change_streams/change_stream_whole_db.js b/jstests/change_streams/change_stream_whole_db.js index c904c4a750e..afdb0166ffd 100644 --- a/jstests/change_streams/change_stream_whole_db.js +++ b/jstests/change_streams/change_stream_whole_db.js @@ -45,6 +45,22 @@ }; cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + // Test that the change stream returns an inserted doc on a user-created collection whose name + // includes "system" but is not considered an internal collection. + const validSystemColls = ["system", "systems.views", "ssystem.views", "test.system"]; + validSystemColls.forEach(collName => { + assert.writeOK(db.getCollection(collName).insert({_id: 0, a: 1})); + expected = [ + { + documentKey: {_id: 0}, + fullDocument: {_id: 0, a: 1}, + ns: {db: db.getName(), coll: collName}, + operationType: "insert", + }, + ]; + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + }); + // Dropping the database should invalidate the change stream. assert.commandWorked(db.dropDatabase()); cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [{operationType: "invalidate"}]}); diff --git a/jstests/change_streams/change_stream_whole_db_invalidations.js b/jstests/change_streams/change_stream_whole_db_invalidations.js index 463359ec839..0bd2b7daae2 100644 --- a/jstests/change_streams/change_stream_whole_db_invalidations.js +++ b/jstests/change_streams/change_stream_whole_db_invalidations.js @@ -109,17 +109,61 @@ expectInvalidate: true }); - // Dropping a 'system' collection should invalidate the change stream. - // Create a view to ensure that the 'system.views' collection exists. + // Drop the database in order to clear the system and non-system test collections before running + // the rename tests. + assert.commandWorked(testDB.dropDatabase()); + + // Operations on internal "system" collections should be filtered out and not included in the + // change stream. + aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); + // Creating a view will generate an insert entry on the "system.views" collection. assert.commandWorked( testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []})); - aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); + // Drop the "system.views" collection. assertDropCollection(testDB, "system.views"); + // Verify that the change stream does not report the insertion into "system.views", and is + // not invalidated by dropping the system collection. Instead, it correctly reports the next + // write to the test collection. + assert.writeOK(coll.insert({_id: 0})); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()}); + + // Test that renaming a "system" collection *does* invalidate the stream if the target of + // the rename is a non-system collection. + assert.commandWorked( + testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []})); + assert.writeOK(testDB.system.views.renameCollection("non_system_collection")); cst.assertNextChangesEqual({ cursor: aggCursor, expectedChanges: [{operationType: "invalidate"}], expectInvalidate: true }); + // Test that renaming a "system" collection to a different "system" collection does not + // result in a notification in the change stream. + aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); + assert.commandWorked( + testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []})); + // Note that the target of the rename must be a valid "system" collection. + assert.writeOK(testDB.system.views.renameCollection("system.js")); + // Verify that the change stream filters out the rename, instead returning the next insert to + // the test collection. + assert.writeOK(coll.insert({_id: 1})); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()}); + + // Test that renaming a user collection to a "system" collection *does* invalidate the stream. + assert.writeOK(coll.renameCollection("system.views")); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [{operationType: "invalidate"}], + expectInvalidate: true + }); + + // Drop the "system.views" collection to avoid view catalog errors in subsequent tests. + assertDropCollection(testDB, "system.views"); + cst.cleanUp(); }()); diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index a9a3d161342..d3d8314cb45 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -285,11 +285,11 @@ std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const Namespac case ChangeStreamType::kSingleDatabase: // Match all namespaces that start with db name, followed by ".", then NOT followed by // '$' or 'system.' - return "^" + nss.db() + kRegexAllCollections; + return "^" + nss.db() + "\\." + kRegexAllCollections; case ChangeStreamType::kAllChangesForCluster: // Match all namespaces that start with any db name other than admin, config, or local, - // followed by ".", then NOT followed by '$' or 'system.' - return "^" + kRegexAllDBs + kRegexAllCollections; + // followed by ".", then NOT followed by '$' or 'system.'. + return kRegexAllDBs + "\\." + kRegexAllCollections; default: MONGO_UNREACHABLE; } @@ -320,18 +320,20 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( BSON("o.create" << nss.coll() << "o.collation" << BSON("$exists" << true))); } } else { - // For change streams on an entire database, the stream is invalidated if any collections in - // that database are dropped or renamed. For cluster-wide streams, drops or renames of any - // collection in any database (aside from the internal databases admin, config and local) - // will invalidate the stream. - invalidatingCommands.append(BSON("o.drop" << BSON("$exists" << true))); - invalidatingCommands.append(BSON("o.renameCollection" << BSON("$exists" << true))); + // For change streams on an entire database, the stream is invalidated if any non-system + // collections in that database are dropped or renamed. For cluster-wide streams, drops or + // renames of any non-system collection in any database (aside from the internal databases + // admin, config and local) will invalidate the stream. + invalidatingCommands.append(BSON("o.drop" << BSONRegEx("^" + kRegexAllCollections))); + // Note that 'o.renameCollection' contains the full NamespaceString. + invalidatingCommands.append( + BSON("o.renameCollection" << BSONRegEx(getNsRegexForChangeStream(nss)))); } // For cluster-wide $changeStream, match the command namespace of any database other than admin, // config, or local. Otherwise, match only against the target db's command namespace. auto cmdNsFilter = (sourceType == ChangeStreamType::kAllChangesForCluster - ? BSON("ns" << BSONRegEx("^" + kRegexAllDBs + kRegexCmdColl)) + ? BSON("ns" << BSONRegEx(kRegexAllDBs + "\\." + kRegexCmdColl)) : BSON("ns" << nss.getCommandNS().ns())); // 1.1) Commands that are on target db(s) and one of the above invalidating commands. @@ -339,9 +341,7 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( BSON("$and" << BSON_ARRAY(cmdNsFilter << BSON("$or" << invalidatingCommands.arr()))); // 1.2) Supported commands that have arbitrary db namespaces in "ns" field. - auto renameDropTarget = (sourceType == ChangeStreamType::kAllChangesForCluster - ? BSON("o.to" << BSON("$exists" << true)) - : BSON("o.to" << nss.ns())); + auto renameDropTarget = BSON("o.to" << BSONRegEx(getNsRegexForChangeStream(nss))); // All supported commands that are either (1.1) or (1.2). BSONObj commandMatch = BSON("op" diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 0912e79ebd7..9bc9ca54eb5 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -190,9 +190,9 @@ public: static void checkValueType(const Value v, const StringData fieldName, BSONType expectedType); private: - static constexpr StringData kRegexAllCollections = R"(\.(?!(\$|system\.)))"_sd; - static constexpr StringData kRegexAllDBs = "(?!(admin|config|local)).+"_sd; - static constexpr StringData kRegexCmdColl = R"(\.\$cmd$)"_sd; + static constexpr StringData kRegexAllCollections = R"((?!(\$|system\.)))"_sd; + static constexpr StringData kRegexAllDBs = R"(^(?!(admin|config|local)\.)[^.]+)"_sd; + static constexpr StringData kRegexCmdColl = R"(\$cmd$)"_sd; // Helper function which throws if the $changeStream fails any of a series of semantic checks. // For instance, whether it is permitted to run given the current FCV, whether the namespace is diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp index 1cb939b573a..441f5c65019 100644 --- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp @@ -1086,6 +1086,32 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyIfResumeTokenDo 40645); } +TEST_F(ChangeStreamStageTest, RenameFromSystemToUserCollectionShouldIncludeNotification) { + // Renaming to a non-system collection will include a notification in the stream. + NamespaceString systemColl(nss.db() + ".system.users"); + OplogEntry rename = + createCommand(BSON("renameCollection" << systemColl.ns() << "to" << nss.ns()), testUuid()); + Document expectedInvalidate{ + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + }; + checkTransformation(rename, expectedInvalidate); +} + +TEST_F(ChangeStreamStageTest, RenameFromUserToSystemCollectionShouldIncludeNotification) { + // Renaming to a system collection will include a notification in the stream. + NamespaceString systemColl(nss.db() + ".system.users"); + OplogEntry rename = + createCommand(BSON("renameCollection" << nss.ns() << "to" << systemColl.ns()), testUuid()); + Document expectedInvalidate{ + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + }; + checkTransformation(rename, expectedInvalidate); +} + // // Test class for change stream of a single database. // @@ -1317,20 +1343,46 @@ TEST_F(ChangeStreamStageDBTest, TransformInvalidate) { checkTransformation(dropDB, expectedInvalidateDropDatabase); } -TEST_F(ChangeStreamStageDBTest, SystemCollectionsDropOrRenameShouldInvalidate) { +TEST_F(ChangeStreamStageDBTest, MatchFiltersOperationsOnSystemCollections) { + NamespaceString systemColl(nss.db() + ".system.users"); + OplogEntry insert = makeOplogEntry(OpTypeEnum::kInsert, systemColl, BSON("_id" << 1)); + checkTransformation(insert, boost::none); + + OplogEntry dropColl = createCommand(BSON("drop" << systemColl.coll()), testUuid()); + checkTransformation(dropColl, boost::none); + + // Rename from a 'system' collection to another 'system' collection should not include a + // notification. + NamespaceString renamedSystemColl(nss.db() + ".system.views"); + OplogEntry rename = createCommand( + BSON("renameCollection" << systemColl.ns() << "to" << renamedSystemColl.ns()), testUuid()); + checkTransformation(rename, boost::none); +} + +TEST_F(ChangeStreamStageDBTest, RenameFromSystemToUserCollectionShouldIncludeNotification) { + // Renaming to a non-system collection will include a notification in the stream. NamespaceString systemColl(nss.db() + ".system.users"); - NamespaceString renamedSystemColl(nss.db() + ".system.users_new"); + NamespaceString renamedColl(nss.db() + ".non_system_coll"); + OplogEntry rename = createCommand( + BSON("renameCollection" << systemColl.ns() << "to" << renamedColl.ns()), testUuid()); Document expectedInvalidate{ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, {DSChangeStream::kClusterTimeField, kDefaultTs}, }; + checkTransformation(rename, expectedInvalidate); +} - OplogEntry dropColl = createCommand(BSON("drop" << systemColl.coll()), testUuid()); - checkTransformation(dropColl, expectedInvalidate); - - OplogEntry rename = createCommand( - BSON("renameCollection" << systemColl.ns() << "to" << renamedSystemColl.ns()), testUuid()); +TEST_F(ChangeStreamStageDBTest, RenameFromUserToSystemCollectionShouldIncludeNotification) { + // Renaming to a system collection will include a notification in the stream. + NamespaceString systemColl(nss.db() + ".system.users"); + OplogEntry rename = + createCommand(BSON("renameCollection" << nss.ns() << "to" << systemColl.ns()), testUuid()); + Document expectedInvalidate{ + {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())}, + {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType}, + {DSChangeStream::kClusterTimeField, kDefaultTs}, + }; checkTransformation(rename, expectedInvalidate); } |