diff options
author | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-04-03 17:00:41 -0400 |
---|---|---|
committer | Nick Zolnierz <nicholas.zolnierz@mongodb.com> | 2018-04-11 15:53:03 -0400 |
commit | fff261ac550155065fce4b7b1529061f18980599 (patch) | |
tree | 09ce022d7b8319f1af3c2db2354427ecfe1aa389 /jstests/change_streams | |
parent | 0fa7bcb8bea5d4585fdbc1003b5116cd7bf28540 (diff) | |
download | mongo-fff261ac550155065fce4b7b1529061f18980599.tar.gz |
SERVER-29134: Support change streams on an entire database in a sharded cluster
Diffstat (limited to 'jstests/change_streams')
5 files changed, 52 insertions, 30 deletions
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js index 4401df139e4..e7520d974b2 100644 --- a/jstests/change_streams/change_stream.js +++ b/jstests/change_streams/change_stream.js @@ -20,14 +20,14 @@ } // Test that a change stream cannot be opened on 'system.' collections. - assertInvalidChangeStreamNss("test", "system.users"); - assertInvalidChangeStreamNss("test", "system.profile"); - assertInvalidChangeStreamNss("test", "system.version"); + assertInvalidChangeStreamNss(db.getName(), "system.users"); + assertInvalidChangeStreamNss(db.getName(), "system.profile"); + assertInvalidChangeStreamNss(db.getName(), "system.version"); // Test that a change stream can be opened on namespaces with 'system' in the name, but not // considered an internal 'system dot' namespace. - assertValidChangeStreamNss("test", "systemindexes"); - assertValidChangeStreamNss("test", "system_users"); + assertValidChangeStreamNss(db.getName(), "systemindexes"); + assertValidChangeStreamNss(db.getName(), "system_users"); // Similar test but for DB names that are not considered internal. assert.writeOK(db.getSiblingDB("admincustomDB")["test"].insert({})); diff --git a/jstests/change_streams/change_stream_shell_helper.js b/jstests/change_streams/change_stream_shell_helper.js index 49e4d1656bd..62d7ed1522e 100644 --- a/jstests/change_streams/change_stream_shell_helper.js +++ b/jstests/change_streams/change_stream_shell_helper.js @@ -67,7 +67,11 @@ jsTestLog("Testing watch() with pipeline"); cursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}]); wholeDbCursor = db.watch([{$project: {_id: 0, docId: "$documentKey._id"}}]); - assert.writeOK(coll.insert({_id: 1, x: 1})); + + // Store the cluster time of the insert as the timestamp to start from. + const resumeTime = + assert.commandWorked(db.runCommand({insert: coll.getName(), documents: [{_id: 1, x: 1}]})) + .$clusterTime.clusterTime; checkNextChange(cursor, {docId: 1}); checkNextChange(wholeDbCursor, {docId: 1}); @@ -80,8 +84,6 @@ checkNextChange(wholeDbCursor, {docId: 1}); jsTestLog("Testing watch() with pipeline and startAtClusterTime"); - // Store the cluster time of the last insert as the timestamp to start from. - const resumeTime = db.runCommand({isMaster: 1}).$clusterTime.clusterTime; cursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {startAtClusterTime: {ts: resumeTime}}); wholeDbCursor = db.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], diff --git a/jstests/change_streams/change_stream_whole_db.js b/jstests/change_streams/change_stream_whole_db.js index b24faeead15..4b2cd749c3e 100644 --- a/jstests/change_streams/change_stream_whole_db.js +++ b/jstests/change_streams/change_stream_whole_db.js @@ -5,20 +5,25 @@ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest and // assert[Valid|Invalid]ChangeStreamNss. + load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. // Test that a change stream cannot be opened on the "admin", "config", or "local" databases. // TODO SERVER-34086: $changeStream may run against 'admin' if 'allChangesForCluster' is true. assertInvalidChangeStreamNss("admin", 1); assertInvalidChangeStreamNss("config", 1); - assertInvalidChangeStreamNss("local", 1); + if (!FixtureHelpers.isMongos(db)) { + assertInvalidChangeStreamNss("local", 1); + } - // Test that a change stream can be opened before a database exists. - assert.commandWorked(db.dropDatabase()); + assertDropCollection(db, "t1"); + assertDropCollection(db, "t2"); + + assertCreateCollection(db, "t1"); + assertCreateCollection(db, "t2"); let cst = new ChangeStreamTest(db); let cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); - assertCreateCollection(db, "t1"); // Test that if there are no changes, we return an empty batch. assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor)); @@ -45,8 +50,7 @@ // Dropping the database should invalidate the change stream. assert.commandWorked(db.dropDatabase()); - expected = {operationType: "invalidate"}; - cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [{operationType: "invalidate"}]}); cst.cleanUp(); }()); diff --git a/jstests/change_streams/change_stream_whole_db_invalidations.js b/jstests/change_streams/change_stream_whole_db_invalidations.js index a5523b7362c..0c7db3be0dc 100644 --- a/jstests/change_streams/change_stream_whole_db_invalidations.js +++ b/jstests/change_streams/change_stream_whole_db_invalidations.js @@ -5,6 +5,7 @@ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. const testDB = db.getSiblingDB(jsTestName()); let cst = new ChangeStreamTest(testDB); @@ -79,19 +80,27 @@ assert.eq(change.operationType, "insert", tojson(change)); assert.eq(change.documentKey._id, 1); + // Test that renaming a collection will invalidate the change stream. MongoDB does not allow + // renaming of sharded collections, so only perform this test if the collection is not sharded. + if (!FixtureHelpers.isSharded(coll)) { + assertDropCollection(testDB, coll.getName()); + + assertCreateCollection(testDB, coll.getName()); + assertDropCollection(testDB, "renamed_coll"); + aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); + assert.writeOK(coll.renameCollection("renamed_coll")); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [{operationType: "invalidate"}], + expectInvalidate: true + }); + } + // Dropping a collection should invalidate the change stream. assertDropCollection(testDB, coll.getName()); - cst.assertNextChangesEqual({ - cursor: aggCursor, - expectedChanges: [{operationType: "invalidate"}], - expectInvalidate: true - }); - - // Renaming a collection should invalidate the change stream. - assertCreateCollection(testDB, coll.getName()); - assertDropCollection(testDB, "renamed_coll"); aggCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); - assert.writeOK(coll.renameCollection("renamed_coll")); + assertCreateCollection(testDB, coll.getName()); + assertDropCollection(testDB, coll.getName()); cst.assertNextChangesEqual({ cursor: aggCursor, expectedChanges: [{operationType: "invalidate"}], diff --git a/jstests/change_streams/change_stream_whole_db_resumability.js b/jstests/change_streams/change_stream_whole_db_resumability.js index e3b11ab9c60..fa6cd369f75 100644 --- a/jstests/change_streams/change_stream_whole_db_resumability.js +++ b/jstests/change_streams/change_stream_whole_db_resumability.js @@ -5,31 +5,38 @@ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. - assertDropAndRecreateCollection(db, "resumeColl"); - const coll = db.resumeColl; + const coll = db[jsTestName() + "resume_coll"]; + const otherColl = db[jsTestName() + "resume_coll_other"]; + coll.drop(); + otherColl.drop(); - // Note we do not project away 'id.ts' as it is part of the resume token. let cst = new ChangeStreamTest(db); let resumeCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); - // Insert a document and save the resulting change stream. + // Insert a single document to each collection and save the resume token from the first insert. assert.writeOK(coll.insert({_id: 1})); + assert.writeOK(otherColl.insert({_id: 2})); const firstInsertChangeDoc = cst.getOneChange(resumeCursor); assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1}); + assert.eq(firstInsertChangeDoc.ns, {db: "test", coll: coll.getName()}); - // Test resume after an insert. + // Test resuming the change stream after the first insert should pick up the insert on the + // second collection. resumeCursor = cst.startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}], collection: 1, aggregateOptions: {cursor: {batchSize: 0}}, }); - assert.writeOK(coll.insert({_id: 2})); const secondInsertChangeDoc = cst.getOneChange(resumeCursor); assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2}); + assert.eq(secondInsertChangeDoc.ns, {db: "test", coll: otherColl.getName()}); + + // Insert a third document to the first collection and test that the change stream picks it up. assert.writeOK(coll.insert({_id: 3})); const thirdInsertChangeDoc = cst.getOneChange(resumeCursor); assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3}); + assert.eq(thirdInsertChangeDoc.ns, {db: "test", coll: coll.getName()}); // Test resuming after the first insert again. resumeCursor = cst.startWatchingChanges({ |