diff options
Diffstat (limited to 'jstests/change_streams')
6 files changed, 271 insertions, 20 deletions
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js index e7520d974b2..01d60bde295 100644 --- a/jstests/change_streams/change_stream.js +++ b/jstests/change_streams/change_stream.js @@ -19,6 +19,16 @@ assertInvalidChangeStreamNss("local"); } + // Test that a change stream can be opened on the admin database if {allChangesForCluster:true} + // is specified. + assertValidChangeStreamNss("admin", 1, {allChangesForCluster: true}); + // Test that a change stream cannot be opened on the admin database if a collection is + // specified, even with {allChangesForCluster:true}. + assertInvalidChangeStreamNss("admin", "testcoll", {allChangesForCluster: true}); + // Test that a change stream cannot be opened on a database other than admin if + // {allChangesForCluster:true} is specified. + assertInvalidChangeStreamNss(db.getName(), 1, {allChangesForCluster: true}); + // Test that a change stream cannot be opened on 'system.' collections. assertInvalidChangeStreamNss(db.getName(), "system.users"); assertInvalidChangeStreamNss(db.getName(), "system.profile"); diff --git a/jstests/change_streams/change_stream_whole_cluster.js b/jstests/change_streams/change_stream_whole_cluster.js new file mode 100644 index 00000000000..b6a6c8a3483 --- /dev/null +++ b/jstests/change_streams/change_stream_whole_cluster.js @@ -0,0 +1,47 @@ +// Basic tests for $changeStream against all databases in the cluster. +(function() { + "use strict"; + + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest and + // assert[Valid|Invalid]ChangeStreamNss. + + const adminDB = db.getSiblingDB("admin"); + const otherDB = db.getSiblingDB(`${db.getName()}_other`); + + let cst = new ChangeStreamTest(adminDB); + let cursor = cst.startWatchingAllChangesForCluster(); + + assertCreateCollection(db, "t1"); + // Test that if there are no changes, we return an empty batch. + assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor)); + + // Test that the change stream returns an inserted doc. + assert.writeOK(db.t1.insert({_id: 0, a: 1})); + let expected = { + documentKey: {_id: 0}, + fullDocument: {_id: 0, a: 1}, + ns: {db: db.getName(), coll: "t1"}, + operationType: "insert", + }; + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + + // Test that the change stream returns another inserted doc in a different database. + assert.writeOK(otherDB.t2.insert({_id: 0, a: 2})); + expected = { + documentKey: {_id: 0}, + fullDocument: {_id: 0, a: 2}, + ns: {db: otherDB.getName(), coll: "t2"}, + operationType: "insert", + }; + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + + // Dropping either database should invalidate the change stream. + assert.commandWorked(otherDB.dropDatabase()); + expected = {operationType: "invalidate"}; + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + + // Drop the remaining database and clean up the test. + assert.commandWorked(db.dropDatabase()); + cst.cleanUp(); +}()); diff --git a/jstests/change_streams/change_stream_whole_cluster_invalidations.js b/jstests/change_streams/change_stream_whole_cluster_invalidations.js new file mode 100644 index 00000000000..143d7e47b91 --- /dev/null +++ b/jstests/change_streams/change_stream_whole_cluster_invalidations.js @@ -0,0 +1,132 @@ +// Tests of invalidate entries for a $changeStream on a whole cluster. +(function() { + "use strict"; + + load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. + load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + + // Define two databases. We will conduct our tests by creating one collection in each. + const testDB1 = db, testDB2 = db.getSiblingDB(`${db.getName()}_other`); + const adminDB = db.getSiblingDB("admin"); + + // Create one collection on each database. + let [db1Coll, db2Coll] = + [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, jsTestName())); + + // Create a ChangeStreamTest on the 'admin' db. Cluster-wide change streams can only be opened + // on admin. + let cst = new ChangeStreamTest(adminDB); + let aggCursor = cst.startWatchingAllChangesForCluster(); + + // Generate oplog entries of type insert, update, and delete across both databases. + for (let coll of[db1Coll, db2Coll]) { + assert.writeOK(coll.insert({_id: 1})); + assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}})); + assert.writeOK(coll.remove({_id: 1})); + } + + // Drop the second database, which should invalidate the stream. + assert.commandWorked(testDB2.dropDatabase()); + + // We should get 7 oplog entries; three ops of type insert, update, delete from each database, + // and then an invalidate. The cursor should be closed. + for (let expectedDB of[testDB1, testDB2]) { + let change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "update", tojson(change)); + assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "delete", tojson(change)); + assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); + } + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [{operationType: "invalidate"}], + expectInvalidate: true + }); + + // Test that a cluster-wide change stream cannot be resumed using a token from a collection + // which has been dropped. + db1Coll = assertDropAndRecreateCollection(testDB1, jsTestName()); + + // Get a valid resume token that the next change stream can use. + aggCursor = cst.startWatchingAllChangesForCluster(); + + assert.writeOK(db1Coll.insert({_id: 1}, {writeConcern: {w: "majority"}})); + + let change = cst.getOneChange(aggCursor, false); + const resumeToken = change._id; + + // It should not possible to resume a change stream after a collection drop, even if the + // invalidate has not been received. + assertDropCollection(db1Coll, db1Coll.getName()); + // Wait for two-phase drop to complete, so that the UUID no longer exists. + assert.soon(function() { + return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB1, + db1Coll.getName()); + }); + assert.commandFailedWithCode(adminDB.runCommand({ + aggregate: 1, + pipeline: [{$changeStream: {resumeAfter: resumeToken, allChangesForCluster: true}}], + cursor: {} + }), + 40615); + + // Test that invalidation entries from any database invalidate the stream. + [db1Coll, db2Coll] = + [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, jsTestName())); + let _idForTest = 0; + for (let collToInvalidate of[db1Coll, db2Coll]) { + // Start watching all changes in the cluster. + aggCursor = cst.startWatchingAllChangesForCluster(); + + let testDB = collToInvalidate.getDB(); + + // Insert into the collections on both databases, and verify the change stream is able to + // pick them up. + for (let collToWrite of[db1Coll, db2Coll]) { + assert.writeOK(collToWrite.insert({_id: _idForTest})); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, _idForTest); + assert.eq(change.ns.db, collToWrite.getDB().getName()); + _idForTest++; + } + + // Renaming the collection should invalidate the change stream. + // TODO SERVER-34088: cannot rename collections on mongoS. + assert.writeOK(collToInvalidate.renameCollection("renamed_coll")); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [{operationType: "invalidate"}], + expectInvalidate: true + }); + collToInvalidate = testDB.getCollection("renamed_coll"); + + // Dropping a collection should invalidate the change stream. + aggCursor = cst.startWatchingAllChangesForCluster(); + assertDropCollection(testDB, collToInvalidate.getName()); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [{operationType: "invalidate"}], + expectInvalidate: true + }); + + // Dropping a 'system' collection should invalidate the change stream. + // Create a view to ensure that the 'system.views' collection exists. + assert.commandWorked( + testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []})); + aggCursor = cst.startWatchingAllChangesForCluster(); + assertDropCollection(testDB, "system.views"); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [{operationType: "invalidate"}], + expectInvalidate: true + }); + } + + cst.cleanUp(); +}()); diff --git a/jstests/change_streams/change_stream_whole_cluster_resumability.js b/jstests/change_streams/change_stream_whole_cluster_resumability.js new file mode 100644 index 00000000000..df1041d0628 --- /dev/null +++ b/jstests/change_streams/change_stream_whole_cluster_resumability.js @@ -0,0 +1,60 @@ +// Basic tests for resuming a $changeStream that is open against all databases in a cluster. +(function() { + "use strict"; + + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. + + // Create two databases, with one collection in each. + const testDBs = [db, db.getSiblingDB(`${db.getName()}_other`)]; + const[db1Coll, db2Coll] = + testDBs.map((db) => assertDropAndRecreateCollection(db, jsTestName())); + const adminDB = db.getSiblingDB("admin"); + + let cst = new ChangeStreamTest(adminDB); + let resumeCursor = cst.startWatchingAllChangesForCluster(); + + // Insert a document in the first database and save the resulting change stream. + assert.writeOK(db1Coll.insert({_id: 1})); + const firstInsertChangeDoc = cst.getOneChange(resumeCursor); + assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1}); + + // Test resume after the first insert. + resumeCursor = cst.startWatchingChanges({ + pipeline: + [{$changeStream: {resumeAfter: firstInsertChangeDoc._id, allChangesForCluster: true}}], + collection: 1, + aggregateOptions: {cursor: {batchSize: 0}}, + }); + + // Write the next document into the second database. + assert.writeOK(db2Coll.insert({_id: 2})); + const secondInsertChangeDoc = cst.getOneChange(resumeCursor); + assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2}); + + // Write the third document into the first database again. + assert.writeOK(db1Coll.insert({_id: 3})); + const thirdInsertChangeDoc = cst.getOneChange(resumeCursor); + assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3}); + + // Test resuming after the first insert again. + resumeCursor = cst.startWatchingChanges({ + pipeline: + [{$changeStream: {resumeAfter: firstInsertChangeDoc._id, allChangesForCluster: true}}], + collection: 1, + aggregateOptions: {cursor: {batchSize: 0}}, + }); + assert.docEq(cst.getOneChange(resumeCursor), secondInsertChangeDoc); + assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc); + + // Test resume after second insert. + resumeCursor = cst.startWatchingChanges({ + pipeline: + [{$changeStream: {resumeAfter: secondInsertChangeDoc._id, allChangesForCluster: true}}], + collection: 1, + aggregateOptions: {cursor: {batchSize: 0}}, + }); + assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc); + + cst.cleanUp(); +})(); diff --git a/jstests/change_streams/change_stream_whole_db.js b/jstests/change_streams/change_stream_whole_db.js index 4b2cd749c3e..77ecfe932d1 100644 --- a/jstests/change_streams/change_stream_whole_db.js +++ b/jstests/change_streams/change_stream_whole_db.js @@ -7,8 +7,7 @@ // assert[Valid|Invalid]ChangeStreamNss. load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. - // Test that a change stream cannot be opened on the "admin", "config", or "local" databases. - // TODO SERVER-34086: $changeStream may run against 'admin' if 'allChangesForCluster' is true. + // Test that a single-database change stream cannot be opened on "admin", "config", or "local". assertInvalidChangeStreamNss("admin", 1); assertInvalidChangeStreamNss("config", 1); if (!FixtureHelpers.isMongos(db)) { diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js index 7d21884864b..434360aff4c 100644 --- a/jstests/change_streams/lookup_post_image.js +++ b/jstests/change_streams/lookup_post_image.js @@ -10,17 +10,18 @@ load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. load("jstests/replsets/libs/two_phase_drops.js"); // For 'TwoPhaseDropCollectionTest'. - let cst = new ChangeStreamTest(db); const coll = assertDropAndRecreateCollection(db, "change_post_image"); - function testUpdateLookup(coll, collToWatch) { + function testUpdateLookup(coll, collToWatch, changeStreamDB = db, changeStreamSpec = {}) { coll.drop(); + const cst = new ChangeStreamTest(changeStreamDB); + jsTestLog("Testing change streams without 'fullDocument' specified"); // Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for // an insert. - let cursor = - cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collToWatch}); + let cursor = cst.startWatchingChanges( + {pipeline: [{$changeStream: changeStreamSpec}], collection: collToWatch}); assert.writeOK(coll.insert({_id: "fullDocument not specified"})); let latestChange = cst.getOneChange(cursor); assert.eq(latestChange.operationType, "insert"); @@ -45,8 +46,9 @@ // Test that specifying 'fullDocument' as 'default' does include a 'fullDocument' in the // result for an insert. + const defaultFullDocSpec = Object.assign({fullDocument: "default"}, changeStreamSpec); cursor = cst.startWatchingChanges( - {collection: collToWatch, pipeline: [{$changeStream: {fullDocument: "default"}}]}); + {collection: collToWatch, pipeline: [{$changeStream: defaultFullDocSpec}]}); assert.writeOK(coll.insert({_id: "fullDocument is default"})); latestChange = cst.getOneChange(cursor); assert.eq(latestChange.operationType, "insert"); @@ -71,8 +73,9 @@ // Test that specifying 'fullDocument' as 'updateLookup' does include a 'fullDocument' in // the result for an insert. + const updateLookupSpec = Object.assign({fullDocument: "updateLookup"}, changeStreamSpec); cursor = cst.startWatchingChanges( - {collection: collToWatch, pipeline: [{$changeStream: {fullDocument: "updateLookup"}}]}); + {collection: collToWatch, pipeline: [{$changeStream: updateLookupSpec}]}); assert.writeOK(coll.insert({_id: "fullDocument is lookup"})); latestChange = cst.getOneChange(cursor); assert.eq(latestChange.operationType, "insert"); @@ -98,10 +101,7 @@ // in a 'fullDocument' with a value of null. cursor = cst.startWatchingChanges({ collection: collToWatch, - pipeline: [ - {$changeStream: {fullDocument: "updateLookup"}}, - {$match: {operationType: "update"}} - ] + pipeline: [{$changeStream: updateLookupSpec}, {$match: {operationType: "update"}}] }); assert.writeOK(coll.update({_id: "fullDocument is lookup"}, {$set: {updatedAgain: true}})); assert.writeOK(coll.remove({_id: "fullDocument is lookup"})); @@ -123,10 +123,12 @@ assert.writeOK(coll.update({_id: "fullDocument is lookup 2"}, {$set: {updated: true}})); // Open a $changeStream cursor with batchSize 0, so that no oplog entries are retrieved yet. + const resumeAfterDeleteAndUpdateLookupSpec = Object.assign( + {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}, changeStreamSpec); cursor = cst.startWatchingChanges({ collection: collToWatch, pipeline: [ - {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}}, + {$changeStream: resumeAfterDeleteAndUpdateLookupSpec}, {$match: {operationType: {$ne: "delete"}}} ], aggregateOptions: {cursor: {batchSize: 0}} @@ -136,7 +138,7 @@ let cursorBeforeDrop = cst.startWatchingChanges({ collection: collToWatch, pipeline: [ - {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}}, + {$changeStream: resumeAfterDeleteAndUpdateLookupSpec}, {$match: {operationType: {$ne: "delete"}}} ], aggregateOptions: {cursor: {batchSize: 0}} @@ -177,10 +179,10 @@ assert.eq(latestChange.fullDocument, null); // Test establishing new cursors with resume token on dropped collections fails. - let res = db.runCommand({ + let res = changeStreamDB.runCommand({ aggregate: collToWatch, pipeline: [ - {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}}, + {$changeStream: resumeAfterDeleteAndUpdateLookupSpec}, {$match: {operationType: "update"}} ], cursor: {batchSize: 0} @@ -207,8 +209,8 @@ // specified. const collInvalidate = assertDropAndRecreateCollection(db, "collInvalidate"); cursor = cst.startWatchingChanges({ - collection: collInvalidate.getName(), - pipeline: [{$changeStream: {fullDocument: "updateLookup"}}], + collection: isNumber(collToWatch) ? collToWatch : collInvalidate.getName(), + pipeline: [{$changeStream: updateLookupSpec}], aggregateOptions: {cursor: {batchSize: 0}} }); assert.writeOK(collInvalidate.insert({_id: "testing invalidate"})); @@ -229,7 +231,7 @@ cursor = cst.startWatchingChanges({ collection: collToWatch, - pipeline: [{$changeStream: {fullDocument: "updateLookup"}}], + pipeline: [{$changeStream: updateLookupSpec}], }); assert.writeOK(coll.update({_id: "getMoreEnabled"}, {$set: {updated: true}})); @@ -243,5 +245,6 @@ // Test update lookup with a change stream on a whole database. testUpdateLookup(coll, 1); - cst.cleanUp(); + // Test update lookup with a change stream on the whole cluster. + testUpdateLookup(coll, 1, db.getSiblingDB("admin"), {allChangesForCluster: true}); }()); |