summaryrefslogtreecommitdiff
path: root/jstests/change_streams/whole_db_metadata_notifications.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/change_streams/whole_db_metadata_notifications.js')
-rw-r--r--jstests/change_streams/whole_db_metadata_notifications.js468
1 files changed, 231 insertions, 237 deletions
diff --git a/jstests/change_streams/whole_db_metadata_notifications.js b/jstests/change_streams/whole_db_metadata_notifications.js
index 54d4b8cc6e2..7b659ff4e12 100644
--- a/jstests/change_streams/whole_db_metadata_notifications.js
+++ b/jstests/change_streams/whole_db_metadata_notifications.js
@@ -3,256 +3,250 @@
// invalidated by a database drop.
// @tags: [do_not_run_in_whole_cluster_passthrough]
(function() {
- "use strict";
-
- load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
- load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
- load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
- load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
-
- const testDB = db.getSiblingDB(jsTestName());
- testDB.dropDatabase();
- let cst = new ChangeStreamTest(testDB);
-
- // Write a document to the collection and test that the change stream returns it
- // and getMore command closes the cursor afterwards.
- const collName = "test";
- let coll = assertDropAndRecreateCollection(testDB, collName);
-
- let aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
-
- // Create oplog entries of type insert, update, and delete.
- assert.writeOK(coll.insert({_id: 1}));
- assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}}));
- assert.writeOK(coll.remove({_id: 1}));
-
- // Drop and recreate the collection.
- const collAgg = assertDropAndRecreateCollection(testDB, collName);
-
- // We should get 4 oplog entries of type insert, update, delete, and drop.
- let change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "update", tojson(change));
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "delete", tojson(change));
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "drop", tojson(change));
-
- // Get a valid resume token that the next change stream can use.
- assert.writeOK(collAgg.insert({_id: 1}));
-
- change = cst.getOneChange(aggCursor, false);
- const resumeToken = change._id;
-
- // For whole-db streams, it is possible to resume at a point before a collection is dropped.
- assertDropCollection(testDB, collAgg.getName());
- // Wait for two-phase drop to complete, so that the UUID no longer exists.
- assert.soon(function() {
- return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB,
- collAgg.getName());
- });
- assert.commandWorked(testDB.runCommand(
- {aggregate: 1, pipeline: [{$changeStream: {resumeAfter: resumeToken}}], cursor: {}}));
-
- // Test that invalidation entries for other databases are filtered out.
- const otherDB = testDB.getSiblingDB(jsTestName() + "other");
- const otherDBColl = otherDB[collName + "_other"];
- assert.writeOK(otherDBColl.insert({_id: 0}));
-
- // Create collection on the database being watched.
- coll = assertDropAndRecreateCollection(testDB, collName);
-
- // Create the $changeStream. We set 'doNotModifyInPassthroughs' so that this test will not be
- // upconverted to a cluster-wide stream, which would return an entry for the dropped collection
- // in the other database.
- aggCursor = cst.startWatchingChanges(
- {pipeline: [{$changeStream: {}}], collection: 1, doNotModifyInPassthroughs: true});
-
- // Drop the collection on the other database, this should *not* invalidate the change stream.
- assertDropCollection(otherDB, otherDBColl.getName());
-
- // Insert into the collection in the watched database, and verify the change stream is able to
- // pick it up.
- assert.writeOK(coll.insert({_id: 1}));
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.documentKey._id, 1);
-
- // Test that renaming a collection generates a 'rename' entry for the 'from' collection. MongoDB
- // does not allow renaming of sharded collections, so only perform this test if the collection
- // is not sharded.
- if (!FixtureHelpers.isSharded(coll)) {
- assertDropAndRecreateCollection(testDB, coll.getName());
- assertDropCollection(testDB, "renamed_coll");
- aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
- assert.writeOK(coll.renameCollection("renamed_coll"));
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges: [{
- operationType: "rename",
- ns: {db: testDB.getName(), coll: coll.getName()},
- to: {db: testDB.getName(), coll: "renamed_coll"}
- }]
- });
-
- // Repeat the test, this time using the 'dropTarget' option with an existing target
- // collection.
- coll = testDB["renamed_coll"];
- assertCreateCollection(testDB, collName);
- assert.writeOK(testDB[collName].insert({_id: 0}));
- assert.writeOK(coll.renameCollection(collName, true /* dropTarget */));
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges: [
- {
- operationType: "insert",
- ns: {db: testDB.getName(), coll: collName},
- documentKey: {_id: 0},
- fullDocument: {_id: 0}
- },
- {
- operationType: "rename",
- ns: {db: testDB.getName(), coll: "renamed_coll"},
- to: {db: testDB.getName(), coll: collName}
- }
- ]
- });
-
- coll = testDB[collName];
- // Test renaming a collection from the database being watched to a different database. Do
- // not run this in the mongos passthrough suites since we cannot guarantee the primary shard
- // of the target database, and renameCollection requires the source and destination to be on
- // the same shard.
- if (!FixtureHelpers.isMongos(testDB)) {
- const otherDB = testDB.getSiblingDB(testDB.getName() + "_rename_target");
- // Create target collection to ensure the database exists.
- const collOtherDB = assertCreateCollection(otherDB, "test");
- assertDropCollection(otherDB, "test");
- assert.commandWorked(testDB.adminCommand(
- {renameCollection: coll.getFullName(), to: collOtherDB.getFullName()}));
- // Rename across databases drops the source collection after the collection is copied
- // over.
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges:
- [{operationType: "drop", ns: {db: testDB.getName(), coll: coll.getName()}}]
- });
-
- // Test renaming a collection from a different database to the database being watched.
- assert.commandWorked(testDB.adminCommand(
- {renameCollection: collOtherDB.getFullName(), to: coll.getFullName()}));
- // Do not check the 'ns' field since it will contain the namespace of the temp
- // collection created when renaming a collection across databases.
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "rename");
- assert.eq(change.to, {db: testDB.getName(), coll: coll.getName()});
- }
-
- // Test the behavior of a change stream watching the target collection of a $out aggregation
- // stage.
- coll.aggregate([{$out: "renamed_coll"}]);
- // Note that $out will first create a temp collection, and then rename the temp collection
- // to the target. Do not explicitly check the 'ns' field.
- const rename = cst.getOneChange(aggCursor);
- assert.eq(rename.operationType, "rename", tojson(rename));
- assert.eq(rename.to, {db: testDB.getName(), coll: "renamed_coll"}, tojson(rename));
-
- // The change stream should not be invalidated by the rename(s).
- assert.eq(0, cst.getNextBatch(aggCursor).nextBatch.length);
- assert.writeOK(coll.insert({_id: 2}));
- assert.eq(cst.getOneChange(aggCursor).operationType, "insert");
-
- // Drop the new collection to avoid an additional 'drop' notification when the database is
- // dropped.
- assertDropCollection(testDB, "renamed_coll");
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges:
- [{operationType: "drop", ns: {db: testDB.getName(), coll: "renamed_coll"}}],
- });
- }
-
- // Dropping a collection should return a 'drop' entry.
- assertDropCollection(testDB, coll.getName());
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges:
- [{operationType: "drop", ns: {db: testDB.getName(), coll: coll.getName()}}],
- });
-
- // Operations on internal "system" collections should be filtered out and not included in the
- // change stream.
- aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
- // Creating a view will generate an insert entry on the "system.views" collection.
- assert.commandWorked(
- testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []}));
- // Drop the "system.views" collection.
- assertDropCollection(testDB, "system.views");
- // Verify that the change stream does not report the insertion into "system.views", and is
- // not invalidated by dropping the system collection. Instead, it correctly reports the next
- // write to the test collection.
- assert.writeOK(coll.insert({_id: 0}));
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()});
-
- // Test that renaming a "system" collection *does* return a notification if the target of
- // the rename is a non-system collection.
- assert.commandWorked(
- testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []}));
- assert.writeOK(testDB.system.views.renameCollection("non_system_collection"));
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges: [{
- operationType: "rename",
- ns: {db: testDB.getName(), coll: "system.views"},
- to: {db: testDB.getName(), coll: "non_system_collection"}
- }],
- });
-
- // Test that renaming a "system" collection to a different "system" collection does not
- // result in a notification in the change stream.
+"use strict";
+
+load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
+load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
+
+const testDB = db.getSiblingDB(jsTestName());
+testDB.dropDatabase();
+let cst = new ChangeStreamTest(testDB);
+
+// Write a document to the collection and test that the change stream returns it
+// and getMore command closes the cursor afterwards.
+const collName = "test";
+let coll = assertDropAndRecreateCollection(testDB, collName);
+
+let aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
+
+// Create oplog entries of type insert, update, and delete.
+assert.writeOK(coll.insert({_id: 1}));
+assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}}));
+assert.writeOK(coll.remove({_id: 1}));
+
+// Drop and recreate the collection.
+const collAgg = assertDropAndRecreateCollection(testDB, collName);
+
+// We should get 4 oplog entries of type insert, update, delete, and drop.
+let change = cst.getOneChange(aggCursor);
+assert.eq(change.operationType, "insert", tojson(change));
+change = cst.getOneChange(aggCursor);
+assert.eq(change.operationType, "update", tojson(change));
+change = cst.getOneChange(aggCursor);
+assert.eq(change.operationType, "delete", tojson(change));
+change = cst.getOneChange(aggCursor);
+assert.eq(change.operationType, "drop", tojson(change));
+
+// Get a valid resume token that the next change stream can use.
+assert.writeOK(collAgg.insert({_id: 1}));
+
+change = cst.getOneChange(aggCursor, false);
+const resumeToken = change._id;
+
+// For whole-db streams, it is possible to resume at a point before a collection is dropped.
+assertDropCollection(testDB, collAgg.getName());
+// Wait for two-phase drop to complete, so that the UUID no longer exists.
+assert.soon(function() {
+ return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB, collAgg.getName());
+});
+assert.commandWorked(testDB.runCommand(
+ {aggregate: 1, pipeline: [{$changeStream: {resumeAfter: resumeToken}}], cursor: {}}));
+
+// Test that invalidation entries for other databases are filtered out.
+const otherDB = testDB.getSiblingDB(jsTestName() + "other");
+const otherDBColl = otherDB[collName + "_other"];
+assert.writeOK(otherDBColl.insert({_id: 0}));
+
+// Create collection on the database being watched.
+coll = assertDropAndRecreateCollection(testDB, collName);
+
+// Create the $changeStream. We set 'doNotModifyInPassthroughs' so that this test will not be
+// upconverted to a cluster-wide stream, which would return an entry for the dropped collection
+// in the other database.
+aggCursor = cst.startWatchingChanges(
+ {pipeline: [{$changeStream: {}}], collection: 1, doNotModifyInPassthroughs: true});
+
+// Drop the collection on the other database, this should *not* invalidate the change stream.
+assertDropCollection(otherDB, otherDBColl.getName());
+
+// Insert into the collection in the watched database, and verify the change stream is able to
+// pick it up.
+assert.writeOK(coll.insert({_id: 1}));
+change = cst.getOneChange(aggCursor);
+assert.eq(change.operationType, "insert", tojson(change));
+assert.eq(change.documentKey._id, 1);
+
+// Test that renaming a collection generates a 'rename' entry for the 'from' collection. MongoDB
+// does not allow renaming of sharded collections, so only perform this test if the collection
+// is not sharded.
+if (!FixtureHelpers.isSharded(coll)) {
+ assertDropAndRecreateCollection(testDB, coll.getName());
+ assertDropCollection(testDB, "renamed_coll");
aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
- assert.commandWorked(
- testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []}));
- // Note that the target of the rename must be a valid "system" collection.
- assert.writeOK(testDB.system.views.renameCollection("system.users"));
- // Verify that the change stream filters out the rename above, instead returning the next insert
- // to the test collection.
- assert.writeOK(coll.insert({_id: 1}));
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()});
-
- // Test that renaming a user collection to a "system" collection *is* returned in the change
- // stream.
- assert.writeOK(coll.renameCollection("system.views"));
+ assert.writeOK(coll.renameCollection("renamed_coll"));
cst.assertNextChangesEqual({
cursor: aggCursor,
expectedChanges: [{
operationType: "rename",
ns: {db: testDB.getName(), coll: coll.getName()},
- to: {db: testDB.getName(), coll: "system.views"}
- }],
+ to: {db: testDB.getName(), coll: "renamed_coll"}
+ }]
});
- // Drop the "system.views" collection to avoid view catalog errors in subsequent tests.
- assertDropCollection(testDB, "system.views");
- assertDropCollection(testDB, "non_system_collection");
+ // Repeat the test, this time using the 'dropTarget' option with an existing target
+ // collection.
+ coll = testDB["renamed_coll"];
+ assertCreateCollection(testDB, collName);
+ assert.writeOK(testDB[collName].insert({_id: 0}));
+ assert.writeOK(coll.renameCollection(collName, true /* dropTarget */));
cst.assertNextChangesEqual({
cursor: aggCursor,
expectedChanges: [
- {operationType: "drop", ns: {db: testDB.getName(), coll: "non_system_collection"}},
+ {
+ operationType: "insert",
+ ns: {db: testDB.getName(), coll: collName},
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0}
+ },
+ {
+ operationType: "rename",
+ ns: {db: testDB.getName(), coll: "renamed_coll"},
+ to: {db: testDB.getName(), coll: collName}
+ }
]
});
- // Dropping the database should generate a 'dropDatabase' notification followed by an
- // 'invalidate'.
- assert.commandWorked(testDB.dropDatabase());
- cst.assertDatabaseDrop({cursor: aggCursor, db: testDB});
- cst.assertNextChangesEqual(
- {cursor: aggCursor, expectedChanges: [{operationType: "invalidate"}]});
+ coll = testDB[collName];
+ // Test renaming a collection from the database being watched to a different database. Do
+ // not run this in the mongos passthrough suites since we cannot guarantee the primary shard
+ // of the target database, and renameCollection requires the source and destination to be on
+ // the same shard.
+ if (!FixtureHelpers.isMongos(testDB)) {
+ const otherDB = testDB.getSiblingDB(testDB.getName() + "_rename_target");
+ // Create target collection to ensure the database exists.
+ const collOtherDB = assertCreateCollection(otherDB, "test");
+ assertDropCollection(otherDB, "test");
+ assert.commandWorked(testDB.adminCommand(
+ {renameCollection: coll.getFullName(), to: collOtherDB.getFullName()}));
+ // Rename across databases drops the source collection after the collection is copied
+ // over.
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges:
+ [{operationType: "drop", ns: {db: testDB.getName(), coll: coll.getName()}}]
+ });
+
+ // Test renaming a collection from a different database to the database being watched.
+ assert.commandWorked(testDB.adminCommand(
+ {renameCollection: collOtherDB.getFullName(), to: coll.getFullName()}));
+ // Do not check the 'ns' field since it will contain the namespace of the temp
+ // collection created when renaming a collection across databases.
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "rename");
+ assert.eq(change.to, {db: testDB.getName(), coll: coll.getName()});
+ }
- cst.cleanUp();
+ // Test the behavior of a change stream watching the target collection of a $out aggregation
+ // stage.
+ coll.aggregate([{$out: "renamed_coll"}]);
+ // Note that $out will first create a temp collection, and then rename the temp collection
+ // to the target. Do not explicitly check the 'ns' field.
+ const rename = cst.getOneChange(aggCursor);
+ assert.eq(rename.operationType, "rename", tojson(rename));
+ assert.eq(rename.to, {db: testDB.getName(), coll: "renamed_coll"}, tojson(rename));
+
+ // The change stream should not be invalidated by the rename(s).
+ assert.eq(0, cst.getNextBatch(aggCursor).nextBatch.length);
+ assert.writeOK(coll.insert({_id: 2}));
+ assert.eq(cst.getOneChange(aggCursor).operationType, "insert");
+
+ // Drop the new collection to avoid an additional 'drop' notification when the database is
+ // dropped.
+ assertDropCollection(testDB, "renamed_coll");
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges:
+ [{operationType: "drop", ns: {db: testDB.getName(), coll: "renamed_coll"}}],
+ });
+}
+
+// Dropping a collection should return a 'drop' entry.
+assertDropCollection(testDB, coll.getName());
+cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{operationType: "drop", ns: {db: testDB.getName(), coll: coll.getName()}}],
+});
+
+// Operations on internal "system" collections should be filtered out and not included in the
+// change stream.
+aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
+// Creating a view will generate an insert entry on the "system.views" collection.
+assert.commandWorked(testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []}));
+// Drop the "system.views" collection.
+assertDropCollection(testDB, "system.views");
+// Verify that the change stream does not report the insertion into "system.views", and is
+// not invalidated by dropping the system collection. Instead, it correctly reports the next
+// write to the test collection.
+assert.writeOK(coll.insert({_id: 0}));
+change = cst.getOneChange(aggCursor);
+assert.eq(change.operationType, "insert", tojson(change));
+assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()});
+
+// Test that renaming a "system" collection *does* return a notification if the target of
+// the rename is a non-system collection.
+assert.commandWorked(testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []}));
+assert.writeOK(testDB.system.views.renameCollection("non_system_collection"));
+cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{
+ operationType: "rename",
+ ns: {db: testDB.getName(), coll: "system.views"},
+ to: {db: testDB.getName(), coll: "non_system_collection"}
+ }],
+});
+
+// Test that renaming a "system" collection to a different "system" collection does not
+// result in a notification in the change stream.
+aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
+assert.commandWorked(testDB.runCommand({create: "view1", viewOn: coll.getName(), pipeline: []}));
+// Note that the target of the rename must be a valid "system" collection.
+assert.writeOK(testDB.system.views.renameCollection("system.users"));
+// Verify that the change stream filters out the rename above, instead returning the next insert
+// to the test collection.
+assert.writeOK(coll.insert({_id: 1}));
+change = cst.getOneChange(aggCursor);
+assert.eq(change.operationType, "insert", tojson(change));
+assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()});
+
+// Test that renaming a user collection to a "system" collection *is* returned in the change
+// stream.
+assert.writeOK(coll.renameCollection("system.views"));
+cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{
+ operationType: "rename",
+ ns: {db: testDB.getName(), coll: coll.getName()},
+ to: {db: testDB.getName(), coll: "system.views"}
+ }],
+});
+
+// Drop the "system.views" collection to avoid view catalog errors in subsequent tests.
+assertDropCollection(testDB, "system.views");
+assertDropCollection(testDB, "non_system_collection");
+cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [
+ {operationType: "drop", ns: {db: testDB.getName(), coll: "non_system_collection"}},
+ ]
+});
+
+// Dropping the database should generate a 'dropDatabase' notification followed by an
+// 'invalidate'.
+assert.commandWorked(testDB.dropDatabase());
+cst.assertDatabaseDrop({cursor: aggCursor, db: testDB});
+cst.assertNextChangesEqual({cursor: aggCursor, expectedChanges: [{operationType: "invalidate"}]});
+
+cst.cleanUp();
}());