summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml7
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml7
-rw-r--r--buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml7
-rw-r--r--jstests/change_streams/change_stream.js10
-rw-r--r--jstests/change_streams/change_stream_whole_cluster.js47
-rw-r--r--jstests/change_streams/change_stream_whole_cluster_invalidations.js132
-rw-r--r--jstests/change_streams/change_stream_whole_cluster_resumability.js60
-rw-r--r--jstests/change_streams/change_stream_whole_db.js3
-rw-r--r--jstests/change_streams/lookup_post_image.js39
-rw-r--r--jstests/libs/change_stream_util.js36
-rw-r--r--jstests/multiVersion/change_streams_feature_compatibility_version.js106
-rw-r--r--jstests/noPassthrough/change_streams_whole_cluster_requires_test_commands_enabled.js60
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.cpp144
-rw-r--r--src/mongo/db/pipeline/document_source_change_stream.h8
-rw-r--r--src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp7
-rw-r--r--src/mongo/db/pipeline/document_sources.idl6
-rw-r--r--src/mongo/db/pipeline/expression_context.h14
17 files changed, 561 insertions, 132 deletions
diff --git a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
index ad76d5785d4..2afd262b338 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_mongos_passthrough.yml
@@ -8,9 +8,14 @@ selector:
# to work against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
# TODO: SERVER-32088 should fix resuming a change stream when not all shards have data.
+ - jstests/change_streams/change_stream_shell_helper.js
- jstests/change_streams/change_stream_whole_db_resumability.js
- jstests/change_streams/lookup_post_image.js
- - jstests/change_streams/change_stream_shell_helper.js
+ # TODO SERVER-34088 add support for cluster-wide change stream on sharded clusters
+ - jstests/change_streams/change_stream.js
+ - jstests/change_streams/change_stream_whole_cluster.js
+ - jstests/change_streams/change_stream_whole_cluster_invalidations.js
+ - jstests/change_streams/change_stream_whole_cluster_resumability.js
exclude_with_any_tags:
##
# The next three tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
index 9972d983734..fde33fd8e77 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_secondary_reads.yml
@@ -9,6 +9,13 @@ selector:
- jstests/change_streams/only_wake_getmore_for_relevant_changes.js
# This test is not expected to work when run against a mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # TODO SERVER-34088 add support for cluster-wide change stream on sharded clusters
+ - jstests/change_streams/change_stream.js
+ - jstests/change_streams/change_stream_shell_helper.js
+ - jstests/change_streams/change_stream_whole_cluster.js
+ - jstests/change_streams/change_stream_whole_cluster_invalidations.js
+ - jstests/change_streams/change_stream_whole_cluster_resumability.js
+ - jstests/change_streams/lookup_post_image.js
exclude_with_any_tags:
##
# The next three tags correspond to the special errors thrown by the
diff --git a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
index 827f88fe878..175ed177602 100644
--- a/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/change_streams_sharded_collections_passthrough.yml
@@ -6,6 +6,13 @@ selector:
exclude_files:
# Exercises an internal detail of mongos<->mongod communication. Not expected to work on mongos.
- jstests/change_streams/report_latest_observed_oplog_timestamp.js
+ # TODO SERVER-34088 add support for cluster-wide change stream on sharded clusters
+ - jstests/change_streams/change_stream.js
+ - jstests/change_streams/change_stream_shell_helper.js
+ - jstests/change_streams/change_stream_whole_cluster.js
+ - jstests/change_streams/change_stream_whole_cluster_invalidations.js
+ - jstests/change_streams/change_stream_whole_cluster_resumability.js
+ - jstests/change_streams/lookup_post_image.js
exclude_with_any_tags:
##
# The next three tags correspond to the special errors thrown by the
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js
index e7520d974b2..01d60bde295 100644
--- a/jstests/change_streams/change_stream.js
+++ b/jstests/change_streams/change_stream.js
@@ -19,6 +19,16 @@
assertInvalidChangeStreamNss("local");
}
+ // Test that a change stream can be opened on the admin database if {allChangesForCluster:true}
+ // is specified.
+ assertValidChangeStreamNss("admin", 1, {allChangesForCluster: true});
+ // Test that a change stream cannot be opened on the admin database if a collection is
+ // specified, even with {allChangesForCluster:true}.
+ assertInvalidChangeStreamNss("admin", "testcoll", {allChangesForCluster: true});
+ // Test that a change stream cannot be opened on a database other than admin if
+ // {allChangesForCluster:true} is specified.
+ assertInvalidChangeStreamNss(db.getName(), 1, {allChangesForCluster: true});
+
// Test that a change stream cannot be opened on 'system.' collections.
assertInvalidChangeStreamNss(db.getName(), "system.users");
assertInvalidChangeStreamNss(db.getName(), "system.profile");
diff --git a/jstests/change_streams/change_stream_whole_cluster.js b/jstests/change_streams/change_stream_whole_cluster.js
new file mode 100644
index 00000000000..b6a6c8a3483
--- /dev/null
+++ b/jstests/change_streams/change_stream_whole_cluster.js
@@ -0,0 +1,47 @@
+// Basic tests for $changeStream against all databases in the cluster.
+(function() {
+ "use strict";
+
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest and
+ // assert[Valid|Invalid]ChangeStreamNss.
+
+ const adminDB = db.getSiblingDB("admin");
+ const otherDB = db.getSiblingDB(`${db.getName()}_other`);
+
+ let cst = new ChangeStreamTest(adminDB);
+ let cursor = cst.startWatchingAllChangesForCluster();
+
+ assertCreateCollection(db, "t1");
+ // Test that if there are no changes, we return an empty batch.
+ assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor));
+
+ // Test that the change stream returns an inserted doc.
+ assert.writeOK(db.t1.insert({_id: 0, a: 1}));
+ let expected = {
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0, a: 1},
+ ns: {db: db.getName(), coll: "t1"},
+ operationType: "insert",
+ };
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+ // Test that the change stream returns another inserted doc in a different database.
+ assert.writeOK(otherDB.t2.insert({_id: 0, a: 2}));
+ expected = {
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0, a: 2},
+ ns: {db: otherDB.getName(), coll: "t2"},
+ operationType: "insert",
+ };
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+ // Dropping either database should invalidate the change stream.
+ assert.commandWorked(otherDB.dropDatabase());
+ expected = {operationType: "invalidate"};
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+ // Drop the remaining database and clean up the test.
+ assert.commandWorked(db.dropDatabase());
+ cst.cleanUp();
+}());
diff --git a/jstests/change_streams/change_stream_whole_cluster_invalidations.js b/jstests/change_streams/change_stream_whole_cluster_invalidations.js
new file mode 100644
index 00000000000..143d7e47b91
--- /dev/null
+++ b/jstests/change_streams/change_stream_whole_cluster_invalidations.js
@@ -0,0 +1,132 @@
+// Tests of invalidate entries for a $changeStream on a whole cluster.
+(function() {
+ "use strict";
+
+ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+ load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+
+ // Define two databases. We will conduct our tests by creating one collection in each.
+ const testDB1 = db, testDB2 = db.getSiblingDB(`${db.getName()}_other`);
+ const adminDB = db.getSiblingDB("admin");
+
+ // Create one collection on each database.
+ let [db1Coll, db2Coll] =
+ [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, jsTestName()));
+
+ // Create a ChangeStreamTest on the 'admin' db. Cluster-wide change streams can only be opened
+ // on admin.
+ let cst = new ChangeStreamTest(adminDB);
+ let aggCursor = cst.startWatchingAllChangesForCluster();
+
+ // Generate oplog entries of type insert, update, and delete across both databases.
+ for (let coll of[db1Coll, db2Coll]) {
+ assert.writeOK(coll.insert({_id: 1}));
+ assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}}));
+ assert.writeOK(coll.remove({_id: 1}));
+ }
+
+ // Drop the second database, which should invalidate the stream.
+ assert.commandWorked(testDB2.dropDatabase());
+
+ // We should get 7 oplog entries; three ops of type insert, update, delete from each database,
+ // and then an invalidate. The cursor should be closed.
+ for (let expectedDB of[testDB1, testDB2]) {
+ let change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "update", tojson(change));
+ assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "delete", tojson(change));
+ assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
+ }
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{operationType: "invalidate"}],
+ expectInvalidate: true
+ });
+
+ // Test that a cluster-wide change stream cannot be resumed using a token from a collection
+ // which has been dropped.
+ db1Coll = assertDropAndRecreateCollection(testDB1, jsTestName());
+
+ // Get a valid resume token that the next change stream can use.
+ aggCursor = cst.startWatchingAllChangesForCluster();
+
+ assert.writeOK(db1Coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+
+ let change = cst.getOneChange(aggCursor, false);
+ const resumeToken = change._id;
+
+ // It should not possible to resume a change stream after a collection drop, even if the
+ // invalidate has not been received.
+ assertDropCollection(db1Coll, db1Coll.getName());
+ // Wait for two-phase drop to complete, so that the UUID no longer exists.
+ assert.soon(function() {
+ return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB1,
+ db1Coll.getName());
+ });
+ assert.commandFailedWithCode(adminDB.runCommand({
+ aggregate: 1,
+ pipeline: [{$changeStream: {resumeAfter: resumeToken, allChangesForCluster: true}}],
+ cursor: {}
+ }),
+ 40615);
+
+ // Test that invalidation entries from any database invalidate the stream.
+ [db1Coll, db2Coll] =
+ [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, jsTestName()));
+ let _idForTest = 0;
+ for (let collToInvalidate of[db1Coll, db2Coll]) {
+ // Start watching all changes in the cluster.
+ aggCursor = cst.startWatchingAllChangesForCluster();
+
+ let testDB = collToInvalidate.getDB();
+
+ // Insert into the collections on both databases, and verify the change stream is able to
+ // pick them up.
+ for (let collToWrite of[db1Coll, db2Coll]) {
+ assert.writeOK(collToWrite.insert({_id: _idForTest}));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, _idForTest);
+ assert.eq(change.ns.db, collToWrite.getDB().getName());
+ _idForTest++;
+ }
+
+ // Renaming the collection should invalidate the change stream.
+ // TODO SERVER-34088: cannot rename collections on mongoS.
+ assert.writeOK(collToInvalidate.renameCollection("renamed_coll"));
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{operationType: "invalidate"}],
+ expectInvalidate: true
+ });
+ collToInvalidate = testDB.getCollection("renamed_coll");
+
+ // Dropping a collection should invalidate the change stream.
+ aggCursor = cst.startWatchingAllChangesForCluster();
+ assertDropCollection(testDB, collToInvalidate.getName());
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{operationType: "invalidate"}],
+ expectInvalidate: true
+ });
+
+ // Dropping a 'system' collection should invalidate the change stream.
+ // Create a view to ensure that the 'system.views' collection exists.
+ assert.commandWorked(
+ testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []}));
+ aggCursor = cst.startWatchingAllChangesForCluster();
+ assertDropCollection(testDB, "system.views");
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{operationType: "invalidate"}],
+ expectInvalidate: true
+ });
+ }
+
+ cst.cleanUp();
+}());
diff --git a/jstests/change_streams/change_stream_whole_cluster_resumability.js b/jstests/change_streams/change_stream_whole_cluster_resumability.js
new file mode 100644
index 00000000000..df1041d0628
--- /dev/null
+++ b/jstests/change_streams/change_stream_whole_cluster_resumability.js
@@ -0,0 +1,60 @@
+// Basic tests for resuming a $changeStream that is open against all databases in a cluster.
+(function() {
+ "use strict";
+
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+
+ // Create two databases, with one collection in each.
+ const testDBs = [db, db.getSiblingDB(`${db.getName()}_other`)];
+ const[db1Coll, db2Coll] =
+ testDBs.map((db) => assertDropAndRecreateCollection(db, jsTestName()));
+ const adminDB = db.getSiblingDB("admin");
+
+ let cst = new ChangeStreamTest(adminDB);
+ let resumeCursor = cst.startWatchingAllChangesForCluster();
+
+ // Insert a document in the first database and save the resulting change stream.
+ assert.writeOK(db1Coll.insert({_id: 1}));
+ const firstInsertChangeDoc = cst.getOneChange(resumeCursor);
+ assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1});
+
+ // Test resume after the first insert.
+ resumeCursor = cst.startWatchingChanges({
+ pipeline:
+ [{$changeStream: {resumeAfter: firstInsertChangeDoc._id, allChangesForCluster: true}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
+
+ // Write the next document into the second database.
+ assert.writeOK(db2Coll.insert({_id: 2}));
+ const secondInsertChangeDoc = cst.getOneChange(resumeCursor);
+ assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2});
+
+ // Write the third document into the first database again.
+ assert.writeOK(db1Coll.insert({_id: 3}));
+ const thirdInsertChangeDoc = cst.getOneChange(resumeCursor);
+ assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3});
+
+ // Test resuming after the first insert again.
+ resumeCursor = cst.startWatchingChanges({
+ pipeline:
+ [{$changeStream: {resumeAfter: firstInsertChangeDoc._id, allChangesForCluster: true}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
+ assert.docEq(cst.getOneChange(resumeCursor), secondInsertChangeDoc);
+ assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
+
+ // Test resume after second insert.
+ resumeCursor = cst.startWatchingChanges({
+ pipeline:
+ [{$changeStream: {resumeAfter: secondInsertChangeDoc._id, allChangesForCluster: true}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
+ assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
+
+ cst.cleanUp();
+})();
diff --git a/jstests/change_streams/change_stream_whole_db.js b/jstests/change_streams/change_stream_whole_db.js
index 4b2cd749c3e..77ecfe932d1 100644
--- a/jstests/change_streams/change_stream_whole_db.js
+++ b/jstests/change_streams/change_stream_whole_db.js
@@ -7,8 +7,7 @@
// assert[Valid|Invalid]ChangeStreamNss.
load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
- // Test that a change stream cannot be opened on the "admin", "config", or "local" databases.
- // TODO SERVER-34086: $changeStream may run against 'admin' if 'allChangesForCluster' is true.
+ // Test that a single-database change stream cannot be opened on "admin", "config", or "local".
assertInvalidChangeStreamNss("admin", 1);
assertInvalidChangeStreamNss("config", 1);
if (!FixtureHelpers.isMongos(db)) {
diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js
index 7d21884864b..434360aff4c 100644
--- a/jstests/change_streams/lookup_post_image.js
+++ b/jstests/change_streams/lookup_post_image.js
@@ -10,17 +10,18 @@
load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
load("jstests/replsets/libs/two_phase_drops.js"); // For 'TwoPhaseDropCollectionTest'.
- let cst = new ChangeStreamTest(db);
const coll = assertDropAndRecreateCollection(db, "change_post_image");
- function testUpdateLookup(coll, collToWatch) {
+ function testUpdateLookup(coll, collToWatch, changeStreamDB = db, changeStreamSpec = {}) {
coll.drop();
+ const cst = new ChangeStreamTest(changeStreamDB);
+
jsTestLog("Testing change streams without 'fullDocument' specified");
// Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for
// an insert.
- let cursor =
- cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collToWatch});
+ let cursor = cst.startWatchingChanges(
+ {pipeline: [{$changeStream: changeStreamSpec}], collection: collToWatch});
assert.writeOK(coll.insert({_id: "fullDocument not specified"}));
let latestChange = cst.getOneChange(cursor);
assert.eq(latestChange.operationType, "insert");
@@ -45,8 +46,9 @@
// Test that specifying 'fullDocument' as 'default' does include a 'fullDocument' in the
// result for an insert.
+ const defaultFullDocSpec = Object.assign({fullDocument: "default"}, changeStreamSpec);
cursor = cst.startWatchingChanges(
- {collection: collToWatch, pipeline: [{$changeStream: {fullDocument: "default"}}]});
+ {collection: collToWatch, pipeline: [{$changeStream: defaultFullDocSpec}]});
assert.writeOK(coll.insert({_id: "fullDocument is default"}));
latestChange = cst.getOneChange(cursor);
assert.eq(latestChange.operationType, "insert");
@@ -71,8 +73,9 @@
// Test that specifying 'fullDocument' as 'updateLookup' does include a 'fullDocument' in
// the result for an insert.
+ const updateLookupSpec = Object.assign({fullDocument: "updateLookup"}, changeStreamSpec);
cursor = cst.startWatchingChanges(
- {collection: collToWatch, pipeline: [{$changeStream: {fullDocument: "updateLookup"}}]});
+ {collection: collToWatch, pipeline: [{$changeStream: updateLookupSpec}]});
assert.writeOK(coll.insert({_id: "fullDocument is lookup"}));
latestChange = cst.getOneChange(cursor);
assert.eq(latestChange.operationType, "insert");
@@ -98,10 +101,7 @@
// in a 'fullDocument' with a value of null.
cursor = cst.startWatchingChanges({
collection: collToWatch,
- pipeline: [
- {$changeStream: {fullDocument: "updateLookup"}},
- {$match: {operationType: "update"}}
- ]
+ pipeline: [{$changeStream: updateLookupSpec}, {$match: {operationType: "update"}}]
});
assert.writeOK(coll.update({_id: "fullDocument is lookup"}, {$set: {updatedAgain: true}}));
assert.writeOK(coll.remove({_id: "fullDocument is lookup"}));
@@ -123,10 +123,12 @@
assert.writeOK(coll.update({_id: "fullDocument is lookup 2"}, {$set: {updated: true}}));
// Open a $changeStream cursor with batchSize 0, so that no oplog entries are retrieved yet.
+ const resumeAfterDeleteAndUpdateLookupSpec = Object.assign(
+ {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}, changeStreamSpec);
cursor = cst.startWatchingChanges({
collection: collToWatch,
pipeline: [
- {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}},
+ {$changeStream: resumeAfterDeleteAndUpdateLookupSpec},
{$match: {operationType: {$ne: "delete"}}}
],
aggregateOptions: {cursor: {batchSize: 0}}
@@ -136,7 +138,7 @@
let cursorBeforeDrop = cst.startWatchingChanges({
collection: collToWatch,
pipeline: [
- {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}},
+ {$changeStream: resumeAfterDeleteAndUpdateLookupSpec},
{$match: {operationType: {$ne: "delete"}}}
],
aggregateOptions: {cursor: {batchSize: 0}}
@@ -177,10 +179,10 @@
assert.eq(latestChange.fullDocument, null);
// Test establishing new cursors with resume token on dropped collections fails.
- let res = db.runCommand({
+ let res = changeStreamDB.runCommand({
aggregate: collToWatch,
pipeline: [
- {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}},
+ {$changeStream: resumeAfterDeleteAndUpdateLookupSpec},
{$match: {operationType: "update"}}
],
cursor: {batchSize: 0}
@@ -207,8 +209,8 @@
// specified.
const collInvalidate = assertDropAndRecreateCollection(db, "collInvalidate");
cursor = cst.startWatchingChanges({
- collection: collInvalidate.getName(),
- pipeline: [{$changeStream: {fullDocument: "updateLookup"}}],
+ collection: isNumber(collToWatch) ? collToWatch : collInvalidate.getName(),
+ pipeline: [{$changeStream: updateLookupSpec}],
aggregateOptions: {cursor: {batchSize: 0}}
});
assert.writeOK(collInvalidate.insert({_id: "testing invalidate"}));
@@ -229,7 +231,7 @@
cursor = cst.startWatchingChanges({
collection: collToWatch,
- pipeline: [{$changeStream: {fullDocument: "updateLookup"}}],
+ pipeline: [{$changeStream: updateLookupSpec}],
});
assert.writeOK(coll.update({_id: "getMoreEnabled"}, {$set: {updated: true}}));
@@ -243,5 +245,6 @@
// Test update lookup with a change stream on a whole database.
testUpdateLookup(coll, 1);
- cst.cleanUp();
+ // Test update lookup with a change stream on the whole cluster.
+ testUpdateLookup(coll, 1, db.getSiblingDB("admin"), {allChangesForCluster: true});
}());
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index a5740ffe15b..a27ec4fe9af 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -38,6 +38,19 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
};
/**
+ * Returns a change stream cursor that listens for every change in the cluster. Assumes that the
+ * ChangeStreamTest has been created on the 'admin' db, and will assert if not. It uses the
+ * 'aggregateOptions' if provided and saves the cursor so that it can be cleaned up later.
+ */
+ self.startWatchingAllChangesForCluster = function(aggregateOptions) {
+ return self.startWatchingChanges({
+ pipeline: [{$changeStream: {allChangesForCluster: true}}],
+ collection: 1,
+ aggregateOptions: aggregateOptions
+ });
+ };
+
+ /**
* Issues a 'getMore' on the provided cursor and returns the cursor returned.
*/
self.getNextBatch = function(cursor) {
@@ -225,18 +238,23 @@ ChangeStreamTest.assertChangeStreamThrowsCode = function assertChangeStreamThrow
/**
* A set of functions to help validate the behaviour of $changeStreams for a given namespace.
*/
-function assertChangeStreamNssBehaviour(dbName, collName = "test", assertFunc) {
+function assertChangeStreamNssBehaviour(dbName, collName = "test", options, assertFunc) {
const testDb = db.getSiblingDB(dbName);
- const res =
- testDb.runCommand({aggregate: collName, pipeline: [{$changeStream: {}}], cursor: {}});
+ options = (options || {});
+ const res = testDb.runCommand(
+ Object.assign({aggregate: collName, pipeline: [{$changeStream: options}], cursor: {}}));
return assertFunc(res);
}
-function assertValidChangeStreamNss(dbName, collName = "test") {
- const res = assertChangeStreamNssBehaviour(dbName, collName, assert.commandWorked);
- assert.commandWorked(
- db.getSiblingDB(dbName).runCommand({killCursors: collName, cursors: [res.cursor.id]}));
+function assertValidChangeStreamNss(dbName, collName = "test", options) {
+ const res = assertChangeStreamNssBehaviour(dbName, collName, options, assert.commandWorked);
+ assert.commandWorked(db.getSiblingDB(dbName).runCommand(
+ {killCursors: (collName == 1 ? "$cmd.aggregate" : collName), cursors: [res.cursor.id]}));
}
-function assertInvalidChangeStreamNss(dbName, collName = "test") {
+function assertInvalidChangeStreamNss(dbName, collName = "test", options) {
assertChangeStreamNssBehaviour(
- dbName, collName, (res) => assert.commandFailedWithCode(res, ErrorCodes.InvalidNamespace));
+ dbName,
+ collName,
+ options,
+ (res) => assert.commandFailedWithCode(
+ res, [ErrorCodes.InvalidNamespace, ErrorCodes.InvalidOptions]));
}
diff --git a/jstests/multiVersion/change_streams_feature_compatibility_version.js b/jstests/multiVersion/change_streams_feature_compatibility_version.js
index 6299cc95404..64b7d0c698d 100644
--- a/jstests/multiVersion/change_streams_feature_compatibility_version.js
+++ b/jstests/multiVersion/change_streams_feature_compatibility_version.js
@@ -21,56 +21,64 @@
const testDB = rst.getPrimary().getDB(jsTestName());
const adminDB = rst.getPrimary().getDB("admin");
- const coll = testDB[jsTestName()];
- // Explicitly set feature compatibility version 4.0.
- let res = adminDB.runCommand({setFeatureCompatibilityVersion: "4.0"});
- assert.commandWorked(res);
- const resumeTime = res.$clusterTime.clusterTime;
-
- // Open and test a change stream using 4.0 features.
- const cst = new ChangeStreamTest(testDB);
-
- const wholeDbCursor =
- cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
- const resumeCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {startAtClusterTime: {ts: resumeTime}}}],
- collection: coll.getName()
- });
-
- assert.writeOK(coll.insert({_id: 0}));
- let change = cst.getOneChange(wholeDbCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.documentKey._id, 0);
-
- change = cst.getOneChange(resumeCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.documentKey._id, 0);
-
- // Set the feature compatibility version to 3.6.
- assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: "3.6"}));
-
- // An already created change stream should continue to work.
- assert.writeOK(coll.insert({_id: 1}));
- change = cst.getOneChange(wholeDbCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.documentKey._id, 1);
-
- change = cst.getOneChange(resumeCursor);
- assert.eq(change.operationType, "insert", tojson(change));
- assert.eq(change.documentKey._id, 1);
-
- // Creating a new change stream with a 4.0 feature should fail.
- assert.commandFailedWithCode(
- testDB.runCommand({aggregate: 1, pipeline: [{$changeStream: {}}], cursor: {}}),
- ErrorCodes.QueryFeatureNotAllowed);
-
- assert.commandFailedWithCode(testDB.runCommand({
- aggregate: coll.getName(),
- pipeline: [{$changeStream: {startAtClusterTime: {ts: resumeTime}}}],
- cursor: {}
- }),
- ErrorCodes.QueryFeatureNotAllowed);
+ // Test both whole-db change streams and cluster-wide change streams.
+ const testCases = [{db: testDB, spec: {}}, {db: adminDB, spec: {allChangesForCluster: true}}];
+
+ for (let testCase of testCases) {
+ const coll = testDB[jsTestName()];
+ coll.drop();
+
+ // Explicitly set feature compatibility version 4.0.
+ let res = adminDB.runCommand({setFeatureCompatibilityVersion: "4.0"});
+ assert.commandWorked(res);
+ const startAtTime = res.$clusterTime.clusterTime;
+
+ // Open and test a change stream using 4.0 features.
+ const cst = new ChangeStreamTest(testCase.db);
+
+ const multiCollectionCursor =
+ cst.startWatchingChanges({pipeline: [{$changeStream: testCase.spec}], collection: 1});
+ const startAtSpec =
+ Object.assign({}, testCase.spec, {startAtClusterTime: {ts: startAtTime}});
+ const startAtClusterTimeCursor =
+ cst.startWatchingChanges({pipeline: [{$changeStream: startAtSpec}], collection: 1});
+
+ assert.writeOK(coll.insert({_id: 0}));
+ let change = cst.getOneChange(multiCollectionCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, 0);
+
+ change = cst.getOneChange(startAtClusterTimeCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, 0);
+
+ // Set the feature compatibility version to 3.6.
+ assert.commandWorked(adminDB.runCommand({setFeatureCompatibilityVersion: "3.6"}));
+
+ // An already created change stream should continue to work.
+ assert.writeOK(coll.insert({_id: 1}));
+ change = cst.getOneChange(multiCollectionCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, 1);
+
+ change = cst.getOneChange(startAtClusterTimeCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, 1);
+
+ // Creating a new change stream with a 4.0 feature should fail.
+ assert.commandFailedWithCode(
+ testCase.db.runCommand(
+ {aggregate: 1, pipeline: [{$changeStream: testCase.spec}], cursor: {}}),
+ ErrorCodes.QueryFeatureNotAllowed);
+
+ assert.commandFailedWithCode(testDB.runCommand({
+ aggregate: coll.getName(),
+ pipeline: [{$changeStream: {startAtClusterTime: {ts: startAtTime}}}],
+ cursor: {}
+ }),
+ ErrorCodes.QueryFeatureNotAllowed);
+ }
rst.stopSet();
}());
diff --git a/jstests/noPassthrough/change_streams_whole_cluster_requires_test_commands_enabled.js b/jstests/noPassthrough/change_streams_whole_cluster_requires_test_commands_enabled.js
new file mode 100644
index 00000000000..068300005dc
--- /dev/null
+++ b/jstests/noPassthrough/change_streams_whole_cluster_requires_test_commands_enabled.js
@@ -0,0 +1,60 @@
+// Confirm that $changeStream can only run on an entire cluster if 'enableTestCommands' is true.
+// Because the $changeStream stage requires a replica set to run, we tag this test as
+// requires_replication.
+// @tags: [requires_replication,requires_journaling]
+
+// TODO SERVER-34409: remove this test once cluster-wide $changeStream is feature-complete.
+(function() {
+ 'use strict';
+
+ load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
+
+ let testDB = null;
+ let rst = null;
+
+ // Creates and initiates a new ReplSetTest with a test database.
+ function startNewReplSet(name) {
+ rst = new ReplSetTest({name: name, nodes: 1});
+
+ if (!startSetIfSupportsReadMajority(rst)) {
+ rst.stopSet();
+ return false;
+ }
+ rst.initiate();
+
+ testDB = rst.getPrimary().getDB(jsTestName());
+ assert.commandWorked(testDB.test.insert({_id: 0}));
+
+ return rst;
+ }
+
+ jsTest.setOption('enableTestCommands', false);
+
+ if (!startNewReplSet("prod")) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+ }
+
+ // Confirm that we can run $changeStream on an individual collection with
+ // 'enableTestCommands:false', but not on the entire cluster.
+ assert.commandWorked(
+ testDB.runCommand({aggregate: "test", pipeline: [{$changeStream: {}}], cursor: {}}));
+ assert.commandFailedWithCode(
+ testDB.adminCommand(
+ {aggregate: 1, pipeline: [{$changeStream: {allChangesForCluster: true}}], cursor: {}}),
+ ErrorCodes.QueryFeatureNotAllowed);
+
+ rst.stopSet();
+
+ jsTest.setOption('enableTestCommands', true);
+ assert(startNewReplSet("test"));
+
+ // Confirm that we can run $changeStream on an individual collection and the entire cluster with
+ // 'enableTestCommands:true'.
+ assert.commandWorked(
+ testDB.runCommand({aggregate: "test", pipeline: [{$changeStream: {}}], cursor: {}}));
+ assert.commandWorked(testDB.adminCommand(
+ {aggregate: 1, pipeline: [{$changeStream: {allChangesForCluster: true}}], cursor: {}}));
+
+ rst.stopSet();
+})(); \ No newline at end of file
diff --git a/src/mongo/db/pipeline/document_source_change_stream.cpp b/src/mongo/db/pipeline/document_source_change_stream.cpp
index 0ef9addfaac..fc0614a2b8e 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.cpp
+++ b/src/mongo/db/pipeline/document_source_change_stream.cpp
@@ -242,19 +242,22 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
Timestamp startFrom,
bool startFromInclusive) {
auto nss = expCtx->ns;
- auto onEntireDB = nss.isCollectionlessAggregateNS();
- const auto regexAllCollections = R"(\.(?!(\$|system\.)))";
+ // If we have been permitted to run on admin, 'allChangesForCluster' must be true.
+ ChangeStreamType sourceType = (nss.isAdminDB() ? ChangeStreamType::kAllChangesForCluster
+ : (nss.isCollectionlessAggregateNS()
+ ? ChangeStreamType::kSingleDatabase
+ : ChangeStreamType::kSingleCollection));
+
+ // Regular expressions that match all oplog entries on supported databases and collections.
+ const auto regexAllCollections = R"(\.(?!(\$|system\.)))"_sd;
+ const auto regexAllDBs = "(?!(admin|config|local)).+"_sd;
+ const auto regexCmdColl = R"(\.\$cmd$)"_sd;
// 1) Supported commands that have the target db namespace (e.g. test.$cmd) in "ns" field.
BSONArrayBuilder invalidatingCommands;
invalidatingCommands.append(BSON("o.dropDatabase" << 1));
- // For change streams on an entire database, all collections drops and renames are considered
- // invalidate entries.
- if (onEntireDB) {
- invalidatingCommands.append(BSON("o.drop" << BSON("$exists" << true)));
- invalidatingCommands.append(BSON("o.renameCollection" << BSON("$exists" << true)));
- } else {
+ if (sourceType == ChangeStreamType::kSingleCollection) {
invalidatingCommands.append(BSON("o.drop" << nss.coll()));
invalidatingCommands.append(BSON("o.renameCollection" << nss.ns()));
if (expCtx->collation.isEmpty()) {
@@ -265,15 +268,29 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
invalidatingCommands.append(
BSON("o.create" << nss.coll() << "o.collation" << BSON("$exists" << true)));
}
+ } else {
+ // For change streams on an entire database, the stream is invalidated if any collections in
+ // that database are dropped or renamed. For cluster-wide streams, drops or renames of any
+ // collection in any database (aside from the internal databases admin, config and local)
+ // will invalidate the stream.
+ invalidatingCommands.append(BSON("o.drop" << BSON("$exists" << true)));
+ invalidatingCommands.append(BSON("o.renameCollection" << BSON("$exists" << true)));
}
- // 1.1) Commands that are on target db and one of the above.
+ // For cluster-wide $changeStream, match the command namespace of any database other than admin,
+ // config, or local. Otherwise, match only against the target db's command namespace.
+ auto cmdNsFilter = (sourceType == ChangeStreamType::kAllChangesForCluster
+ ? BSON("ns" << BSONRegEx("^" + regexAllDBs + regexCmdColl))
+ : BSON("ns" << nss.getCommandNS().ns()));
+
+ // 1.1) Commands that are on target db(s) and one of the above invalidating commands.
auto commandsOnTargetDb =
- BSON("$and" << BSON_ARRAY(BSON("ns" << nss.getCommandNS().ns())
- << BSON("$or" << invalidatingCommands.arr())));
+ BSON("$and" << BSON_ARRAY(cmdNsFilter << BSON("$or" << invalidatingCommands.arr())));
// 1.2) Supported commands that have arbitrary db namespaces in "ns" field.
- auto renameDropTarget = BSON("o.to" << nss.ns());
+ auto renameDropTarget = (sourceType == ChangeStreamType::kAllChangesForCluster
+ ? BSON("o.to" << BSON("$exists" << true))
+ : BSON("o.to" << nss.ns()));
// All supported commands that are either (1.1) or (1.2).
BSONObj commandMatch = BSON("op"
@@ -290,19 +307,27 @@ BSONObj DocumentSourceChangeStream::buildMatchFilter(
<< "migrateChunkToNewShard");
// 2) Supported operations on the target namespace.
- BSONObj opMatch;
- if (onEntireDB) {
- // Match all namespaces that start with db name, followed by ".", then not followed by
- // '$' or 'system.'
- opMatch = BSON("ns" << BSONRegEx("^" + nss.db() + regexAllCollections)
- << OR(normalOpTypeMatch, chunkMigratedMatch));
- } else {
- opMatch = BSON("ns" << nss.ns() << OR(normalOpTypeMatch, chunkMigratedMatch));
+ BSONObj nsMatch;
+ switch (sourceType) {
+ case ChangeStreamType::kSingleCollection:
+ // Match the target namespace exactly.
+ nsMatch = BSON("ns" << nss.ns());
+ break;
+ case ChangeStreamType::kSingleDatabase:
+ // Match all namespaces that start with db name, followed by ".", then NOT followed by
+ // '$' or 'system.'
+ nsMatch = BSON("ns" << BSONRegEx("^" + nss.db() + regexAllCollections));
+ break;
+ case ChangeStreamType::kAllChangesForCluster:
+ // Match all namespaces that start with any db name other than admin, config, or local,
+ // followed by ".", then NOT followed by '$' or 'system.'
+ nsMatch = BSON("ns" << BSONRegEx("^" + regexAllDBs + regexAllCollections));
}
+ auto opMatch = BSON(nsMatch["ns"] << OR(normalOpTypeMatch, chunkMigratedMatch));
// Match oplog entries after "start" and are either supported (1) commands or (2) operations,
- // excepting those tagged "fromMigrate".
- // Include the resume token, if resuming, so we can verify it was still present in the oplog.
+ // excepting those tagged "fromMigrate". Include the resume token, if resuming, so we can verify
+ // it was still present in the oplog.
return BSON("$and" << BSON_ARRAY(BSON("ts" << (startFromInclusive ? GTE : GT) << startFrom)
<< BSON(OR(opMatch, commandMatch))
<< BSON("fromMigrate" << NE << true)));
@@ -397,38 +422,11 @@ list<intrusive_ptr<DocumentSource>> DocumentSourceChangeStream::createFromBson(
// A change stream is a tailable + awaitData cursor.
expCtx->tailableMode = TailableModeEnum::kTailableAndAwaitData;
- // Prevent $changeStream from running on an entire database (or cluster-wide) unless we are in
- // test mode.
- // TODO SERVER-34283: remove once whole-database $changeStream is feature-complete.
- uassert(ErrorCodes::QueryFeatureNotAllowed,
- "Running $changeStream on an entire database or cluster is not permitted unless the "
- "deployment is in test mode.",
- !(expCtx->ns.isCollectionlessAggregateNS() && !getTestCommandsEnabled()));
-
- // Change stream on an entire database is a new 4.0 feature.
- uassert(ErrorCodes::QueryFeatureNotAllowed,
- str::stream() << "$changeStream on an entire database is not allowed in the current "
- "feature compatibility version. See "
- << feature_compatibility_version_documentation::kCompatibilityLink
- << " for more information.",
- !expCtx->ns.isCollectionlessAggregateNS() ||
- serverGlobalParams.featureCompatibility.getVersion() >=
- ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40);
-
auto spec = DocumentSourceChangeStreamSpec::parse(IDLParserErrorContext("$changeStream"),
elem.embeddedObject());
- // TODO SERVER-34086: $changeStream may run against the 'admin' database iff
- // 'allChangesForCluster' is true.
- uassert(ErrorCodes::InvalidNamespace,
- str::stream() << "$changeStream may not be opened on the internal " << expCtx->ns.db()
- << " database",
- !(expCtx->ns.isAdminDB() || expCtx->ns.isLocal() || expCtx->ns.isConfigDB()));
-
- uassert(ErrorCodes::InvalidNamespace,
- str::stream() << "$changeStream may not be opened on the internal " << expCtx->ns.ns()
- << " collection",
- !expCtx->ns.isSystem());
+ // Make sure that it is legal to run this $changeStream before proceeding.
+ DocumentSourceChangeStream::assertIsLegalSpecification(expCtx, spec);
boost::optional<Timestamp> startFrom;
intrusive_ptr<DocumentSource> resumeStage = nullptr;
@@ -496,6 +494,50 @@ BSONObj DocumentSourceChangeStream::replaceResumeTokenInCommand(const BSONObj or
return newCmd.freeze().toBson();
}
+void DocumentSourceChangeStream::assertIsLegalSpecification(
+ const intrusive_ptr<ExpressionContext>& expCtx, const DocumentSourceChangeStreamSpec& spec) {
+ // Prevent $changeStream from running on an entire database (or cluster-wide) unless we are in
+ // test mode.
+ // TODO SERVER-34283: remove once whole-database $changeStream is feature-complete.
+ uassert(ErrorCodes::QueryFeatureNotAllowed,
+ "Running $changeStream on an entire database or cluster is not permitted unless the "
+ "deployment is in test mode.",
+ !(expCtx->ns.isCollectionlessAggregateNS() && !getTestCommandsEnabled()));
+
+ // Change stream on an entire database is a new 4.0 feature.
+ uassert(ErrorCodes::QueryFeatureNotAllowed,
+ str::stream() << "$changeStream on an entire database is not allowed in the current "
+ "feature compatibility version. See "
+ << feature_compatibility_version_documentation::kCompatibilityLink
+ << " for more information.",
+ !expCtx->ns.isCollectionlessAggregateNS() ||
+ serverGlobalParams.featureCompatibility.getVersion() >=
+ ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40);
+
+ // If 'allChangesForCluster' is true, the stream must be opened on the 'admin' database with
+ // {aggregate: 1}.
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream() << "A $changeStream with 'allChangesForCluster:true' may only be opened "
+ "on the 'admin' database, and with no collection name; found "
+ << expCtx->ns.ns(),
+ !spec.getAllChangesForCluster() ||
+ (expCtx->ns.isAdminDB() && expCtx->ns.isCollectionlessAggregateNS()));
+
+ // Prevent $changeStream from running on internal databases. A stream may run against the
+ // 'admin' database iff 'allChangesForCluster' is true.
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "$changeStream may not be opened on the internal " << expCtx->ns.db()
+ << " database",
+ expCtx->ns.isAdminDB() ? spec.getAllChangesForCluster()
+ : (!expCtx->ns.isLocal() && !expCtx->ns.isConfigDB()));
+
+ // Prevent $changeStream from running on internal collections in any database.
+ uassert(ErrorCodes::InvalidNamespace,
+ str::stream() << "$changeStream may not be opened on the internal " << expCtx->ns.ns()
+ << " collection",
+ !expCtx->ns.isSystem());
+}
+
intrusive_ptr<DocumentSource> DocumentSourceChangeStream::createTransformationStage(
BSONObj changeStreamSpec, const intrusive_ptr<ExpressionContext>& expCtx) {
// Mark the transformation stage as independent of any collection if the change stream is
diff --git a/src/mongo/db/pipeline/document_source_change_stream.h b/src/mongo/db/pipeline/document_source_change_stream.h
index 8d39159dc0f..bbaa868169b 100644
--- a/src/mongo/db/pipeline/document_source_change_stream.h
+++ b/src/mongo/db/pipeline/document_source_change_stream.h
@@ -200,6 +200,14 @@ public:
const BSONObj resumeToken);
private:
+ enum class ChangeStreamType { kSingleCollection, kSingleDatabase, kAllChangesForCluster };
+
+ // Helper function which throws if the $changeStream fails any of a series of semantic checks.
+ // For instance, whether it is permitted to run given the current FCV, whether the namespace is
+ // valid for the options specified in the spec, etc.
+ static void assertIsLegalSpecification(const boost::intrusive_ptr<ExpressionContext>& expCtx,
+ const DocumentSourceChangeStreamSpec& spec);
+
// It is illegal to construct a DocumentSourceChangeStream directly, use createFromBson()
// instead.
DocumentSourceChangeStream() = default;
diff --git a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
index c6f5c456c7d..3695aaf9d09 100644
--- a/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
+++ b/src/mongo/db/pipeline/document_source_lookup_change_post_image.cpp
@@ -82,13 +82,16 @@ NamespaceString DocumentSourceLookupChangePostImage::assertValidNamespace(
auto collectionName = assertFieldHasType(namespaceObject, "coll"_sd, BSONType::String);
NamespaceString nss(dbName.getString(), collectionName.getString());
- // Change streams on an entire database only need to verify that the database names match.
+ // Change streams on an entire database only need to verify that the database names match. If
+ // the database is 'admin', then this is a cluster-wide $changeStream and we are permitted to
+ // lookup into any namespace.
uassert(40579,
str::stream() << "unexpected namespace during post image lookup: " << nss.ns()
<< ", expected "
<< pExpCtx->ns.ns(),
nss == pExpCtx->ns ||
- (pExpCtx->ns.isCollectionlessAggregateNS() && nss.db() == pExpCtx->ns.db()));
+ (pExpCtx->isClusterAggregation() || pExpCtx->isDBAggregation(nss.db())));
+
return nss;
}
diff --git a/src/mongo/db/pipeline/document_sources.idl b/src/mongo/db/pipeline/document_sources.idl
index 835fdbcb5ab..df287295d2c 100644
--- a/src/mongo/db/pipeline/document_sources.idl
+++ b/src/mongo/db/pipeline/document_sources.idl
@@ -95,6 +95,12 @@ structs:
default: '"default"'
description: A string '"updateLookup"' or '"default"', indicating whether or not we
should return a full document or just changes for an update.
+ allChangesForCluster:
+ cpp_name: allChangesForCluster
+ type: bool
+ default: false
+ description: A flag indicating whether the stream should report all changes that occur
+ on the deployment, aside from those on internal databases or collections.
ListSessionsUser:
description: "A struct representing a $listSessions/$listLocalSessions User"
diff --git a/src/mongo/db/pipeline/expression_context.h b/src/mongo/db/pipeline/expression_context.h
index b5e490e91d7..4172f4fef57 100644
--- a/src/mongo/db/pipeline/expression_context.h
+++ b/src/mongo/db/pipeline/expression_context.h
@@ -116,6 +116,20 @@ public:
*/
void checkForInterrupt();
+ /**
+ * Returns true if this is a collectionless aggregation on the specified database.
+ */
+ bool isDBAggregation(StringData dbName) const {
+ return ns.db() == dbName && ns.isCollectionlessAggregateNS();
+ }
+
+ /**
+ * Returns true if this is a collectionless aggregation on the 'admin' database.
+ */
+ bool isClusterAggregation() const {
+ return ns.isAdminDB() && ns.isCollectionlessAggregateNS();
+ }
+
const CollatorInterface* getCollator() const {
return _collator;
}