17 files changed, 561 insertions, 132 deletions
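In short, this change adds a cluster-wide mode for $changeStream: the stream is opened as a collectionless aggregation on the 'admin' database with the new 'allChangesForCluster' flag, and it reports changes from every database except the internal 'admin', 'config' and 'local' databases. The shell sketch below is a minimal illustration of the surface the new jstests exercise; the database and collection names are made up, and it assumes a test deployment (enableTestCommands on, FCV 4.0), since the feature is still gated behind both.

    // Open a cluster-wide change stream: a collectionless aggregate ({aggregate: 1}) on 'admin'
    // with the new 'allChangesForCluster' flag.
    const adminDB = db.getSiblingDB("admin");
    const res = assert.commandWorked(adminDB.runCommand({
        aggregate: 1,
        pipeline: [{$changeStream: {allChangesForCluster: true}}],
        cursor: {}
    }));

    // Writes to any non-internal database are reported on this cursor; each event's 'ns' field
    // names the db/collection and its '_id' is a resume token that can be passed back via
    // {resumeAfter: <token>, allChangesForCluster: true}.
    assert.writeOK(db.getSiblingDB("test").t1.insert({_id: 0}));

    // The flag is only legal on a collectionless aggregation against 'admin'; anywhere else the
    // new assertIsLegalSpecification() check rejects it.
    assert.commandFailedWithCode(
        db.runCommand(
            {aggregate: 1, pipeline: [{$changeStream: {allChangesForCluster: true}}], cursor: {}}),
        ErrorCodes.InvalidOptions);

    // Collectionless cursors live on the "$cmd.aggregate" namespace for killCursors purposes.
    assert.commandWorked(
        adminDB.runCommand({killCursors: "$cmd.aggregate", cursors: [res.cursor.id]}));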
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml index ad76d5785d4..2afd262b338 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml @@ -8,9 +8,14 @@ selector: # to work against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js # TODO: SERVER-32088 should fix resuming a change stream when not all shards have data. + - jstests/change_streams/change_stream_shell_helper.js - jstests/change_streams/change_stream_whole_db_resumability.js - jstests/change_streams/lookup_post_image.js - - jstests/change_streams/change_stream_shell_helper.js + # TODO SERVER-34088 add support for cluster-wide change stream on sharded clusters + - jstests/change_streams/change_stream.js + - jstests/change_streams/change_stream_whole_cluster.js + - jstests/change_streams/change_stream_whole_cluster_invalidations.js + - jstests/change_streams/change_stream_whole_cluster_resumability.js exclude_with_any_tags: ## # The next three tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml index 9972d983734..fde33fd8e77 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml @@ -9,6 +9,13 @@ selector: - jstests/change_streams/only_wake_getmore_for_relevant_changes.js # This test is not expected to work when run against a mongos. - jstests/change_streams/report_latest_observed_oplog_timestamp.js + # TODO SERVER-34088 add support for cluster-wide change stream on sharded clusters + - jstests/change_streams/change_stream.js + - jstests/change_streams/change_stream_shell_helper.js + - jstests/change_streams/change_stream_whole_cluster.js + - jstests/change_streams/change_stream_whole_cluster_invalidations.js + - jstests/change_streams/change_stream_whole_cluster_resumability.js + - jstests/change_streams/lookup_post_image.js exclude_with_any_tags: ## # The next three tags correspond to the special errors thrown by the diff --git a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml index 827f88fe878..175ed177602 100644 --- a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml +++ b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml @@ -6,6 +6,13 @@ selector: exclude_files: # Exercises an internal detail of mongos<->mongod communication. Not expected to work on mongos. 
- jstests/change_streams/report_latest_observed_oplog_timestamp.js + # TODO SERVER-34088 add support for cluster-wide change stream on sharded clusters + - jstests/change_streams/change_stream.js + - jstests/change_streams/change_stream_shell_helper.js + - jstests/change_streams/change_stream_whole_cluster.js + - jstests/change_streams/change_stream_whole_cluster_invalidations.js + - jstests/change_streams/change_stream_whole_cluster_resumability.js + - jstests/change_streams/lookup_post_image.js exclude_with_any_tags: ## # The next three tags correspond to the special errors thrown by the diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js index e7520d974b2..01d60bde295 100644 --- a/jstests/change_streams/change_stream.js +++ b/jstests/change_streams/change_stream.js @@ -19,6 +19,16 @@ assertInvalidChangeStreamNss("local"); } + // Test that a change stream can be opened on the admin database if {allChangesForCluster:true} + // is specified. + assertValidChangeStreamNss("admin", 1, {allChangesForCluster: true}); + // Test that a change stream cannot be opened on the admin database if a collection is + // specified, even with {allChangesForCluster:true}. + assertInvalidChangeStreamNss("admin", "testcoll", {allChangesForCluster: true}); + // Test that a change stream cannot be opened on a database other than admin if + // {allChangesForCluster:true} is specified. + assertInvalidChangeStreamNss(db.getName(), 1, {allChangesForCluster: true}); + // Test that a change stream cannot be opened on 'system.' collections. assertInvalidChangeStreamNss(db.getName(), "system.users"); assertInvalidChangeStreamNss(db.getName(), "system.profile"); diff --git a/jstests/change_streams/change_stream_whole_cluster.js b/jstests/change_streams/change_stream_whole_cluster.js new file mode 100644 index 00000000000..b6a6c8a3483 --- /dev/null +++ b/jstests/change_streams/change_stream_whole_cluster.js @@ -0,0 +1,47 @@ +// Basic tests for $changeStream against all databases in the cluster. +(function() { + "use strict"; + + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest and + // assert[Valid|Invalid]ChangeStreamNss. + + const adminDB = db.getSiblingDB("admin"); + const otherDB = db.getSiblingDB(`${db.getName()}_other`); + + let cst = new ChangeStreamTest(adminDB); + let cursor = cst.startWatchingAllChangesForCluster(); + + assertCreateCollection(db, "t1"); + // Test that if there are no changes, we return an empty batch. + assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor)); + + // Test that the change stream returns an inserted doc. + assert.writeOK(db.t1.insert({_id: 0, a: 1})); + let expected = { + documentKey: {_id: 0}, + fullDocument: {_id: 0, a: 1}, + ns: {db: db.getName(), coll: "t1"}, + operationType: "insert", + }; + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + + // Test that the change stream returns another inserted doc in a different database. + assert.writeOK(otherDB.t2.insert({_id: 0, a: 2})); + expected = { + documentKey: {_id: 0}, + fullDocument: {_id: 0, a: 2}, + ns: {db: otherDB.getName(), coll: "t2"}, + operationType: "insert", + }; + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + + // Dropping either database should invalidate the change stream. 
+ assert.commandWorked(otherDB.dropDatabase()); + expected = {operationType: "invalidate"}; + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + + // Drop the remaining database and clean up the test. + assert.commandWorked(db.dropDatabase()); + cst.cleanUp(); +}()); diff --git a/jstests/change_streams/change_stream_whole_cluster_invalidations.js b/jstests/change_streams/change_stream_whole_cluster_invalidations.js new file mode 100644 index 00000000000..143d7e47b91 --- /dev/null +++ b/jstests/change_streams/change_stream_whole_cluster_invalidations.js @@ -0,0 +1,132 @@ +// Tests of invalidate entries for a $changeStream on a whole cluster. +(function() { + "use strict"; + + load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. + load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'. + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + + // Define two databases. We will conduct our tests by creating one collection in each. + const testDB1 = db, testDB2 = db.getSiblingDB(`${db.getName()}_other`); + const adminDB = db.getSiblingDB("admin"); + + // Create one collection on each database. + let [db1Coll, db2Coll] = + [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, jsTestName())); + + // Create a ChangeStreamTest on the 'admin' db. Cluster-wide change streams can only be opened + // on admin. + let cst = new ChangeStreamTest(adminDB); + let aggCursor = cst.startWatchingAllChangesForCluster(); + + // Generate oplog entries of type insert, update, and delete across both databases. + for (let coll of[db1Coll, db2Coll]) { + assert.writeOK(coll.insert({_id: 1})); + assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}})); + assert.writeOK(coll.remove({_id: 1})); + } + + // Drop the second database, which should invalidate the stream. + assert.commandWorked(testDB2.dropDatabase()); + + // We should get 7 oplog entries; three ops of type insert, update, delete from each database, + // and then an invalidate. The cursor should be closed. + for (let expectedDB of[testDB1, testDB2]) { + let change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "update", tojson(change)); + assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "delete", tojson(change)); + assert.eq(change.ns.db, expectedDB.getName(), tojson(change)); + } + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [{operationType: "invalidate"}], + expectInvalidate: true + }); + + // Test that a cluster-wide change stream cannot be resumed using a token from a collection + // which has been dropped. + db1Coll = assertDropAndRecreateCollection(testDB1, jsTestName()); + + // Get a valid resume token that the next change stream can use. + aggCursor = cst.startWatchingAllChangesForCluster(); + + assert.writeOK(db1Coll.insert({_id: 1}, {writeConcern: {w: "majority"}})); + + let change = cst.getOneChange(aggCursor, false); + const resumeToken = change._id; + + // It should not possible to resume a change stream after a collection drop, even if the + // invalidate has not been received. + assertDropCollection(db1Coll, db1Coll.getName()); + // Wait for two-phase drop to complete, so that the UUID no longer exists. 
+ assert.soon(function() { + return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB1, + db1Coll.getName()); + }); + assert.commandFailedWithCode(adminDB.runCommand({ + aggregate: 1, + pipeline: [{$changeStream: {resumeAfter: resumeToken, allChangesForCluster: true}}], + cursor: {} + }), + 40615); + + // Test that invalidation entries from any database invalidate the stream. + [db1Coll, db2Coll] = + [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, jsTestName())); + let _idForTest = 0; + for (let collToInvalidate of[db1Coll, db2Coll]) { + // Start watching all changes in the cluster. + aggCursor = cst.startWatchingAllChangesForCluster(); + + let testDB = collToInvalidate.getDB(); + + // Insert into the collections on both databases, and verify the change stream is able to + // pick them up. + for (let collToWrite of[db1Coll, db2Coll]) { + assert.writeOK(collToWrite.insert({_id: _idForTest})); + change = cst.getOneChange(aggCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, _idForTest); + assert.eq(change.ns.db, collToWrite.getDB().getName()); + _idForTest++; + } + + // Renaming the collection should invalidate the change stream. + // TODO SERVER-34088: cannot rename collections on mongoS. + assert.writeOK(collToInvalidate.renameCollection("renamed_coll")); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [{operationType: "invalidate"}], + expectInvalidate: true + }); + collToInvalidate = testDB.getCollection("renamed_coll"); + + // Dropping a collection should invalidate the change stream. + aggCursor = cst.startWatchingAllChangesForCluster(); + assertDropCollection(testDB, collToInvalidate.getName()); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [{operationType: "invalidate"}], + expectInvalidate: true + }); + + // Dropping a 'system' collection should invalidate the change stream. + // Create a view to ensure that the 'system.views' collection exists. + assert.commandWorked( + testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []})); + aggCursor = cst.startWatchingAllChangesForCluster(); + assertDropCollection(testDB, "system.views"); + cst.assertNextChangesEqual({ + cursor: aggCursor, + expectedChanges: [{operationType: "invalidate"}], + expectInvalidate: true + }); + } + + cst.cleanUp(); +}()); diff --git a/jstests/change_streams/change_stream_whole_cluster_resumability.js b/jstests/change_streams/change_stream_whole_cluster_resumability.js new file mode 100644 index 00000000000..df1041d0628 --- /dev/null +++ b/jstests/change_streams/change_stream_whole_cluster_resumability.js @@ -0,0 +1,60 @@ +// Basic tests for resuming a $changeStream that is open against all databases in a cluster. +(function() { + "use strict"; + + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. + + // Create two databases, with one collection in each. + const testDBs = [db, db.getSiblingDB(`${db.getName()}_other`)]; + const[db1Coll, db2Coll] = + testDBs.map((db) => assertDropAndRecreateCollection(db, jsTestName())); + const adminDB = db.getSiblingDB("admin"); + + let cst = new ChangeStreamTest(adminDB); + let resumeCursor = cst.startWatchingAllChangesForCluster(); + + // Insert a document in the first database and save the resulting change stream. 
+ assert.writeOK(db1Coll.insert({_id: 1})); + const firstInsertChangeDoc = cst.getOneChange(resumeCursor); + assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1}); + + // Test resume after the first insert. + resumeCursor = cst.startWatchingChanges({ + pipeline: + [{$changeStream: {resumeAfter: firstInsertChangeDoc._id, allChangesForCluster: true}}], + collection: 1, + aggregateOptions: {cursor: {batchSize: 0}}, + }); + + // Write the next document into the second database. + assert.writeOK(db2Coll.insert({_id: 2})); + const secondInsertChangeDoc = cst.getOneChange(resumeCursor); + assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2}); + + // Write the third document into the first database again. + assert.writeOK(db1Coll.insert({_id: 3})); + const thirdInsertChangeDoc = cst.getOneChange(resumeCursor); + assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3}); + + // Test resuming after the first insert again. + resumeCursor = cst.startWatchingChanges({ + pipeline: + [{$changeStream: {resumeAfter: firstInsertChangeDoc._id, allChangesForCluster: true}}], + collection: 1, + aggregateOptions: {cursor: {batchSize: 0}}, + }); + assert.docEq(cst.getOneChange(resumeCursor), secondInsertChangeDoc); + assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc); + + // Test resume after second insert. + resumeCursor = cst.startWatchingChanges({ + pipeline: + [{$changeStream: {resumeAfter: secondInsertChangeDoc._id, allChangesForCluster: true}}], + collection: 1, + aggregateOptions: {cursor: {batchSize: 0}}, + }); + assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc); + + cst.cleanUp(); +})(); diff --git a/jstests/change_streams/change_stream_whole_db.js b/jstests/change_streams/change_stream_whole_db.js index 4b2cd749c3e..77ecfe932d1 100644 --- a/jstests/change_streams/change_stream_whole_db.js +++ b/jstests/change_streams/change_stream_whole_db.js @@ -7,8 +7,7 @@ // assert[Valid|Invalid]ChangeStreamNss. load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. - // Test that a change stream cannot be opened on the "admin", "config", or "local" databases. - // TODO SERVER-34086: $changeStream may run against 'admin' if 'allChangesForCluster' is true. + // Test that a single-database change stream cannot be opened on "admin", "config", or "local". assertInvalidChangeStreamNss("admin", 1); assertInvalidChangeStreamNss("config", 1); if (!FixtureHelpers.isMongos(db)) { diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js index 7d21884864b..434360aff4c 100644 --- a/jstests/change_streams/lookup_post_image.js +++ b/jstests/change_streams/lookup_post_image.js @@ -10,17 +10,18 @@ load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. load("jstests/replsets/libs/two_phase_drops.js"); // For 'TwoPhaseDropCollectionTest'. - let cst = new ChangeStreamTest(db); const coll = assertDropAndRecreateCollection(db, "change_post_image"); - function testUpdateLookup(coll, collToWatch) { + function testUpdateLookup(coll, collToWatch, changeStreamDB = db, changeStreamSpec = {}) { coll.drop(); + const cst = new ChangeStreamTest(changeStreamDB); + jsTestLog("Testing change streams without 'fullDocument' specified"); // Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for // an insert. 
- let cursor = - cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collToWatch}); + let cursor = cst.startWatchingChanges( + {pipeline: [{$changeStream: changeStreamSpec}], collection: collToWatch}); assert.writeOK(coll.insert({_id: "fullDocument not specified"})); let latestChange = cst.getOneChange(cursor); assert.eq(latestChange.operationType, "insert"); @@ -45,8 +46,9 @@ // Test that specifying 'fullDocument' as 'default' does include a 'fullDocument' in the // result for an insert. + const defaultFullDocSpec = Object.assign({fullDocument: "default"}, changeStreamSpec); cursor = cst.startWatchingChanges( - {collection: collToWatch, pipeline: [{$changeStream: {fullDocument: "default"}}]}); + {collection: collToWatch, pipeline: [{$changeStream: defaultFullDocSpec}]}); assert.writeOK(coll.insert({_id: "fullDocument is default"})); latestChange = cst.getOneChange(cursor); assert.eq(latestChange.operationType, "insert"); @@ -71,8 +73,9 @@ // Test that specifying 'fullDocument' as 'updateLookup' does include a 'fullDocument' in // the result for an insert. + const updateLookupSpec = Object.assign({fullDocument: "updateLookup"}, changeStreamSpec); cursor = cst.startWatchingChanges( - {collection: collToWatch, pipeline: [{$changeStream: {fullDocument: "updateLookup"}}]}); + {collection: collToWatch, pipeline: [{$changeStream: updateLookupSpec}]}); assert.writeOK(coll.insert({_id: "fullDocument is lookup"})); latestChange = cst.getOneChange(cursor); assert.eq(latestChange.operationType, "insert"); @@ -98,10 +101,7 @@ // in a 'fullDocument' with a value of null. cursor = cst.startWatchingChanges({ collection: collToWatch, - pipeline: [ - {$changeStream: {fullDocument: "updateLookup"}}, - {$match: {operationType: "update"}} - ] + pipeline: [{$changeStream: updateLookupSpec}, {$match: {operationType: "update"}}] }); assert.writeOK(coll.update({_id: "fullDocument is lookup"}, {$set: {updatedAgain: true}})); assert.writeOK(coll.remove({_id: "fullDocument is lookup"})); @@ -123,10 +123,12 @@ assert.writeOK(coll.update({_id: "fullDocument is lookup 2"}, {$set: {updated: true}})); // Open a $changeStream cursor with batchSize 0, so that no oplog entries are retrieved yet. + const resumeAfterDeleteAndUpdateLookupSpec = Object.assign( + {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}, changeStreamSpec); cursor = cst.startWatchingChanges({ collection: collToWatch, pipeline: [ - {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}}, + {$changeStream: resumeAfterDeleteAndUpdateLookupSpec}, {$match: {operationType: {$ne: "delete"}}} ], aggregateOptions: {cursor: {batchSize: 0}} @@ -136,7 +138,7 @@ let cursorBeforeDrop = cst.startWatchingChanges({ collection: collToWatch, pipeline: [ - {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}}, + {$changeStream: resumeAfterDeleteAndUpdateLookupSpec}, {$match: {operationType: {$ne: "delete"}}} ], aggregateOptions: {cursor: {batchSize: 0}} @@ -177,10 +179,10 @@ assert.eq(latestChange.fullDocument, null); // Test establishing new cursors with resume token on dropped collections fails. - let res = db.runCommand({ + let res = changeStreamDB.runCommand({ aggregate: collToWatch, pipeline: [ - {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}}, + {$changeStream: resumeAfterDeleteAndUpdateLookupSpec}, {$match: {operationType: "update"}} ], cursor: {batchSize: 0} @@ -207,8 +209,8 @@ // specified. 
const collInvalidate = assertDropAndRecreateCollection(db, "collInvalidate"); cursor = cst.startWatchingChanges({ - collection: collInvalidate.getName(), - pipeline: [{$changeStream: {fullDocument: "updateLookup"}}], + collection: isNumber(collToWatch) ? collToWatch : collInvalidate.getName(), + pipeline: [{$changeStream: updateLookupSpec}], aggregateOptions: {cursor: {batchSize: 0}} }); assert.writeOK(collInvalidate.insert({_id: "testing invalidate"})); @@ -229,7 +231,7 @@ cursor = cst.startWatchingChanges({ collection: collToWatch, - pipeline: [{$changeStream: {fullDocument: "updateLookup"}}], + pipeline: [{$changeStream: updateLookupSpec}], }); assert.writeOK(coll.update({_id: "getMoreEnabled"}, {$set: {updated: true}})); @@ -243,5 +245,6 @@ // Test update lookup with a change stream on a whole database. testUpdateLookup(coll, 1); - cst.cleanUp(); + // Test update lookup with a change stream on the whole cluster. + testUpdateLookup(coll, 1, db.getSiblingDB("admin"), {allChangesForCluster: true}); }()); diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index a5740ffe15b..a27ec4fe9af 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -38,6 +38,19 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { }; /** + * Returns a change stream cursor that listens for every change in the cluster. Assumes that the + * ChangeStreamTest has been created on the 'admin' db, and will assert if not. It uses the + * 'aggregateOptions' if provided and saves the cursor so that it can be cleaned up later. + */ + self.startWatchingAllChangesForCluster = function(aggregateOptions) { + return self.startWatchingChanges({ + pipeline: [{$changeStream: {allChangesForCluster: true}}], + collection: 1, + aggregateOptions: aggregateOptions + }); + }; + + /** * Issues a 'getMore' on the provided cursor and returns the cursor returned. */ self.getNextBatch = function(cursor) { @@ -225,18 +238,23 @@ ChangeStreamTest.assertChangeStreamThrowsCode = function assertChangeStreamThrow /** * A set of functions to help validate the behaviour of $changeStreams for a given namespace. */ -function assertChangeStreamNssBehaviour(dbName, collName = "test", assertFunc) { +function assertChangeStreamNssBehaviour(dbName, collName = "test", options, assertFunc) { const testDb = db.getSiblingDB(dbName); - const res = - testDb.runCommand({aggregate: collName, pipeline: [{$changeStream: {}}], cursor: {}}); + options = (options || {}); + const res = testDb.runCommand( + Object.assign({aggregate: collName, pipeline: [{$changeStream: options}], cursor: {}})); return assertFunc(res); } -function assertValidChangeStreamNss(dbName, collName = "test") { - const res = assertChangeStreamNssBehaviour(dbName, collName, assert.commandWorked); - assert.commandWorked( - db.getSiblingDB(dbName).runCommand({killCursors: collName, cursors: [res.cursor.id]})); +function assertValidChangeStreamNss(dbName, collName = "test", options) { + const res = assertChangeStreamNssBehaviour(dbName, collName, options, assert.commandWorked); + assert.commandWorked(db.getSiblingDB(dbName).runCommand( + {killCursors: (collName == 1 ? 
"$cmd.aggregate" : collName), cursors: [res.cursor.id]})); } -function assertInvalidChangeStreamNss(dbName, collName = "test") { +function assertInvalidChangeStreamNss(dbName, collName = "test", options) { assertChangeStreamNssBehaviour( - dbName, collName, (res) => assert.commandFailedWithCode(res, ErrorCodes.InvalidNamespace)); + dbName, + collName, + options, + (res) => assert.commandFailedWithCode( + res, [ErrorCodes.InvalidNamespace, ErrorCodes.InvalidOptions])); } diff --git a/jstests/multiVersion/change_streams_feature_compatibility_version.js b/jstests/multiVersion/change_streams_feature_compatibility_version.js index 6299cc95404..64b7d0c698d 100644 --- a/jstests/multiVersion/change_streams_feature_compatibility_version.js +++ b/jstests/multiVersion/change_streams_feature_compatibility_version.js @@ -21,56 +21,64 @@ const testDB = rst.getPrimary().getDB(jsTestName()); const adminDB = rst.getPrimary().getDB("admin"); - const coll = testDB[jsTestName()]; - // Explicitly set feature compatibility version 4.0. - let res = adminDB.runCommand({setFeatureCompatibilityVersion: "4.0"}); - assert.commandWorked(res); - const resumeTime = res.$clusterTime.clusterTime; - - // Open and test a change stream using 4.0 features. - const cst = new ChangeStreamTest(testDB); - - const wholeDbCursor = - cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1}); - const resumeCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {startAtClusterTime: {ts: resumeTime}}}], - collection: coll.getName() - }); - - assert.writeOK(coll.insert({_id: 0})); - let change = cst.getOneChange(wholeDbCursor); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.documentKey._id, 0); - - change = cst.getOneChange(resumeCursor); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.documentKey._id, 0); - - // Set the feature compatibility version to 3.6. - assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: "3.6"})); - - // An already created change stream should continue to work. - assert.writeOK(coll.insert({_id: 1})); - change = cst.getOneChange(wholeDbCursor); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.documentKey._id, 1); - - change = cst.getOneChange(resumeCursor); - assert.eq(change.operationType, "insert", tojson(change)); - assert.eq(change.documentKey._id, 1); - - // Creating a new change stream with a 4.0 feature should fail. - assert.commandFailedWithCode( - testDB.runCommand({aggregate: 1, pipeline: [{$changeStream: {}}], cursor: {}}), - ErrorCodes.QueryFeatureNotAllowed); - - assert.commandFailedWithCode(testDB.runCommand({ - aggregate: coll.getName(), - pipeline: [{$changeStream: {startAtClusterTime: {ts: resumeTime}}}], - cursor: {} - }), - ErrorCodes.QueryFeatureNotAllowed); + // Test both whole-db change streams and cluster-wide change streams. + const testCases = [{db: testDB, spec: {}}, {db: adminDB, spec: {allChangesForCluster: true}}]; + + for (let testCase of testCases) { + const coll = testDB[jsTestName()]; + coll.drop(); + + // Explicitly set feature compatibility version 4.0. + let res = adminDB.runCommand({setFeatureCompatibilityVersion: "4.0"}); + assert.commandWorked(res); + const startAtTime = res.$clusterTime.clusterTime; + + // Open and test a change stream using 4.0 features. 
+ const cst = new ChangeStreamTest(testCase.db); + + const multiCollectionCursor = + cst.startWatchingChanges({pipeline: [{$changeStream: testCase.spec}], collection: 1}); + const startAtSpec = + Object.assign({}, testCase.spec, {startAtClusterTime: {ts: startAtTime}}); + const startAtClusterTimeCursor = + cst.startWatchingChanges({pipeline: [{$changeStream: startAtSpec}], collection: 1}); + + assert.writeOK(coll.insert({_id: 0})); + let change = cst.getOneChange(multiCollectionCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, 0); + + change = cst.getOneChange(startAtClusterTimeCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, 0); + + // Set the feature compatibility version to 3.6. + assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: "3.6"})); + + // An already created change stream should continue to work. + assert.writeOK(coll.insert({_id: 1})); + change = cst.getOneChange(multiCollectionCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, 1); + + change = cst.getOneChange(startAtClusterTimeCursor); + assert.eq(change.operationType, "insert", tojson(change)); + assert.eq(change.documentKey._id, 1); + + // Creating a new change stream with a 4.0 feature should fail. + assert.commandFailedWithCode( + testCase.db.runCommand( + {aggregate: 1, pipeline: [{$changeStream: testCase.spec}], cursor: {}}), + ErrorCodes.QueryFeatureNotAllowed); + + assert.commandFailedWithCode(testDB.runCommand({ + aggregate: coll.getName(), + pipeline: [{$changeStream: {startAtClusterTime: {ts: startAtTime}}}], + cursor: {} + }), + ErrorCodes.QueryFeatureNotAllowed); + } rst.stopSet(); }()); diff --git a/jstests/noPassthrough/change_streams_whole_cluster_requires_test_commands_enabled.js b/jstests/noPassthrough/change_streams_whole_cluster_requires_test_commands_enabled.js new file mode 100644 index 00000000000..068300005dc --- /dev/null +++ b/jstests/noPassthrough/change_streams_whole_cluster_requires_test_commands_enabled.js @@ -0,0 +1,60 @@ +// Confirm that $changeStream can only run on an entire cluster if 'enableTestCommands' is true. +// Because the $changeStream stage requires a replica set to run, we tag this test as +// requires_replication. +// @tags: [requires_replication,requires_journaling] + +// TODO SERVER-34409: remove this test once cluster-wide $changeStream is feature-complete. +(function() { + 'use strict'; + + load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. + + let testDB = null; + let rst = null; + + // Creates and initiates a new ReplSetTest with a test database. + function startNewReplSet(name) { + rst = new ReplSetTest({name: name, nodes: 1}); + + if (!startSetIfSupportsReadMajority(rst)) { + rst.stopSet(); + return false; + } + rst.initiate(); + + testDB = rst.getPrimary().getDB(jsTestName()); + assert.commandWorked(testDB.test.insert({_id: 0})); + + return rst; + } + + jsTest.setOption('enableTestCommands', false); + + if (!startNewReplSet("prod")) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + return; + } + + // Confirm that we can run $changeStream on an individual collection with + // 'enableTestCommands:false', but not on the entire cluster. 
+ assert.commandWorked( + testDB.runCommand({aggregate: "test", pipeline: [{$changeStream: {}}], cursor: {}})); + assert.commandFailedWithCode( + testDB.adminCommand( + {aggregate: 1, pipeline: [{$changeStream: {allChangesForCluster: true}}], cursor: {}}), + ErrorCodes.QueryFeatureNotAllowed); + + rst.stopSet(); + + jsTest.setOption('enableTestCommands', true); + assert(startNewReplSet("test")); + + // Confirm that we can run $changeStream on an individual collection and the entire cluster with + // 'enableTestCommands:true'. + assert.commandWorked( + testDB.runCommand({aggregate: "test", pipeline: [{$changeStream: {}}], cursor: {}})); + assert.commandWorked(testDB.adminCommand( + {aggregate: 1, pipeline: [{$changeStream: {allChangesForCluster: true}}], cursor: {}})); + + rst.stopSet(); +})();
\ No newline at end of file diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp index 0ef9addfaac..fc0614a2b8e 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.cpp +++ b/src/mongo/db/pipeline/document_source_change_stream.cpp @@ -242,19 +242,22 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( Timestamp startFrom, bool startFromInclusive) { auto nss = expCtx->ns; - auto onEntireDB = nss.isCollectionlessAggregateNS(); - const auto regexAllCollections = R"(\.(?!(\$|system\.)))"; + // If we have been permitted to run on admin, 'allChangesForCluster' must be true. + ChangeStreamType sourceType = (nss.isAdminDB() ? ChangeStreamType::kAllChangesForCluster + : (nss.isCollectionlessAggregateNS() + ? ChangeStreamType::kSingleDatabase + : ChangeStreamType::kSingleCollection)); + + // Regular expressions that match all oplog entries on supported databases and collections. + const auto regexAllCollections = R"(\.(?!(\$|system\.)))"_sd; + const auto regexAllDBs = "(?!(admin|config|local)).+"_sd; + const auto regexCmdColl = R"(\.\$cmd$)"_sd; // 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field. BSONArrayBuilder invalidatingCommands; invalidatingCommands.append(BSON("o.dropDatabase" << 1)); - // For change streams on an entire database, all collections drops and renames are considered - // invalidate entries. - if (onEntireDB) { - invalidatingCommands.append(BSON("o.drop" << BSON("$exists" << true))); - invalidatingCommands.append(BSON("o.renameCollection" << BSON("$exists" << true))); - } else { + if (sourceType == ChangeStreamType::kSingleCollection) { invalidatingCommands.append(BSON("o.drop" << nss.coll())); invalidatingCommands.append(BSON("o.renameCollection" << nss.ns())); if (expCtx->collation.isEmpty()) { @@ -265,15 +268,29 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( invalidatingCommands.append( BSON("o.create" << nss.coll() << "o.collation" << BSON("$exists" << true))); } + } else { + // For change streams on an entire database, the stream is invalidated if any collections in + // that database are dropped or renamed. For cluster-wide streams, drops or renames of any + // collection in any database (aside from the internal databases admin, config and local) + // will invalidate the stream. + invalidatingCommands.append(BSON("o.drop" << BSON("$exists" << true))); + invalidatingCommands.append(BSON("o.renameCollection" << BSON("$exists" << true))); } - // 1.1) Commands that are on target db and one of the above. + // For cluster-wide $changeStream, match the command namespace of any database other than admin, + // config, or local. Otherwise, match only against the target db's command namespace. + auto cmdNsFilter = (sourceType == ChangeStreamType::kAllChangesForCluster + ? BSON("ns" << BSONRegEx("^" + regexAllDBs + regexCmdColl)) + : BSON("ns" << nss.getCommandNS().ns())); + + // 1.1) Commands that are on target db(s) and one of the above invalidating commands. auto commandsOnTargetDb = - BSON("$and" << BSON_ARRAY(BSON("ns" << nss.getCommandNS().ns()) - << BSON("$or" << invalidatingCommands.arr()))); + BSON("$and" << BSON_ARRAY(cmdNsFilter << BSON("$or" << invalidatingCommands.arr()))); // 1.2) Supported commands that have arbitrary db namespaces in "ns" field. - auto renameDropTarget = BSON("o.to" << nss.ns()); + auto renameDropTarget = (sourceType == ChangeStreamType::kAllChangesForCluster + ? 
BSON("o.to" << BSON("$exists" << true)) + : BSON("o.to" << nss.ns())); // All supported commands that are either (1.1) or (1.2). BSONObj commandMatch = BSON("op" @@ -290,19 +307,27 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter( << "migrateChunkToNewShard"); // 2) Supported operations on the target namespace. - BSONObj opMatch; - if (onEntireDB) { - // Match all namespaces that start with db name, followed by ".", then not followed by - // '$' or 'system.' - opMatch = BSON("ns" << BSONRegEx("^" + nss.db() + regexAllCollections) - << OR(normalOpTypeMatch, chunkMigratedMatch)); - } else { - opMatch = BSON("ns" << nss.ns() << OR(normalOpTypeMatch, chunkMigratedMatch)); + BSONObj nsMatch; + switch (sourceType) { + case ChangeStreamType::kSingleCollection: + // Match the target namespace exactly. + nsMatch = BSON("ns" << nss.ns()); + break; + case ChangeStreamType::kSingleDatabase: + // Match all namespaces that start with db name, followed by ".", then NOT followed by + // '$' or 'system.' + nsMatch = BSON("ns" << BSONRegEx("^" + nss.db() + regexAllCollections)); + break; + case ChangeStreamType::kAllChangesForCluster: + // Match all namespaces that start with any db name other than admin, config, or local, + // followed by ".", then NOT followed by '$' or 'system.' + nsMatch = BSON("ns" << BSONRegEx("^" + regexAllDBs + regexAllCollections)); } + auto opMatch = BSON(nsMatch["ns"] << OR(normalOpTypeMatch, chunkMigratedMatch)); // Match oplog entries after "start" and are either supported (1) commands or (2) operations, - // excepting those tagged "fromMigrate". - // Include the resume token, if resuming, so we can verify it was still present in the oplog. + // excepting those tagged "fromMigrate". Include the resume token, if resuming, so we can verify + // it was still present in the oplog. return BSON("$and" << BSON_ARRAY(BSON("ts" << (startFromInclusive ? GTE : GT) << startFrom) << BSON(OR(opMatch, commandMatch)) << BSON("fromMigrate" << NE << true))); @@ -397,38 +422,11 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson( // A change stream is a tailable + awaitData cursor. expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData; - // Prevent $changeStream from running on an entire database (or cluster-wide) unless we are in - // test mode. - // TODO SERVER-34283: remove once whole-database $changeStream is feature-complete. - uassert(ErrorCodes::QueryFeatureNotAllowed, - "Running $changeStream on an entire database or cluster is not permitted unless the " - "deployment is in test mode.", - !(expCtx->ns.isCollectionlessAggregateNS() && !getTestCommandsEnabled())); - - // Change stream on an entire database is a new 4.0 feature. - uassert(ErrorCodes::QueryFeatureNotAllowed, - str::stream() << "$changeStream on an entire database is not allowed in the current " - "feature compatibility version. See " - << feature_compatibility_version_documentation::kCompatibilityLink - << " for more information.", - !expCtx->ns.isCollectionlessAggregateNS() || - serverGlobalParams.featureCompatibility.getVersion() >= - ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40); - auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"), elem.embeddedObject()); - // TODO SERVER-34086: $changeStream may run against the 'admin' database iff - // 'allChangesForCluster' is true. 
- uassert(ErrorCodes::InvalidNamespace, - str::stream() << "$changeStream may not be opened on the internal " << expCtx->ns.db() - << " database", - !(expCtx->ns.isAdminDB() || expCtx->ns.isLocal() || expCtx->ns.isConfigDB())); - - uassert(ErrorCodes::InvalidNamespace, - str::stream() << "$changeStream may not be opened on the internal " << expCtx->ns.ns() - << " collection", - !expCtx->ns.isSystem()); + // Make sure that it is legal to run this $changeStream before proceeding. + DocumentSourceChangeStream::assertIsLegalSpecification(expCtx, spec); boost::optional<Timestamp> startFrom; intrusive_ptr<DocumentSource> resumeStage = nullptr; @@ -496,6 +494,50 @@ BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(const BSONObj or return newCmd.freeze().toBson(); } +void DocumentSourceChangeStream::assertIsLegalSpecification( + const intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) { + // Prevent $changeStream from running on an entire database (or cluster-wide) unless we are in + // test mode. + // TODO SERVER-34283: remove once whole-database $changeStream is feature-complete. + uassert(ErrorCodes::QueryFeatureNotAllowed, + "Running $changeStream on an entire database or cluster is not permitted unless the " + "deployment is in test mode.", + !(expCtx->ns.isCollectionlessAggregateNS() && !getTestCommandsEnabled())); + + // Change stream on an entire database is a new 4.0 feature. + uassert(ErrorCodes::QueryFeatureNotAllowed, + str::stream() << "$changeStream on an entire database is not allowed in the current " + "feature compatibility version. See " + << feature_compatibility_version_documentation::kCompatibilityLink + << " for more information.", + !expCtx->ns.isCollectionlessAggregateNS() || + serverGlobalParams.featureCompatibility.getVersion() >= + ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40); + + // If 'allChangesForCluster' is true, the stream must be opened on the 'admin' database with + // {aggregate: 1}. + uassert(ErrorCodes::InvalidOptions, + str::stream() << "A $changeStream with 'allChangesForCluster:true' may only be opened " + "on the 'admin' database, and with no collection name; found " + << expCtx->ns.ns(), + !spec.getAllChangesForCluster() || + (expCtx->ns.isAdminDB() && expCtx->ns.isCollectionlessAggregateNS())); + + // Prevent $changeStream from running on internal databases. A stream may run against the + // 'admin' database iff 'allChangesForCluster' is true. + uassert(ErrorCodes::InvalidNamespace, + str::stream() << "$changeStream may not be opened on the internal " << expCtx->ns.db() + << " database", + expCtx->ns.isAdminDB() ? spec.getAllChangesForCluster() + : (!expCtx->ns.isLocal() && !expCtx->ns.isConfigDB())); + + // Prevent $changeStream from running on internal collections in any database. 
+ uassert(ErrorCodes::InvalidNamespace, + str::stream() << "$changeStream may not be opened on the internal " << expCtx->ns.ns() + << " collection", + !expCtx->ns.isSystem()); +} + intrusive_ptr<DocumentSource> DocumentSourceChangeStream::createTransformationStage( BSONObj changeStreamSpec, const intrusive_ptr<ExpressionContext>& expCtx) { // Mark the transformation stage as independent of any collection if the change stream is diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h index 8d39159dc0f..bbaa868169b 100644 --- a/src/mongo/db/pipeline/document_source_change_stream.h +++ b/src/mongo/db/pipeline/document_source_change_stream.h @@ -200,6 +200,14 @@ public: const BSONObj resumeToken); private: + enum class ChangeStreamType { kSingleCollection, kSingleDatabase, kAllChangesForCluster }; + + // Helper function which throws if the $changeStream fails any of a series of semantic checks. + // For instance, whether it is permitted to run given the current FCV, whether the namespace is + // valid for the options specified in the spec, etc. + static void assertIsLegalSpecification(const boost::intrusive_ptr<ExpressionContext>& expCtx, + const DocumentSourceChangeStreamSpec& spec); + // It is illegal to construct a DocumentSourceChangeStream directly, use createFromBson() // instead. DocumentSourceChangeStream() = default; diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp index c6f5c456c7d..3695aaf9d09 100644 --- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp +++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp @@ -82,13 +82,16 @@ NamespaceString DocumentSourceLookupChangePostImage::assertValidNamespace( auto collectionName = assertFieldHasType(namespaceObject, "coll"_sd, BSONType::String); NamespaceString nss(dbName.getString(), collectionName.getString()); - // Change streams on an entire database only need to verify that the database names match. + // Change streams on an entire database only need to verify that the database names match. If + // the database is 'admin', then this is a cluster-wide $changeStream and we are permitted to + // lookup into any namespace. uassert(40579, str::stream() << "unexpected namespace during post image lookup: " << nss.ns() << ", expected " << pExpCtx->ns.ns(), nss == pExpCtx->ns || - (pExpCtx->ns.isCollectionlessAggregateNS() && nss.db() == pExpCtx->ns.db())); + (pExpCtx->isClusterAggregation() || pExpCtx->isDBAggregation(nss.db()))); + return nss; } diff --git a/src/mongo/db/pipeline/document_sources.idl b/src/mongo/db/pipeline/document_sources.idl index 835fdbcb5ab..df287295d2c 100644 --- a/src/mongo/db/pipeline/document_sources.idl +++ b/src/mongo/db/pipeline/document_sources.idl @@ -95,6 +95,12 @@ structs: default: '"default"' description: A string '"updateLookup"' or '"default"', indicating whether or not we should return a full document or just changes for an update. + allChangesForCluster: + cpp_name: allChangesForCluster + type: bool + default: false + description: A flag indicating whether the stream should report all changes that occur + on the deployment, aside from those on internal databases or collections. 
ListSessionsUser: description: "A struct representing a $listSessions/$listLocalSessions User" diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h index b5e490e91d7..4172f4fef57 100644 --- a/src/mongo/db/pipeline/expression_context.h +++ b/src/mongo/db/pipeline/expression_context.h @@ -116,6 +116,20 @@ public: */ void checkForInterrupt(); + /** + * Returns true if this is a collectionless aggregation on the specified database. + */ + bool isDBAggregation(StringData dbName) const { + return ns.db() == dbName && ns.isCollectionlessAggregateNS(); + } + + /** + * Returns true if this is a collectionless aggregation on the 'admin' database. + */ + bool isClusterAggregation() const { + return ns.isAdminDB() && ns.isCollectionlessAggregateNS(); + } + const CollatorInterface* getCollator() const { return _collator; } |
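For reference, the namespace filter that the rewritten buildMatchFilter() produces for the cluster-wide case is built from the two regular expressions above ("^" + regexAllDBs + regexAllCollections). The shell sketch below is a hand translation of that C++ BSONRegEx into a JavaScript regex, shown purely to illustrate which oplog namespaces the new mode admits; it is not code from the patch.

    // Approximation of the cluster-wide 'ns' match regex, for illustration only.
    const clusterNsRegex = /^(?!(admin|config|local)).+\.(?!(\$|system\.))/;
    assert(clusterNsRegex.test("test.t1"));               // ordinary user namespace: included
    assert(!clusterNsRegex.test("admin.users"));          // internal databases are excluded,
    assert(!clusterNsRegex.test("config.transactions"));  // which is why admin/config/local
    assert(!clusterNsRegex.test("local.oplog.rs"));       // never produce data change events
    // In the single-database and single-collection variants, the same trailing lookahead,
    // (?!(\$|system\.)), is what keeps '$cmd' and 'system.*' collections out of the stream.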