summaryrefslogtreecommitdiff
path: root/jstests/change_streams/whole_cluster_metadata_notifications.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/change_streams/whole_cluster_metadata_notifications.js')
-rw-r--r--jstests/change_streams/whole_cluster_metadata_notifications.js466
1 files changed, 231 insertions, 235 deletions
diff --git a/jstests/change_streams/whole_cluster_metadata_notifications.js b/jstests/change_streams/whole_cluster_metadata_notifications.js
index ec7da470842..9a9d8c6efd5 100644
--- a/jstests/change_streams/whole_cluster_metadata_notifications.js
+++ b/jstests/change_streams/whole_cluster_metadata_notifications.js
@@ -1,280 +1,276 @@
// Tests of metadata notifications for a $changeStream on a whole cluster.
(function() {
- "use strict";
+"use strict";
- load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
- load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
- load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
- load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
+load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
+load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
- // Define two databases. We will conduct our tests by creating one collection in each.
- const testDB1 = db.getSiblingDB(jsTestName()),
- testDB2 = db.getSiblingDB(jsTestName() + "_other");
- const adminDB = db.getSiblingDB("admin");
+// Define two databases. We will conduct our tests by creating one collection in each.
+const testDB1 = db.getSiblingDB(jsTestName()), testDB2 = db.getSiblingDB(jsTestName() + "_other");
+const adminDB = db.getSiblingDB("admin");
- assert.commandWorked(testDB1.dropDatabase());
- assert.commandWorked(testDB2.dropDatabase());
+assert.commandWorked(testDB1.dropDatabase());
+assert.commandWorked(testDB2.dropDatabase());
- // Create one collection on each database.
- let [db1Coll, db2Coll] =
- [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, "test"));
+// Create one collection on each database.
+let [db1Coll, db2Coll] =
+ [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, "test"));
- // Create a ChangeStreamTest on the 'admin' db. Cluster-wide change streams can only be opened
- // on admin.
- let cst = new ChangeStreamTest(adminDB);
- let aggCursor = cst.startWatchingAllChangesForCluster();
+// Create a ChangeStreamTest on the 'admin' db. Cluster-wide change streams can only be opened
+// on admin.
+let cst = new ChangeStreamTest(adminDB);
+let aggCursor = cst.startWatchingAllChangesForCluster();
- // Generate oplog entries of type insert, update, and delete across both databases.
- for (let coll of[db1Coll, db2Coll]) {
- assert.writeOK(coll.insert({_id: 1}));
- assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}}));
- assert.writeOK(coll.remove({_id: 1}));
- }
+// Generate oplog entries of type insert, update, and delete across both databases.
+for (let coll of [db1Coll, db2Coll]) {
+ assert.writeOK(coll.insert({_id: 1}));
+ assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}}));
+ assert.writeOK(coll.remove({_id: 1}));
+}
- // Drop the second database, which should generate a 'drop' entry for the collection followed
- // by a 'dropDatabase' entry.
- assert.commandWorked(testDB2.dropDatabase());
+// Drop the second database, which should generate a 'drop' entry for the collection followed
+// by a 'dropDatabase' entry.
+assert.commandWorked(testDB2.dropDatabase());
- // We should get 6 oplog entries; three ops of type insert, update, delete from each database.
- for (let expectedDB of[testDB1, testDB2]) {
- let change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "update", tojson(change));
- assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "delete", tojson(change));
- assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
- }
- cst.assertDatabaseDrop({cursor: aggCursor, db: testDB2});
+// We should get 6 oplog entries; three ops of type insert, update, delete from each database.
+for (let expectedDB of [testDB1, testDB2]) {
+ let change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "update", tojson(change));
+ assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "delete", tojson(change));
+ assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
+}
+cst.assertDatabaseDrop({cursor: aggCursor, db: testDB2});
- // Test that a cluster-wide change stream can be resumed using a token from a collection which
- // has been dropped.
- db1Coll = assertDropAndRecreateCollection(testDB1, db1Coll.getName());
+// Test that a cluster-wide change stream can be resumed using a token from a collection which
+// has been dropped.
+db1Coll = assertDropAndRecreateCollection(testDB1, db1Coll.getName());
- // Get a valid resume token that the next change stream can use.
- aggCursor = cst.startWatchingAllChangesForCluster();
+// Get a valid resume token that the next change stream can use.
+aggCursor = cst.startWatchingAllChangesForCluster();
- assert.writeOK(db1Coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+assert.writeOK(db1Coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
- let change = cst.getOneChange(aggCursor, false);
- const resumeToken = change._id;
-
- // For cluster-wide streams, it is possible to resume at a point before a collection is dropped,
- // even if the "drop" notification has not been received on the original stream yet.
- assertDropCollection(db1Coll, db1Coll.getName());
- // Wait for two-phase drop to complete, so that the UUID no longer exists.
- assert.soon(function() {
- return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB1,
- db1Coll.getName());
- });
- assert.commandWorked(adminDB.runCommand({
- aggregate: 1,
- pipeline: [{$changeStream: {resumeAfter: resumeToken, allChangesForCluster: true}}],
- cursor: {}
- }));
-
- // Test that collection drops from any database result in "drop" notifications for the stream.
- [db1Coll, db2Coll] =
- [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, "test"));
- let _idForTest = 0;
- for (let collToInvalidate of[db1Coll, db2Coll]) {
- // Start watching all changes in the cluster.
- aggCursor = cst.startWatchingAllChangesForCluster();
-
- let testDB = collToInvalidate.getDB();
-
- // Insert into the collections on both databases, and verify the change stream is able to
- // pick them up.
- for (let collToWrite of[db1Coll, db2Coll]) {
- assert.writeOK(collToWrite.insert({_id: _idForTest}));
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.documentKey._id, _idForTest);
- assert.eq(change.ns.db, collToWrite.getDB().getName());
- _idForTest++;
- }
-
- // Renaming the collection should generate a 'rename' notification. Skip this test when
- // running on a sharded collection, since these cannot be renamed.
- if (!FixtureHelpers.isSharded(collToInvalidate)) {
- assertDropAndRecreateCollection(testDB, collToInvalidate.getName());
- const collName = collToInvalidate.getName();
-
- // Start watching all changes in the cluster.
- aggCursor = cst.startWatchingAllChangesForCluster();
- assert.writeOK(collToInvalidate.renameCollection("renamed_coll"));
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges: [
- {
- operationType: "rename",
- ns: {db: testDB.getName(), coll: collToInvalidate.getName()},
- to: {db: testDB.getName(), coll: "renamed_coll"}
- },
- ]
- });
+let change = cst.getOneChange(aggCursor, false);
+const resumeToken = change._id;
- // Repeat the test, this time using the 'dropTarget' option with an existing target
- // collection.
- collToInvalidate = testDB.getCollection("renamed_coll");
- assertDropAndRecreateCollection(testDB, collName);
- assert.writeOK(testDB[collName].insert({_id: 0}));
- assert.writeOK(collToInvalidate.renameCollection(collName, true /* dropTarget */));
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges: [
- {
- operationType: "insert",
- ns: {db: testDB.getName(), coll: collName},
- documentKey: {_id: 0},
- fullDocument: {_id: 0}
- },
- {
- operationType: "rename",
- ns: {db: testDB.getName(), coll: "renamed_coll"},
- to: {db: testDB.getName(), coll: collName}
- }
- ]
- });
+// For cluster-wide streams, it is possible to resume at a point before a collection is dropped,
+// even if the "drop" notification has not been received on the original stream yet.
+assertDropCollection(db1Coll, db1Coll.getName());
+// Wait for two-phase drop to complete, so that the UUID no longer exists.
+assert.soon(function() {
+ return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB1,
+ db1Coll.getName());
+});
+assert.commandWorked(adminDB.runCommand({
+ aggregate: 1,
+ pipeline: [{$changeStream: {resumeAfter: resumeToken, allChangesForCluster: true}}],
+ cursor: {}
+}));
- collToInvalidate = testDB[collName];
+// Test that collection drops from any database result in "drop" notifications for the stream.
+[db1Coll, db2Coll] =
+ [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, "test"));
+let _idForTest = 0;
+for (let collToInvalidate of [db1Coll, db2Coll]) {
+ // Start watching all changes in the cluster.
+ aggCursor = cst.startWatchingAllChangesForCluster();
- // Test renaming a collection to a different database. Do not run this in the mongos
- // passthrough suites since we cannot guarantee the primary shard of the target database
- // and renameCollection requires the source and destination to be on the same shard.
- if (!FixtureHelpers.isMongos(testDB)) {
- const otherDB = testDB.getSiblingDB(testDB.getName() + "_rename_target");
- // Ensure the target database exists.
- const collOtherDB = assertDropAndRecreateCollection(otherDB, "test");
- assertDropCollection(otherDB, collOtherDB.getName());
- aggCursor = cst.startWatchingAllChangesForCluster();
- assert.commandWorked(testDB.adminCommand({
- renameCollection: collToInvalidate.getFullName(),
- to: collOtherDB.getFullName()
- }));
- // Do not check the 'ns' field since it will contain the namespace of the temp
- // collection created when renaming a collection across databases.
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "rename", tojson(change));
- assert.eq(change.to,
- {db: otherDB.getName(), coll: collOtherDB.getName()},
- tojson(change));
- // Rename across databases also drops the source collection after the collection is
- // copied over.
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges: [{
- operationType: "drop",
- ns: {db: testDB.getName(), coll: collToInvalidate.getName()}
- }]
- });
- }
+ let testDB = collToInvalidate.getDB();
- // Test the behavior of a change stream watching the target collection of a $out
- // aggregation stage.
- collToInvalidate.aggregate([{$out: "renamed_coll"}]);
- // Do not check the 'ns' field since it will contain the namespace of the temp
- // collection created by the $out stage, before renaming to 'renamed_coll'.
- const rename = cst.getOneChange(aggCursor);
- assert.eq(rename.operationType, "rename", tojson(rename));
- assert.eq(rename.to, {db: testDB.getName(), coll: "renamed_coll"}, tojson(rename));
+ // Insert into the collections on both databases, and verify the change stream is able to
+ // pick them up.
+ for (let collToWrite of [db1Coll, db2Coll]) {
+ assert.writeOK(collToWrite.insert({_id: _idForTest}));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, _idForTest);
+ assert.eq(change.ns.db, collToWrite.getDB().getName());
+ _idForTest++;
+ }
- // The change stream should not be invalidated by the rename(s).
- assert.eq(0, cst.getNextBatch(aggCursor).nextBatch.length);
- assert.writeOK(collToInvalidate.insert({_id: 2}));
- assert.eq(cst.getOneChange(aggCursor).operationType, "insert");
+ // Renaming the collection should generate a 'rename' notification. Skip this test when
+ // running on a sharded collection, since these cannot be renamed.
+ if (!FixtureHelpers.isSharded(collToInvalidate)) {
+ assertDropAndRecreateCollection(testDB, collToInvalidate.getName());
+ const collName = collToInvalidate.getName();
- // Test that renaming a "system" collection to a user collection *does* return a rename
- // notification.
- assert.commandWorked(testDB.runCommand(
- {create: "view1", viewOn: collToInvalidate.getName(), pipeline: []}));
- assert.writeOK(testDB.system.views.renameCollection("non_system_collection"));
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges: [{
+ // Start watching all changes in the cluster.
+ aggCursor = cst.startWatchingAllChangesForCluster();
+ assert.writeOK(collToInvalidate.renameCollection("renamed_coll"));
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [
+ {
operationType: "rename",
- ns: {db: testDB.getName(), coll: "system.views"},
- to: {db: testDB.getName(), coll: "non_system_collection"}
- }],
- });
-
- // Test that renaming a "system" collection to a different "system" collection does not
- // result in a notification in the change stream.
- aggCursor = cst.startWatchingAllChangesForCluster();
- assert.commandWorked(testDB.runCommand(
- {create: "view1", viewOn: collToInvalidate.getName(), pipeline: []}));
- // Note that the target of the rename must be a valid "system" collection.
- assert.writeOK(testDB.system.views.renameCollection("system.users"));
- // Verify that the change stream filters out the rename above, instead returning the
- // next insert to the test collection.
- assert.writeOK(collToInvalidate.insert({_id: 1}));
- change = cst.getOneChange(aggCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.ns, {db: testDB.getName(), coll: collToInvalidate.getName()});
+ ns: {db: testDB.getName(), coll: collToInvalidate.getName()},
+ to: {db: testDB.getName(), coll: "renamed_coll"}
+ },
+ ]
+ });
- // Test that renaming a user collection to a "system" collection *does* return a rename
- // notification.
- assert.writeOK(collToInvalidate.renameCollection("system.views"));
- cst.assertNextChangesEqual({
- cursor: aggCursor,
- expectedChanges: [{
+ // Repeat the test, this time using the 'dropTarget' option with an existing target
+ // collection.
+ collToInvalidate = testDB.getCollection("renamed_coll");
+ assertDropAndRecreateCollection(testDB, collName);
+ assert.writeOK(testDB[collName].insert({_id: 0}));
+ assert.writeOK(collToInvalidate.renameCollection(collName, true /* dropTarget */));
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [
+ {
+ operationType: "insert",
+ ns: {db: testDB.getName(), coll: collName},
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0}
+ },
+ {
operationType: "rename",
- ns: {db: testDB.getName(), coll: collToInvalidate.getName()},
- to: {db: testDB.getName(), coll: "system.views"}
- }],
- });
+ ns: {db: testDB.getName(), coll: "renamed_coll"},
+ to: {db: testDB.getName(), coll: collName}
+ }
+ ]
+ });
- // Drop the "system.views" collection to avoid view catalog errors in subsequent tests.
- assertDropCollection(testDB, "system.views");
+ collToInvalidate = testDB[collName];
- // Recreate the test collection for the remainder of the test.
- assert.writeOK(collToInvalidate.insert({_id: 0}));
+ // Test renaming a collection to a different database. Do not run this in the mongos
+ // passthrough suites since we cannot guarantee the primary shard of the target database
+ // and renameCollection requires the source and destination to be on the same shard.
+ if (!FixtureHelpers.isMongos(testDB)) {
+ const otherDB = testDB.getSiblingDB(testDB.getName() + "_rename_target");
+ // Ensure the target database exists.
+ const collOtherDB = assertDropAndRecreateCollection(otherDB, "test");
+ assertDropCollection(otherDB, collOtherDB.getName());
+ aggCursor = cst.startWatchingAllChangesForCluster();
+ assert.commandWorked(testDB.adminCommand(
+ {renameCollection: collToInvalidate.getFullName(), to: collOtherDB.getFullName()}));
+ // Do not check the 'ns' field since it will contain the namespace of the temp
+ // collection created when renaming a collection across databases.
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "rename", tojson(change));
+ assert.eq(
+ change.to, {db: otherDB.getName(), coll: collOtherDB.getName()}, tojson(change));
+ // Rename across databases also drops the source collection after the collection is
+ // copied over.
cst.assertNextChangesEqual({
cursor: aggCursor,
expectedChanges: [{
- operationType: "insert",
- ns: {db: testDB.getName(), coll: collToInvalidate.getName()},
- documentKey: {_id: 0},
- fullDocument: {_id: 0}
+ operationType: "drop",
+ ns: {db: testDB.getName(), coll: collToInvalidate.getName()}
}]
});
}
- // Dropping a collection should generate a 'drop' entry.
- assertDropCollection(testDB, collToInvalidate.getName());
- // Insert to the test collection to queue up another change after the drop. This is needed
- // since the number of 'drop' notifications is not deterministic in the sharded passthrough
- // suites.
- assert.writeOK(collToInvalidate.insert({_id: 0}));
- cst.consumeDropUpTo({
+ // Test the behavior of a change stream watching the target collection of a $out
+ // aggregation stage.
+ collToInvalidate.aggregate([{$out: "renamed_coll"}]);
+ // Do not check the 'ns' field since it will contain the namespace of the temp
+ // collection created by the $out stage, before renaming to 'renamed_coll'.
+ const rename = cst.getOneChange(aggCursor);
+ assert.eq(rename.operationType, "rename", tojson(rename));
+ assert.eq(rename.to, {db: testDB.getName(), coll: "renamed_coll"}, tojson(rename));
+
+ // The change stream should not be invalidated by the rename(s).
+ assert.eq(0, cst.getNextBatch(aggCursor).nextBatch.length);
+ assert.writeOK(collToInvalidate.insert({_id: 2}));
+ assert.eq(cst.getOneChange(aggCursor).operationType, "insert");
+
+ // Test that renaming a "system" collection to a user collection *does* return a rename
+ // notification.
+ assert.commandWorked(
+ testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []}));
+ assert.writeOK(testDB.system.views.renameCollection("non_system_collection"));
+ cst.assertNextChangesEqual({
cursor: aggCursor,
- dropType: "drop",
- expectedNext: {
- documentKey: {_id: 0},
- fullDocument: {_id: 0},
- ns: {db: testDB.getName(), coll: collToInvalidate.getName()},
- operationType: "insert",
- },
+ expectedChanges: [{
+ operationType: "rename",
+ ns: {db: testDB.getName(), coll: "system.views"},
+ to: {db: testDB.getName(), coll: "non_system_collection"}
+ }],
});
- // Operations on internal "system" collections should be filtered out and not included in
- // the change stream.
+ // Test that renaming a "system" collection to a different "system" collection does not
+ // result in a notification in the change stream.
aggCursor = cst.startWatchingAllChangesForCluster();
- // Creating a view will generate an insert entry on the "system.views" collection.
assert.commandWorked(
testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []}));
- // Drop the "system.views" collection.
- assertDropCollection(testDB, "system.views");
- // Verify that the change stream does not report the insertion into "system.views", and is
- // not invalidated by dropping the system collection. Instead, it correctly reports the next
- // write to the test collection.
+ // Note that the target of the rename must be a valid "system" collection.
+ assert.writeOK(testDB.system.views.renameCollection("system.users"));
+ // Verify that the change stream filters out the rename above, instead returning the
+ // next insert to the test collection.
assert.writeOK(collToInvalidate.insert({_id: 1}));
change = cst.getOneChange(aggCursor);
assert.eq(change.operationType, "insert", tojson(change));
assert.eq(change.ns, {db: testDB.getName(), coll: collToInvalidate.getName()});
+
+ // Test that renaming a user collection to a "system" collection *does* return a rename
+ // notification.
+ assert.writeOK(collToInvalidate.renameCollection("system.views"));
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{
+ operationType: "rename",
+ ns: {db: testDB.getName(), coll: collToInvalidate.getName()},
+ to: {db: testDB.getName(), coll: "system.views"}
+ }],
+ });
+
+ // Drop the "system.views" collection to avoid view catalog errors in subsequent tests.
+ assertDropCollection(testDB, "system.views");
+
+ // Recreate the test collection for the remainder of the test.
+ assert.writeOK(collToInvalidate.insert({_id: 0}));
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{
+ operationType: "insert",
+ ns: {db: testDB.getName(), coll: collToInvalidate.getName()},
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0}
+ }]
+ });
}
- cst.cleanUp();
+ // Dropping a collection should generate a 'drop' entry.
+ assertDropCollection(testDB, collToInvalidate.getName());
+ // Insert to the test collection to queue up another change after the drop. This is needed
+ // since the number of 'drop' notifications is not deterministic in the sharded passthrough
+ // suites.
+ assert.writeOK(collToInvalidate.insert({_id: 0}));
+ cst.consumeDropUpTo({
+ cursor: aggCursor,
+ dropType: "drop",
+ expectedNext: {
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0},
+ ns: {db: testDB.getName(), coll: collToInvalidate.getName()},
+ operationType: "insert",
+ },
+ });
+
+ // Operations on internal "system" collections should be filtered out and not included in
+ // the change stream.
+ aggCursor = cst.startWatchingAllChangesForCluster();
+ // Creating a view will generate an insert entry on the "system.views" collection.
+ assert.commandWorked(
+ testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []}));
+ // Drop the "system.views" collection.
+ assertDropCollection(testDB, "system.views");
+ // Verify that the change stream does not report the insertion into "system.views", and is
+ // not invalidated by dropping the system collection. Instead, it correctly reports the next
+ // write to the test collection.
+ assert.writeOK(collToInvalidate.insert({_id: 1}));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.ns, {db: testDB.getName(), coll: collToInvalidate.getName()});
+}
+
+cst.cleanUp();
}());