diff options
author | clang-format-7.0.1 <adam.martin@10gen.com> | 2019-07-26 18:20:35 -0400 |
---|---|---|
committer | ADAM David Alan Martin <adam.martin@10gen.com> | 2019-07-27 11:02:23 -0400 |
commit | 134a4083953270e8a11430395357fb70a29047ad (patch) | |
tree | dd428e1230e31d92b20b393dfdc17ffe7fa79cb6 /jstests/change_streams/whole_cluster_metadata_notifications.js | |
parent | 1e46b5049003f427047e723ea5fab15b5a9253ca (diff) | |
download | mongo-134a4083953270e8a11430395357fb70a29047ad.tar.gz |
SERVER-41772 Apply clang-format 7.0.1 to the codebase
Diffstat (limited to 'jstests/change_streams/whole_cluster_metadata_notifications.js')
-rw-r--r-- | jstests/change_streams/whole_cluster_metadata_notifications.js | 466 |
1 files changed, 231 insertions, 235 deletions
diff --git a/jstests/change_streams/whole_cluster_metadata_notifications.js b/jstests/change_streams/whole_cluster_metadata_notifications.js index ec7da470842..9a9d8c6efd5 100644 --- a/jstests/change_streams/whole_cluster_metadata_notifications.js +++ b/jstests/change_streams/whole_cluster_metadata_notifications.js @@ -1,280 +1,276 @@ // Tests of metadata notifications for a $changeStream on a whole cluster. (function() { - "use strict"; +"use strict"; - load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. - load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. - load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. +load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. +load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. +load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. +load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. - // Define two databases. We will conduct our tests by creating one collection in each. - const testDB1 = db.getSiblingDB(jsTestName()), - testDB2 = db.getSiblingDB(jsTestName() + "_other"); - const adminDB = db.getSiblingDB("admin"); +// Define two databases. We will conduct our tests by creating one collection in each. +const testDB1 = db.getSiblingDB(jsTestName()), testDB2 = db.getSiblingDB(jsTestName() + "_other"); +const adminDB = db.getSiblingDB("admin"); - assert.commandWorked(testDB1.dropDatabase()); - assert.commandWorked(testDB2.dropDatabase()); +assert.commandWorked(testDB1.dropDatabase()); +assert.commandWorked(testDB2.dropDatabase()); - // Create one collection on each database. - let [db1Coll, db2Coll] = - [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, "test")); +// Create one collection on each database. +let [db1Coll, db2Coll] = + [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, "test")); - // Create a ChangeStreamTest on the 'admin' db. Cluster-wide change streams can only be opened - // on admin. - let cst = new ChangeStreamTest(adminDB); - let aggCursor = cst.startWatchingAllChangesForCluster(); +// Create a ChangeStreamTest on the 'admin' db. Cluster-wide change streams can only be opened +// on admin. +let cst = new ChangeStreamTest(adminDB); +let aggCursor = cst.startWatchingAllChangesForCluster(); - // Generate oplog entries of type insert, update, and delete across both databases. - for (let coll of[db1Coll, db2Coll]) { - assert.writeOK(coll.insert({_id: 1})); - assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}})); - assert.writeOK(coll.remove({_id: 1})); - } +// Generate oplog entries of type insert, update, and delete across both databases. +for (let coll of [db1Coll, db2Coll]) { + assert.writeOK(coll.insert({_id: 1})); + assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}})); + assert.writeOK(coll.remove({_id: 1})); +} - // Drop the second database, which should generate a 'drop' entry for the collection followed - // by a 'dropDatabase' entry. - assert.commandWorked(testDB2.dropDatabase()); +// Drop the second database, which should generate a 'drop' entry for the collection followed +// by a 'dropDatabase' entry. +assert.commandWorked(testDB2.dropDatabase()); - // We should get 6 oplog entries; three ops of type insert, update, delete from each database. - for (let expectedDB of[testDB1, testDB2]) { - let change = cst.getOneChange(aggCursor); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); - change = cst.getOneChange(aggCursor); - assert.eq(change.operationType, "update", tojson(change)); - assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); - change = cst.getOneChange(aggCursor); - assert.eq(change.operationType, "delete", tojson(change)); - assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); - } - cst.assertDatabaseDrop({cursor: aggCursor, db: testDB2}); +// We should get 6 oplog entries; three ops of type insert, update, delete from each database. +for (let expectedDB of [testDB1, testDB2]) { + let change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "update", tojson(change)); + assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "delete", tojson(change)); + assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); +} +cst.assertDatabaseDrop({cursor: aggCursor, db: testDB2}); - // Test that a cluster-wide change stream can be resumed using a token from a collection which - // has been dropped. - db1Coll = assertDropAndRecreateCollection(testDB1, db1Coll.getName()); +// Test that a cluster-wide change stream can be resumed using a token from a collection which +// has been dropped. +db1Coll = assertDropAndRecreateCollection(testDB1, db1Coll.getName()); - // Get a valid resume token that the next change stream can use. - aggCursor = cst.startWatchingAllChangesForCluster(); +// Get a valid resume token that the next change stream can use. +aggCursor = cst.startWatchingAllChangesForCluster(); - assert.writeOK(db1Coll.insert({_id: 1}, {writeConcern: {w: "majority"}})); +assert.writeOK(db1Coll.insert({_id: 1}, {writeConcern: {w: "majority"}})); - let change = cst.getOneChange(aggCursor, false); - const resumeToken = change._id; - - // For cluster-wide streams, it is possible to resume at a point before a collection is dropped, - // even if the "drop" notification has not been received on the original stream yet. - assertDropCollection(db1Coll, db1Coll.getName()); - // Wait for two-phase drop to complete, so that the UUID no longer exists. - assert.soon(function() { - return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB1, - db1Coll.getName()); - }); - assert.commandWorked(adminDB.runCommand({ - aggregate: 1, - pipeline: [{$changeStream: {resumeAfter: resumeToken, allChangesForCluster: true}}], - cursor: {} - })); - - // Test that collection drops from any database result in "drop" notifications for the stream. - [db1Coll, db2Coll] = - [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, "test")); - let _idForTest = 0; - for (let collToInvalidate of[db1Coll, db2Coll]) { - // Start watching all changes in the cluster. - aggCursor = cst.startWatchingAllChangesForCluster(); - - let testDB = collToInvalidate.getDB(); - - // Insert into the collections on both databases, and verify the change stream is able to - // pick them up. - for (let collToWrite of[db1Coll, db2Coll]) { - assert.writeOK(collToWrite.insert({_id: _idForTest})); - change = cst.getOneChange(aggCursor); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.documentKey._id, _idForTest); - assert.eq(change.ns.db, collToWrite.getDB().getName()); - _idForTest++; - } - - // Renaming the collection should generate a 'rename' notification. Skip this test when - // running on a sharded collection, since these cannot be renamed. - if (!FixtureHelpers.isSharded(collToInvalidate)) { - assertDropAndRecreateCollection(testDB, collToInvalidate.getName()); - const collName = collToInvalidate.getName(); - - // Start watching all changes in the cluster. - aggCursor = cst.startWatchingAllChangesForCluster(); - assert.writeOK(collToInvalidate.renameCollection("renamed_coll")); - cst.assertNextChangesEqual({ - cursor: aggCursor, - expectedChanges: [ - { - operationType: "rename", - ns: {db: testDB.getName(), coll: collToInvalidate.getName()}, - to: {db: testDB.getName(), coll: "renamed_coll"} - }, - ] - }); +let change = cst.getOneChange(aggCursor, false); +const resumeToken = change._id; - // Repeat the test, this time using the 'dropTarget' option with an existing target - // collection. - collToInvalidate = testDB.getCollection("renamed_coll"); - assertDropAndRecreateCollection(testDB, collName); - assert.writeOK(testDB[collName].insert({_id: 0})); - assert.writeOK(collToInvalidate.renameCollection(collName, true /* dropTarget */)); - cst.assertNextChangesEqual({ - cursor: aggCursor, - expectedChanges: [ - { - operationType: "insert", - ns: {db: testDB.getName(), coll: collName}, - documentKey: {_id: 0}, - fullDocument: {_id: 0} - }, - { - operationType: "rename", - ns: {db: testDB.getName(), coll: "renamed_coll"}, - to: {db: testDB.getName(), coll: collName} - } - ] - }); +// For cluster-wide streams, it is possible to resume at a point before a collection is dropped, +// even if the "drop" notification has not been received on the original stream yet. +assertDropCollection(db1Coll, db1Coll.getName()); +// Wait for two-phase drop to complete, so that the UUID no longer exists. +assert.soon(function() { + return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB1, + db1Coll.getName()); +}); +assert.commandWorked(adminDB.runCommand({ + aggregate: 1, + pipeline: [{$changeStream: {resumeAfter: resumeToken, allChangesForCluster: true}}], + cursor: {} +})); - collToInvalidate = testDB[collName]; +// Test that collection drops from any database result in "drop" notifications for the stream. +[db1Coll, db2Coll] = + [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, "test")); +let _idForTest = 0; +for (let collToInvalidate of [db1Coll, db2Coll]) { + // Start watching all changes in the cluster. + aggCursor = cst.startWatchingAllChangesForCluster(); - // Test renaming a collection to a different database. Do not run this in the mongos - // passthrough suites since we cannot guarantee the primary shard of the target database - // and renameCollection requires the source and destination to be on the same shard. - if (!FixtureHelpers.isMongos(testDB)) { - const otherDB = testDB.getSiblingDB(testDB.getName() + "_rename_target"); - // Ensure the target database exists. - const collOtherDB = assertDropAndRecreateCollection(otherDB, "test"); - assertDropCollection(otherDB, collOtherDB.getName()); - aggCursor = cst.startWatchingAllChangesForCluster(); - assert.commandWorked(testDB.adminCommand({ - renameCollection: collToInvalidate.getFullName(), - to: collOtherDB.getFullName() - })); - // Do not check the 'ns' field since it will contain the namespace of the temp - // collection created when renaming a collection across databases. - change = cst.getOneChange(aggCursor); - assert.eq(change.operationType, "rename", tojson(change)); - assert.eq(change.to, - {db: otherDB.getName(), coll: collOtherDB.getName()}, - tojson(change)); - // Rename across databases also drops the source collection after the collection is - // copied over. - cst.assertNextChangesEqual({ - cursor: aggCursor, - expectedChanges: [{ - operationType: "drop", - ns: {db: testDB.getName(), coll: collToInvalidate.getName()} - }] - }); - } + let testDB = collToInvalidate.getDB(); - // Test the behavior of a change stream watching the target collection of a $out - // aggregation stage. - collToInvalidate.aggregate([{$out: "renamed_coll"}]); - // Do not check the 'ns' field since it will contain the namespace of the temp - // collection created by the $out stage, before renaming to 'renamed_coll'. - const rename = cst.getOneChange(aggCursor); - assert.eq(rename.operationType, "rename", tojson(rename)); - assert.eq(rename.to, {db: testDB.getName(), coll: "renamed_coll"}, tojson(rename)); + // Insert into the collections on both databases, and verify the change stream is able to + // pick them up. + for (let collToWrite of [db1Coll, db2Coll]) { + assert.writeOK(collToWrite.insert({_id: _idForTest})); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, _idForTest); + assert.eq(change.ns.db, collToWrite.getDB().getName()); + _idForTest++; + } - // The change stream should not be invalidated by the rename(s). - assert.eq(0, cst.getNextBatch(aggCursor).nextBatch.length); - assert.writeOK(collToInvalidate.insert({_id: 2})); - assert.eq(cst.getOneChange(aggCursor).operationType, "insert"); + // Renaming the collection should generate a 'rename' notification. Skip this test when + // running on a sharded collection, since these cannot be renamed. + if (!FixtureHelpers.isSharded(collToInvalidate)) { + assertDropAndRecreateCollection(testDB, collToInvalidate.getName()); + const collName = collToInvalidate.getName(); - // Test that renaming a "system" collection to a user collection *does* return a rename - // notification. - assert.commandWorked(testDB.runCommand( - {create: "view1", viewOn: collToInvalidate.getName(), pipeline: []})); - assert.writeOK(testDB.system.views.renameCollection("non_system_collection")); - cst.assertNextChangesEqual({ - cursor: aggCursor, - expectedChanges: [{ + // Start watching all changes in the cluster. + aggCursor = cst.startWatchingAllChangesForCluster(); + assert.writeOK(collToInvalidate.renameCollection("renamed_coll")); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [ + { operationType: "rename", - ns: {db: testDB.getName(), coll: "system.views"}, - to: {db: testDB.getName(), coll: "non_system_collection"} - }], - }); - - // Test that renaming a "system" collection to a different "system" collection does not - // result in a notification in the change stream. - aggCursor = cst.startWatchingAllChangesForCluster(); - assert.commandWorked(testDB.runCommand( - {create: "view1", viewOn: collToInvalidate.getName(), pipeline: []})); - // Note that the target of the rename must be a valid "system" collection. - assert.writeOK(testDB.system.views.renameCollection("system.users")); - // Verify that the change stream filters out the rename above, instead returning the - // next insert to the test collection. - assert.writeOK(collToInvalidate.insert({_id: 1})); - change = cst.getOneChange(aggCursor); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.ns, {db: testDB.getName(), coll: collToInvalidate.getName()}); + ns: {db: testDB.getName(), coll: collToInvalidate.getName()}, + to: {db: testDB.getName(), coll: "renamed_coll"} + }, + ] + }); - // Test that renaming a user collection to a "system" collection *does* return a rename - // notification. - assert.writeOK(collToInvalidate.renameCollection("system.views")); - cst.assertNextChangesEqual({ - cursor: aggCursor, - expectedChanges: [{ + // Repeat the test, this time using the 'dropTarget' option with an existing target + // collection. + collToInvalidate = testDB.getCollection("renamed_coll"); + assertDropAndRecreateCollection(testDB, collName); + assert.writeOK(testDB[collName].insert({_id: 0})); + assert.writeOK(collToInvalidate.renameCollection(collName, true /* dropTarget */)); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [ + { + operationType: "insert", + ns: {db: testDB.getName(), coll: collName}, + documentKey: {_id: 0}, + fullDocument: {_id: 0} + }, + { operationType: "rename", - ns: {db: testDB.getName(), coll: collToInvalidate.getName()}, - to: {db: testDB.getName(), coll: "system.views"} - }], - }); + ns: {db: testDB.getName(), coll: "renamed_coll"}, + to: {db: testDB.getName(), coll: collName} + } + ] + }); - // Drop the "system.views" collection to avoid view catalog errors in subsequent tests. - assertDropCollection(testDB, "system.views"); + collToInvalidate = testDB[collName]; - // Recreate the test collection for the remainder of the test. - assert.writeOK(collToInvalidate.insert({_id: 0})); + // Test renaming a collection to a different database. Do not run this in the mongos + // passthrough suites since we cannot guarantee the primary shard of the target database + // and renameCollection requires the source and destination to be on the same shard. + if (!FixtureHelpers.isMongos(testDB)) { + const otherDB = testDB.getSiblingDB(testDB.getName() + "_rename_target"); + // Ensure the target database exists. + const collOtherDB = assertDropAndRecreateCollection(otherDB, "test"); + assertDropCollection(otherDB, collOtherDB.getName()); + aggCursor = cst.startWatchingAllChangesForCluster(); + assert.commandWorked(testDB.adminCommand( + {renameCollection: collToInvalidate.getFullName(), to: collOtherDB.getFullName()})); + // Do not check the 'ns' field since it will contain the namespace of the temp + // collection created when renaming a collection across databases. + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "rename", tojson(change)); + assert.eq( + change.to, {db: otherDB.getName(), coll: collOtherDB.getName()}, tojson(change)); + // Rename across databases also drops the source collection after the collection is + // copied over. cst.assertNextChangesEqual({ cursor: aggCursor, expectedChanges: [{ - operationType: "insert", - ns: {db: testDB.getName(), coll: collToInvalidate.getName()}, - documentKey: {_id: 0}, - fullDocument: {_id: 0} + operationType: "drop", + ns: {db: testDB.getName(), coll: collToInvalidate.getName()} }] }); } - // Dropping a collection should generate a 'drop' entry. - assertDropCollection(testDB, collToInvalidate.getName()); - // Insert to the test collection to queue up another change after the drop. This is needed - // since the number of 'drop' notifications is not deterministic in the sharded passthrough - // suites. - assert.writeOK(collToInvalidate.insert({_id: 0})); - cst.consumeDropUpTo({ + // Test the behavior of a change stream watching the target collection of a $out + // aggregation stage. + collToInvalidate.aggregate([{$out: "renamed_coll"}]); + // Do not check the 'ns' field since it will contain the namespace of the temp + // collection created by the $out stage, before renaming to 'renamed_coll'. + const rename = cst.getOneChange(aggCursor); + assert.eq(rename.operationType, "rename", tojson(rename)); + assert.eq(rename.to, {db: testDB.getName(), coll: "renamed_coll"}, tojson(rename)); + + // The change stream should not be invalidated by the rename(s). + assert.eq(0, cst.getNextBatch(aggCursor).nextBatch.length); + assert.writeOK(collToInvalidate.insert({_id: 2})); + assert.eq(cst.getOneChange(aggCursor).operationType, "insert"); + + // Test that renaming a "system" collection to a user collection *does* return a rename + // notification. + assert.commandWorked( + testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []})); + assert.writeOK(testDB.system.views.renameCollection("non_system_collection")); + cst.assertNextChangesEqual({ cursor: aggCursor, - dropType: "drop", - expectedNext: { - documentKey: {_id: 0}, - fullDocument: {_id: 0}, - ns: {db: testDB.getName(), coll: collToInvalidate.getName()}, - operationType: "insert", - }, + expectedChanges: [{ + operationType: "rename", + ns: {db: testDB.getName(), coll: "system.views"}, + to: {db: testDB.getName(), coll: "non_system_collection"} + }], }); - // Operations on internal "system" collections should be filtered out and not included in - // the change stream. + // Test that renaming a "system" collection to a different "system" collection does not + // result in a notification in the change stream. aggCursor = cst.startWatchingAllChangesForCluster(); - // Creating a view will generate an insert entry on the "system.views" collection. assert.commandWorked( testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []})); - // Drop the "system.views" collection. - assertDropCollection(testDB, "system.views"); - // Verify that the change stream does not report the insertion into "system.views", and is - // not invalidated by dropping the system collection. Instead, it correctly reports the next - // write to the test collection. + // Note that the target of the rename must be a valid "system" collection. + assert.writeOK(testDB.system.views.renameCollection("system.users")); + // Verify that the change stream filters out the rename above, instead returning the + // next insert to the test collection. assert.writeOK(collToInvalidate.insert({_id: 1})); change = cst.getOneChange(aggCursor); assert.eq(change.operationType, "insert", tojson(change)); assert.eq(change.ns, {db: testDB.getName(), coll: collToInvalidate.getName()}); + + // Test that renaming a user collection to a "system" collection *does* return a rename + // notification. + assert.writeOK(collToInvalidate.renameCollection("system.views")); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [{ + operationType: "rename", + ns: {db: testDB.getName(), coll: collToInvalidate.getName()}, + to: {db: testDB.getName(), coll: "system.views"} + }], + }); + + // Drop the "system.views" collection to avoid view catalog errors in subsequent tests. + assertDropCollection(testDB, "system.views"); + + // Recreate the test collection for the remainder of the test. + assert.writeOK(collToInvalidate.insert({_id: 0})); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [{ + operationType: "insert", + ns: {db: testDB.getName(), coll: collToInvalidate.getName()}, + documentKey: {_id: 0}, + fullDocument: {_id: 0} + }] + }); } - cst.cleanUp(); + // Dropping a collection should generate a 'drop' entry. + assertDropCollection(testDB, collToInvalidate.getName()); + // Insert to the test collection to queue up another change after the drop. This is needed + // since the number of 'drop' notifications is not deterministic in the sharded passthrough + // suites. + assert.writeOK(collToInvalidate.insert({_id: 0})); + cst.consumeDropUpTo({ + cursor: aggCursor, + dropType: "drop", + expectedNext: { + documentKey: {_id: 0}, + fullDocument: {_id: 0}, + ns: {db: testDB.getName(), coll: collToInvalidate.getName()}, + operationType: "insert", + }, + }); + + // Operations on internal "system" collections should be filtered out and not included in + // the change stream. + aggCursor = cst.startWatchingAllChangesForCluster(); + // Creating a view will generate an insert entry on the "system.views" collection. + assert.commandWorked( + testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []})); + // Drop the "system.views" collection. + assertDropCollection(testDB, "system.views"); + // Verify that the change stream does not report the insertion into "system.views", and is + // not invalidated by dropping the system collection. Instead, it correctly reports the next + // write to the test collection. + assert.writeOK(collToInvalidate.insert({_id: 1})); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.ns, {db: testDB.getName(), coll: collToInvalidate.getName()}); +} + +cst.cleanUp(); }()); |