diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-06-07 18:05:12 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-06-19 15:37:58 -0400 |
commit | 77f43172584a0302df127174b9b0bd2259730772 (patch) | |
tree | afe00156f29775142b070d251fa5691fefde686e /jstests/change_streams | |
parent | 4e614564d032bc9133ac46319b588b4b62cee304 (diff) | |
download | mongo-77f43172584a0302df127174b9b0bd2259730772.tar.gz |
SERVER-35029: Add change stream notification for database drop
Diffstat (limited to 'jstests/change_streams')
-rw-r--r-- | jstests/change_streams/metadata_notifications.js | 10 | ||||
-rw-r--r-- | jstests/change_streams/whole_cluster.js | 8 | ||||
-rw-r--r-- | jstests/change_streams/whole_cluster_metadata_notifications.js | 7 | ||||
-rw-r--r-- | jstests/change_streams/whole_cluster_resumability.js | 46 | ||||
-rw-r--r-- | jstests/change_streams/whole_db.js | 18 | ||||
-rw-r--r-- | jstests/change_streams/whole_db_metadata_notifications.js | 29 | ||||
-rw-r--r-- | jstests/change_streams/whole_db_resumability.js | 80 |
7 files changed, 167 insertions, 31 deletions
diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js index ea7559de77b..ef17b2b4249 100644 --- a/jstests/change_streams/metadata_notifications.js +++ b/jstests/change_streams/metadata_notifications.js @@ -1,4 +1,7 @@ // Tests of $changeStream notifications for metadata operations. +// Do not run in whole-cluster passthrough since this test assumes that the change stream will be +// invalidated by a database drop. +// @tags: [do_not_run_in_whole_cluster_passthrough] (function() { "use strict"; @@ -27,6 +30,13 @@ assert.neq(change.id, 0); assert.eq(change.nextBatch.length, 0, tojson(change.nextBatch)); + // Dropping the empty database should not generate any notification for the change stream, since + // the collection does not exist yet. + assert.commandWorked(db.dropDatabase()); + change = cst.getNextBatch(cursor); + assert.neq(change.id, 0); + assert.eq(change.nextBatch.length, 0, tojson(change.nextBatch)); + // After collection creation, we expect to see oplog entries for each subsequent operation. let coll = assertCreateCollection(db, collName); assert.writeOK(coll.insert({_id: 0})); diff --git a/jstests/change_streams/whole_cluster.js b/jstests/change_streams/whole_cluster.js index 9d154220b6e..894e90fc31d 100644 --- a/jstests/change_streams/whole_cluster.js +++ b/jstests/change_streams/whole_cluster.js @@ -119,14 +119,12 @@ assertDropCollection(db.getSiblingDB(dbName), "test"); }); - // Dropping a database should generate drop entries for each collection followed by an - // invalidate. - // TODO SERVER-35029: This test should not invalidate the stream once there's support for - // returning a notification for the dropDatabase command. + // Dropping a database should generate drop entries for each collection followed by a database + // drop. assert.commandWorked(otherDB.dropDatabase()); expected = [ {operationType: "drop", ns: {db: otherDB.getName(), coll: "t2"}}, - {operationType: "invalidate"} + {operationType: "dropDatabase", ns: {db: otherDB.getName()}}, ]; cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); diff --git a/jstests/change_streams/whole_cluster_metadata_notifications.js b/jstests/change_streams/whole_cluster_metadata_notifications.js index 7c8e9c59f8f..471f31282ff 100644 --- a/jstests/change_streams/whole_cluster_metadata_notifications.js +++ b/jstests/change_streams/whole_cluster_metadata_notifications.js @@ -31,7 +31,8 @@ assert.writeOK(coll.remove({_id: 1})); } - // Drop the second database, which should invalidate the stream. + // Drop the second database, which should generate a 'drop' entry for the collection followed + // by a 'dropDatabase' entry. assert.commandWorked(testDB2.dropDatabase()); // We should get 6 oplog entries; three ops of type insert, update, delete from each database. @@ -50,10 +51,8 @@ cursor: aggCursor, expectedChanges: [ {operationType: "drop", ns: {db: testDB2.getName(), coll: db2Coll.getName()}}, - // TODO SERVER-35029: Return an entry for a database drop, instead of "invalidate". - {operationType: "invalidate"} + {operationType: "dropDatabase", ns: {db: testDB2.getName()}}, ], - expectInvalidate: true }); // Test that a cluster-wide change stream can be resumed using a token from a collection which diff --git a/jstests/change_streams/whole_cluster_resumability.js b/jstests/change_streams/whole_cluster_resumability.js index 9058c58010a..ad9145ba74a 100644 --- a/jstests/change_streams/whole_cluster_resumability.js +++ b/jstests/change_streams/whole_cluster_resumability.js @@ -6,8 +6,8 @@ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. // Create two databases, with one collection in each. - const testDBs = [db, db.getSiblingDB(jsTestName() + "_other")]; - const[db1Coll, db2Coll] = testDBs.map((db) => assertDropAndRecreateCollection(db, "test")); + const testDBs = [db.getSiblingDB(jsTestName()), db.getSiblingDB(jsTestName() + "_other")]; + let [db1Coll, db2Coll] = testDBs.map((db) => assertDropAndRecreateCollection(db, "test")); const adminDB = db.getSiblingDB("admin"); let cst = new ChangeStreamTest(adminDB); @@ -55,5 +55,47 @@ }); assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc); + // Dropping a database should generate a 'drop' notification for the collection followed by a + // 'dropDatabase' notification. + assert.commandWorked(testDBs[0].dropDatabase()); + const dropDbChanges = cst.assertNextChangesEqual({ + cursor: resumeCursor, + expectedChanges: [ + {operationType: "drop", ns: {db: testDBs[0].getName(), coll: db1Coll.getName()}}, + {operationType: "dropDatabase", ns: {db: testDBs[0].getName()}} + ] + }); + + // Recreate the database and verify that the change stream picks up another insert. + assert.writeOK(db1Coll.insert({_id: 5})); + + let change = cst.getOneChange(resumeCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.ns, {db: testDBs[0].getName(), coll: db1Coll.getName()}, tojson(change)); + assert.eq(change.fullDocument, {_id: 5}, tojson(change)); + + // Resume the change stream from the 'drop' entry. + resumeCursor = cst.startWatchingChanges({ + pipeline: + [{$changeStream: {resumeAfter: dropDbChanges[0]._id, allChangesForCluster: true}}], + collection: 1, + aggregateOptions: {cursor: {batchSize: 0}}, + }); + change = cst.getOneChange(resumeCursor); + assert.eq(change.operationType, "dropDatabase", tojson(change)); + assert.eq(change.ns, {db: testDBs[0].getName()}, tojson(change)); + + // Resume the change stream from the 'dropDatabase' entry. + resumeCursor = cst.startWatchingChanges({ + pipeline: + [{$changeStream: {resumeAfter: dropDbChanges[1]._id, allChangesForCluster: true}}], + collection: 1, + aggregateOptions: {cursor: {batchSize: 0}}, + }); + change = cst.getOneChange(resumeCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.ns, {db: testDBs[0].getName(), coll: db1Coll.getName()}, tojson(change)); + assert.eq(change.fullDocument, {_id: 5}, tojson(change)); + cst.cleanUp(); })(); diff --git a/jstests/change_streams/whole_db.js b/jstests/change_streams/whole_db.js index 8c287e8493f..82240dacd4b 100644 --- a/jstests/change_streams/whole_db.js +++ b/jstests/change_streams/whole_db.js @@ -1,4 +1,7 @@ // Basic tests for $changeStream against all collections in a database. +// Do not run in whole-cluster passthrough since this test assumes that the change stream will be +// invalidated by a database drop. +// @tags: [do_not_run_in_whole_cluster_passthrough] (function() { "use strict"; @@ -8,6 +11,7 @@ load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. db = db.getSiblingDB(jsTestName()); + assert.commandWorked(db.dropDatabase()); // Test that a single-database change stream cannot be opened on "admin", "config", or "local". assertInvalidChangeStreamNss("admin", 1); @@ -63,19 +67,5 @@ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); }); - // Dropping the database should generate collection drop entries followed by an invalidate. Note - // that the order of collection drops is not guaranteed so only check the database name. - assert.commandWorked(db.dropDatabase()); - let change = cst.getOneChange(cursor); - assert.eq(change.operationType, "drop", tojson(change)); - assert.eq(change.ns.db, db.getName(), tojson(change)); - change = cst.getOneChange(cursor); - assert.eq(change.operationType, "drop", tojson(change)); - assert.eq(change.ns.db, db.getName(), tojson(change)); - - // TODO SERVER-35029: Expect to see a 'dropDatabase' entry before the invalidate. - change = cst.getOneChange(cursor, true); - assert.eq(change.operationType, "invalidate", tojson(change)); - cst.cleanUp(); }()); diff --git a/jstests/change_streams/whole_db_metadata_notifications.js b/jstests/change_streams/whole_db_metadata_notifications.js index 6fa694aa1e8..0cb15d57680 100644 --- a/jstests/change_streams/whole_db_metadata_notifications.js +++ b/jstests/change_streams/whole_db_metadata_notifications.js @@ -1,4 +1,7 @@ // Tests of metadata notifications for a $changeStream on a whole database. +// Do not run in whole-cluster passthrough since this test assumes that the change stream will be +// invalidated by a database drop. +// @tags: [do_not_run_in_whole_cluster_passthrough] (function() { "use strict"; @@ -159,6 +162,15 @@ assert.eq(0, cst.getNextBatch(aggCursor).nextBatch.length); assert.writeOK(coll.insert({_id: 2})); assert.eq(cst.getOneChange(aggCursor).operationType, "insert"); + + // Drop the new collection to avoid an additional 'drop' notification when the database is + // dropped. + assertDropCollection(testDB, "renamed_coll"); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: + [{operationType: "drop", ns: {db: testDB.getName(), coll: "renamed_coll"}}], + }); } // Dropping a collection should return a 'drop' entry. @@ -228,6 +240,23 @@ // Drop the "system.views" collection to avoid view catalog errors in subsequent tests. assertDropCollection(testDB, "system.views"); assertDropCollection(testDB, "non_system_collection"); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [ + {operationType: "drop", ns: {db: testDB.getName(), coll: "non_system_collection"}}, + ] + }); + + // Dropping the database should generate a 'dropDatabase' notification followed by an + // 'invalidate'. + assert.commandWorked(testDB.dropDatabase()); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [ + {operationType: "dropDatabase", ns: {db: testDB.getName()}}, + {operationType: "invalidate"} + ] + }); cst.cleanUp(); }()); diff --git a/jstests/change_streams/whole_db_resumability.js b/jstests/change_streams/whole_db_resumability.js index 682d0546cb7..25f03943112 100644 --- a/jstests/change_streams/whole_db_resumability.js +++ b/jstests/change_streams/whole_db_resumability.js @@ -1,4 +1,7 @@ // Basic tests for resuming a $changeStream that is open against all collections in a database. +// Do not run in whole-cluster passthrough since this test assumes that the change stream will be +// invalidated by a database drop. +// @tags: [do_not_run_in_whole_cluster_passthrough] (function() { "use strict"; @@ -6,10 +9,11 @@ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. // Drop and recreate the collections to be used in this set of tests. - const coll = assertDropAndRecreateCollection(db, jsTestName() + "resume_coll"); - const otherColl = assertDropAndRecreateCollection(db, jsTestName() + "resume_coll_other"); + const testDB = db.getSiblingDB(jsTestName()); + let coll = assertDropAndRecreateCollection(testDB, "resume_coll"); + const otherColl = assertDropAndRecreateCollection(testDB, "resume_coll_other"); - let cst = new ChangeStreamTest(db); + let cst = new ChangeStreamTest(testDB); let resumeCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); // Insert a single document to each collection and save the resume token from the first insert. @@ -17,7 +21,7 @@ assert.writeOK(otherColl.insert({_id: 2})); const firstInsertChangeDoc = cst.getOneChange(resumeCursor); assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1}); - assert.eq(firstInsertChangeDoc.ns, {db: "test", coll: coll.getName()}); + assert.eq(firstInsertChangeDoc.ns, {db: testDB.getName(), coll: coll.getName()}); // Test resuming the change stream after the first insert should pick up the insert on the // second collection. @@ -29,13 +33,13 @@ const secondInsertChangeDoc = cst.getOneChange(resumeCursor); assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2}); - assert.eq(secondInsertChangeDoc.ns, {db: "test", coll: otherColl.getName()}); + assert.eq(secondInsertChangeDoc.ns, {db: testDB.getName(), coll: otherColl.getName()}); // Insert a third document to the first collection and test that the change stream picks it up. assert.writeOK(coll.insert({_id: 3})); const thirdInsertChangeDoc = cst.getOneChange(resumeCursor); assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3}); - assert.eq(thirdInsertChangeDoc.ns, {db: "test", coll: coll.getName()}); + assert.eq(thirdInsertChangeDoc.ns, {db: testDB.getName(), coll: coll.getName()}); // Test resuming after the first insert again. resumeCursor = cst.startWatchingChanges({ @@ -54,5 +58,69 @@ }); assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc); + // Explicitly drop one collection to ensure reliability of the order of notifications from the + // dropDatabase command. + assertDropCollection(testDB, otherColl.getName()); + const firstCollDrop = cst.getOneChange(resumeCursor); + assert.eq(firstCollDrop.operationType, "drop", tojson(firstCollDrop)); + assert.eq(firstCollDrop.ns, {db: testDB.getName(), coll: otherColl.getName()}); + + // Dropping a database should generate a 'drop' notification for each collection, a + // 'dropDatabase' notification, and finally an 'invalidate'. + assert.commandWorked(testDB.dropDatabase()); + const expectedChangesAfterFirstDrop = [ + {operationType: "drop", ns: {db: testDB.getName(), coll: coll.getName()}}, + {operationType: "dropDatabase", ns: {db: testDB.getName()}}, + {operationType: "invalidate"} + ]; + const dropDbChanges = cst.assertNextChangesEqual( + {cursor: resumeCursor, expectedChanges: expectedChangesAfterFirstDrop}); + + // Resume from the first collection drop. + resumeCursor = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: firstCollDrop._id}}], + collection: 1, + }); + cst.assertNextChangesEqual( + {cursor: resumeCursor, expectedChanges: expectedChangesAfterFirstDrop}); + + // Resume from the second collection drop. + resumeCursor = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: dropDbChanges[0]._id}}], + collection: 1, + }); + cst.assertNextChangesEqual( + {cursor: resumeCursor, expectedChanges: expectedChangesAfterFirstDrop.slice(1)}); + + // Recreate the test collection. + coll = assertCreateCollection(testDB, coll.getName()); + assert.writeOK(coll.insert({_id: 0})); + + // Test resuming from the 'dropDatabase' entry. + // TODO SERVER-34789: Resuming from the 'dropDatabase' should return a single invalidate + // notification. + resumeCursor = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: dropDbChanges[1]._id}}], + collection: 1, + aggregateOptions: {cursor: {batchSize: 0}}, + }); + let change = cst.getOneChange(resumeCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.fullDocument, {_id: 0}, tojson(change)); + assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()}, tojson(change)); + + // Test resuming from the 'invalidate' entry. + // TODO SERVER-34789: Resuming from an invalidate should error or return an invalidate + // notification. + resumeCursor = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: dropDbChanges[2]._id}}], + collection: 1, + aggregateOptions: {cursor: {batchSize: 0}}, + }); + change = cst.getOneChange(resumeCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.fullDocument, {_id: 0}, tojson(change)); + assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()}, tojson(change)); + cst.cleanUp(); })(); |