summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorNick Zolnierz <nicholas.zolnierz@mongodb.com>2018-06-06 14:38:18 -0400
committerBernard Gorman <bernard.gorman@gmail.com>2018-06-14 11:16:52 +0100
commit3c4c5b4da681e6d50744460243f6361fe0328389 (patch)
treea4dfbb0c840aa59033bdc1f9b256ad4cc734bd0d
parent0dc372d9694439914955bb7169330be6967cb4aa (diff)
downloadmongo-3c4c5b4da681e6d50744460243f6361fe0328389.tar.gz
SERVER-35401: Whole-db and whole-cluster change streams do not filter operations to "system" collections
(cherry picked from commit 12f7795c52a7d352da1a7e03f71a233624dc05d4)
-rw-r--r--jstests/change_streams/change_stream_whole_cluster.js74
-rw-r--r--jstests/change_streams/change_stream_whole_cluster_invalidations.js51
-rw-r--r--jstests/change_streams/change_stream_whole_db.js16
-rw-r--r--jstests/change_streams/change_stream_whole_db_invalidations.js50
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp26
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h6
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream_test.cpp66
7 files changed, 259 insertions, 30 deletions
diff --git a/jstests/change_streams/change_stream_whole_cluster.js b/jstests/change_streams/change_stream_whole_cluster.js
index 0f3c9d6ec92..16500be99b8 100644
--- a/jstests/change_streams/change_stream_whole_cluster.js
+++ b/jstests/change_streams/change_stream_whole_cluster.js
@@ -5,6 +5,7 @@
load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest and
// assert[Valid|Invalid]ChangeStreamNss.
+ load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
const adminDB = db.getSiblingDB("admin");
const otherDB = db.getSiblingDB(`${db.getName()}_other`);
@@ -49,12 +50,83 @@
};
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+ // Test that the change stream returns an inserted doc on a user-created database whose name
+ // includes 'admin', 'local', or 'config'.
+ const validUserDBs = [
+ "admin1",
+ "1admin",
+ "_admin_",
+ "local_",
+ "_local",
+ "_local_",
+ "config_",
+ "_config",
+ "_config_"
+ ];
+ validUserDBs.forEach(dbName => {
+ assert.writeOK(db.getSiblingDB(dbName).test.insert({_id: 0, a: 1}));
+ expected = [
+ {
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0, a: 1},
+ ns: {db: dbName, coll: "test"},
+ operationType: "insert",
+ },
+ ];
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+ });
+
+ // Test that the change stream returns an inserted doc on a user-created collection whose name
+ // includes "system" but is not considered an internal collection.
+ const validSystemColls = ["system", "systems.views", "ssystem.views", "test.system"];
+ validSystemColls.forEach(collName => {
+ assert.writeOK(db.getCollection(collName).insert({_id: 0, a: 1}));
+ expected = [
+ {
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0, a: 1},
+ ns: {db: db.getName(), coll: collName},
+ operationType: "insert",
+ },
+ ];
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+ });
+
+ // Test that the change stream filters out operations on any collection in the 'admin', 'local',
+ // or 'config' databases.
+ const filteredDBs = ["admin", "local", "config"];
+ filteredDBs.forEach(dbName => {
+ // Not allowed to use 'local' db through mongos.
+ if (FixtureHelpers.isMongos(db) && dbName == "local")
+ return;
+
+ assert.writeOK(db.getSiblingDB(dbName).test.insert({_id: 0, a: 1}));
+ // Insert to the test collection to ensure that the change stream has something to
+ // return.
+ assert.writeOK(db.t1.insert({_id: dbName}));
+ expected = [
+ {
+ documentKey: {_id: dbName},
+ fullDocument: {_id: dbName},
+ ns: {db: db.getName(), coll: "t1"},
+ operationType: "insert",
+ },
+ ];
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+ // Drop the test collection to avoid duplicate key errors if this test is run multiple
+ // times.
+ assertDropCollection(db.getSiblingDB(dbName), "test");
+ });
+
// Dropping either database should invalidate the change stream.
assert.commandWorked(otherDB.dropDatabase());
expected = {operationType: "invalidate"};
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
- // Drop the remaining database and clean up the test.
+ // Drop the remaining databases and clean up the test.
assert.commandWorked(db.dropDatabase());
+ validUserDBs.forEach(dbName => {
+ db.getSiblingDB(dbName).dropDatabase();
+ });
cst.cleanUp();
}());
diff --git a/jstests/change_streams/change_stream_whole_cluster_invalidations.js b/jstests/change_streams/change_stream_whole_cluster_invalidations.js
index d2c07cdbc81..c794a2ee186 100644
--- a/jstests/change_streams/change_stream_whole_cluster_invalidations.js
+++ b/jstests/change_streams/change_stream_whole_cluster_invalidations.js
@@ -117,17 +117,62 @@
expectInvalidate: true
});
- // Dropping a 'system' collection should invalidate the change stream.
- // Create a view to ensure that the 'system.views' collection exists.
+ // Drop the database in order to clear the system and non-system test collections before
+ // running the rename tests.
+ assert.commandWorked(testDB.dropDatabase());
+
+ // Operations on internal "system" collections should be filtered out and not included in
+ // the change stream.
+ aggCursor = cst.startWatchingAllChangesForCluster();
+ // Creating a view will generate an insert entry on the "system.views" collection.
assert.commandWorked(
testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []}));
- aggCursor = cst.startWatchingAllChangesForCluster();
+ // Drop the "system.views" collection.
assertDropCollection(testDB, "system.views");
+ // Verify that the change stream does not report the insertion into "system.views", and is
+ // not invalidated by dropping the system collection. Instead, it correctly reports the next
+ // write to the test collection.
+ assert.writeOK(collToInvalidate.insert({_id: 0}));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.ns, {db: testDB.getName(), coll: collToInvalidate.getName()});
+
+ // Test that renaming a "system" collection *does* invalidate the stream if the target of
+ // the rename is a non-system collection.
+ assert.commandWorked(
+ testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []}));
+ assert.writeOK(testDB.system.views.renameCollection("non_system_collection"));
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{operationType: "invalidate"}],
+ expectInvalidate: true
+ });
+
+ // Test that renaming a "system" collection to a different "system" collection does not
+ // result in a notification in the change stream.
+ aggCursor = cst.startWatchingAllChangesForCluster();
+ assert.commandWorked(
+ testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []}));
+ // Note that the target of the rename must be a valid "system" collection.
+ assert.writeOK(testDB.system.views.renameCollection("system.js"));
+ // Verify that the change stream filters out the rename, instead returning the next insert
+ // to the test collection.
+ assert.writeOK(collToInvalidate.insert({_id: 1}));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.ns, {db: testDB.getName(), coll: collToInvalidate.getName()});
+
+ // Test that renaming a user collection to a "system" collection *does* invalidate the
+ // stream.
+ assert.writeOK(collToInvalidate.renameCollection("system.views"));
cst.assertNextChangesEqual({
cursor: aggCursor,
expectedChanges: [{operationType: "invalidate"}],
expectInvalidate: true
});
+
+ // Drop the "system.views" collection to avoid view catalog errors in subsequent tests.
+ assertDropCollection(testDB, "system.views");
}
cst.cleanUp();
diff --git a/jstests/change_streams/change_stream_whole_db.js b/jstests/change_streams/change_stream_whole_db.js
index c904c4a750e..afdb0166ffd 100644
--- a/jstests/change_streams/change_stream_whole_db.js
+++ b/jstests/change_streams/change_stream_whole_db.js
@@ -45,6 +45,22 @@
};
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+ // Test that the change stream returns an inserted doc on a user-created collection whose name
+ // includes "system" but is not considered an internal collection.
+ const validSystemColls = ["system", "systems.views", "ssystem.views", "test.system"];
+ validSystemColls.forEach(collName => {
+ assert.writeOK(db.getCollection(collName).insert({_id: 0, a: 1}));
+ expected = [
+ {
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0, a: 1},
+ ns: {db: db.getName(), coll: collName},
+ operationType: "insert",
+ },
+ ];
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+ });
+
// Dropping the database should invalidate the change stream.
assert.commandWorked(db.dropDatabase());
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [{operationType: "invalidate"}]});
diff --git a/jstests/change_streams/change_stream_whole_db_invalidations.js b/jstests/change_streams/change_stream_whole_db_invalidations.js
index 463359ec839..0bd2b7daae2 100644
--- a/jstests/change_streams/change_stream_whole_db_invalidations.js
+++ b/jstests/change_streams/change_stream_whole_db_invalidations.js
@@ -109,17 +109,61 @@
expectInvalidate: true
});
- // Dropping a 'system' collection should invalidate the change stream.
- // Create a view to ensure that the 'system.views' collection exists.
+ // Drop the database in order to clear the system and non-system test collections before running
+ // the rename tests.
+ assert.commandWorked(testDB.dropDatabase());
+
+ // Operations on internal "system" collections should be filtered out and not included in the
+ // change stream.
+ aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
+ // Creating a view will generate an insert entry on the "system.views" collection.
assert.commandWorked(
testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []}));
- aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
+ // Drop the "system.views" collection.
assertDropCollection(testDB, "system.views");
+ // Verify that the change stream does not report the insertion into "system.views", and is
+ // not invalidated by dropping the system collection. Instead, it correctly reports the next
+ // write to the test collection.
+ assert.writeOK(coll.insert({_id: 0}));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()});
+
+ // Test that renaming a "system" collection *does* invalidate the stream if the target of
+ // the rename is a non-system collection.
+ assert.commandWorked(
+ testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []}));
+ assert.writeOK(testDB.system.views.renameCollection("non_system_collection"));
cst.assertNextChangesEqual({
cursor: aggCursor,
expectedChanges: [{operationType: "invalidate"}],
expectInvalidate: true
});
+ // Test that renaming a "system" collection to a different "system" collection does not
+ // result in a notification in the change stream.
+ aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
+ assert.commandWorked(
+ testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []}));
+ // Note that the target of the rename must be a valid "system" collection.
+ assert.writeOK(testDB.system.views.renameCollection("system.js"));
+ // Verify that the change stream filters out the rename, instead returning the next insert to
+ // the test collection.
+ assert.writeOK(coll.insert({_id: 1}));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()});
+
+ // Test that renaming a user collection to a "system" collection *does* invalidate the stream.
+ assert.writeOK(coll.renameCollection("system.views"));
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{operationType: "invalidate"}],
+ expectInvalidate: true
+ });
+
+ // Drop the "system.views" collection to avoid view catalog errors in subsequent tests.
+ assertDropCollection(testDB, "system.views");
+
cst.cleanUp();
}());
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index a9a3d161342..d3d8314cb45 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -285,11 +285,11 @@ std::string DocumentSourceChangeStream::getNsRegexForChangeStream(const Namespac
case ChangeStreamType::kSingleDatabase:
// Match all namespaces that start with db name, followed by ".", then NOT followed by
// '$' or 'system.'
- return "^" + nss.db() + kRegexAllCollections;
+ return "^" + nss.db() + "\\." + kRegexAllCollections;
case ChangeStreamType::kAllChangesForCluster:
// Match all namespaces that start with any db name other than admin, config, or local,
- // followed by ".", then NOT followed by '$' or 'system.'
- return "^" + kRegexAllDBs + kRegexAllCollections;
+ // followed by ".", then NOT followed by '$' or 'system.'.
+ return kRegexAllDBs + "\\." + kRegexAllCollections;
default:
MONGO_UNREACHABLE;
}
@@ -320,18 +320,20 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
BSON("o.create" << nss.coll() << "o.collation" << BSON("$exists" << true)));
}
} else {
- // For change streams on an entire database, the stream is invalidated if any collections in
- // that database are dropped or renamed. For cluster-wide streams, drops or renames of any
- // collection in any database (aside from the internal databases admin, config and local)
- // will invalidate the stream.
- invalidatingCommands.append(BSON("o.drop" << BSON("$exists" << true)));
- invalidatingCommands.append(BSON("o.renameCollection" << BSON("$exists" << true)));
+ // For change streams on an entire database, the stream is invalidated if any non-system
+ // collections in that database are dropped or renamed. For cluster-wide streams, drops or
+ // renames of any non-system collection in any database (aside from the internal databases
+ // admin, config and local) will invalidate the stream.
+ invalidatingCommands.append(BSON("o.drop" << BSONRegEx("^" + kRegexAllCollections)));
+ // Note that 'o.renameCollection' contains the full NamespaceString.
+ invalidatingCommands.append(
+ BSON("o.renameCollection" << BSONRegEx(getNsRegexForChangeStream(nss))));
}
// For cluster-wide $changeStream, match the command namespace of any database other than admin,
// config, or local. Otherwise, match only against the target db's command namespace.
auto cmdNsFilter = (sourceType == ChangeStreamType::kAllChangesForCluster
- ? BSON("ns" << BSONRegEx("^" + kRegexAllDBs + kRegexCmdColl))
+ ? BSON("ns" << BSONRegEx(kRegexAllDBs + "\\." + kRegexCmdColl))
: BSON("ns" << nss.getCommandNS().ns()));
// 1.1) Commands that are on target db(s) and one of the above invalidating commands.
@@ -339,9 +341,7 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
BSON("$and" << BSON_ARRAY(cmdNsFilter << BSON("$or" << invalidatingCommands.arr())));
// 1.2) Supported commands that have arbitrary db namespaces in "ns" field.
- auto renameDropTarget = (sourceType == ChangeStreamType::kAllChangesForCluster
- ? BSON("o.to" << BSON("$exists" << true))
- : BSON("o.to" << nss.ns()));
+ auto renameDropTarget = BSON("o.to" << BSONRegEx(getNsRegexForChangeStream(nss)));
// All supported commands that are either (1.1) or (1.2).
BSONObj commandMatch = BSON("op"
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 0912e79ebd7..9bc9ca54eb5 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -190,9 +190,9 @@ public:
static void checkValueType(const Value v, const StringData fieldName, BSONType expectedType);
private:
- static constexpr StringData kRegexAllCollections = R"(\.(?!(\$|system\.)))"_sd;
- static constexpr StringData kRegexAllDBs = "(?!(admin|config|local)).+"_sd;
- static constexpr StringData kRegexCmdColl = R"(\.\$cmd$)"_sd;
+ static constexpr StringData kRegexAllCollections = R"((?!(\$|system\.)))"_sd;
+ static constexpr StringData kRegexAllDBs = R"(^(?!(admin|config|local)\.)[^.]+)"_sd;
+ static constexpr StringData kRegexCmdColl = R"(\$cmd$)"_sd;
// Helper function which throws if the $changeStream fails any of a series of semantic checks.
// For instance, whether it is permitted to run given the current FCV, whether the namespace is
diff --git a/src/mongo/db/pipeline/document_source_change_stream_test.cpp b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
index 1cb939b573a..441f5c65019 100644
--- a/src/mongo/db/pipeline/document_source_change_stream_test.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream_test.cpp
@@ -1086,6 +1086,32 @@ TEST_F(ChangeStreamStageTest, DocumentKeyShouldNotIncludeShardKeyIfResumeTokenDo
40645);
}
+TEST_F(ChangeStreamStageTest, RenameFromSystemToUserCollectionShouldIncludeNotification) {
+ // Renaming to a non-system collection will include a notification in the stream.
+ NamespaceString systemColl(nss.db() + ".system.users");
+ OplogEntry rename =
+ createCommand(BSON("renameCollection" << systemColl.ns() << "to" << nss.ns()), testUuid());
+ Document expectedInvalidate{
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ };
+ checkTransformation(rename, expectedInvalidate);
+}
+
+TEST_F(ChangeStreamStageTest, RenameFromUserToSystemCollectionShouldIncludeNotification) {
+ // Renaming to a system collection will include a notification in the stream.
+ NamespaceString systemColl(nss.db() + ".system.users");
+ OplogEntry rename =
+ createCommand(BSON("renameCollection" << nss.ns() << "to" << systemColl.ns()), testUuid());
+ Document expectedInvalidate{
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ };
+ checkTransformation(rename, expectedInvalidate);
+}
+
//
// Test class for change stream of a single database.
//
@@ -1317,20 +1343,46 @@ TEST_F(ChangeStreamStageDBTest, TransformInvalidate) {
checkTransformation(dropDB, expectedInvalidateDropDatabase);
}
-TEST_F(ChangeStreamStageDBTest, SystemCollectionsDropOrRenameShouldInvalidate) {
+TEST_F(ChangeStreamStageDBTest, MatchFiltersOperationsOnSystemCollections) {
+ NamespaceString systemColl(nss.db() + ".system.users");
+ OplogEntry insert = makeOplogEntry(OpTypeEnum::kInsert, systemColl, BSON("_id" << 1));
+ checkTransformation(insert, boost::none);
+
+ OplogEntry dropColl = createCommand(BSON("drop" << systemColl.coll()), testUuid());
+ checkTransformation(dropColl, boost::none);
+
+ // Rename from a 'system' collection to another 'system' collection should not include a
+ // notification.
+ NamespaceString renamedSystemColl(nss.db() + ".system.views");
+ OplogEntry rename = createCommand(
+ BSON("renameCollection" << systemColl.ns() << "to" << renamedSystemColl.ns()), testUuid());
+ checkTransformation(rename, boost::none);
+}
+
+TEST_F(ChangeStreamStageDBTest, RenameFromSystemToUserCollectionShouldIncludeNotification) {
+ // Renaming to a non-system collection will include a notification in the stream.
NamespaceString systemColl(nss.db() + ".system.users");
- NamespaceString renamedSystemColl(nss.db() + ".system.users_new");
+ NamespaceString renamedColl(nss.db() + ".non_system_coll");
+ OplogEntry rename = createCommand(
+ BSON("renameCollection" << systemColl.ns() << "to" << renamedColl.ns()), testUuid());
Document expectedInvalidate{
{DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
{DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
{DSChangeStream::kClusterTimeField, kDefaultTs},
};
+ checkTransformation(rename, expectedInvalidate);
+}
- OplogEntry dropColl = createCommand(BSON("drop" << systemColl.coll()), testUuid());
- checkTransformation(dropColl, expectedInvalidate);
-
- OplogEntry rename = createCommand(
- BSON("renameCollection" << systemColl.ns() << "to" << renamedSystemColl.ns()), testUuid());
+TEST_F(ChangeStreamStageDBTest, RenameFromUserToSystemCollectionShouldIncludeNotification) {
+ // Renaming to a system collection will include a notification in the stream.
+ NamespaceString systemColl(nss.db() + ".system.users");
+ OplogEntry rename =
+ createCommand(BSON("renameCollection" << nss.ns() << "to" << systemColl.ns()), testUuid());
+ Document expectedInvalidate{
+ {DSChangeStream::kIdField, makeResumeToken(kDefaultTs, testUuid())},
+ {DSChangeStream::kOperationTypeField, DSChangeStream::kInvalidateOpType},
+ {DSChangeStream::kClusterTimeField, kDefaultTs},
+ };
checkTransformation(rename, expectedInvalidate);
}