summaryrefslogtreecommitdiff
path: root/jstests/change_streams
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/change_streams')
-rw-r--r--jstests/change_streams/change_stream.js10
-rw-r--r--jstests/change_streams/change_stream_whole_cluster.js47
-rw-r--r--jstests/change_streams/change_stream_whole_cluster_invalidations.js132
-rw-r--r--jstests/change_streams/change_stream_whole_cluster_resumability.js60
-rw-r--r--jstests/change_streams/change_stream_whole_db.js3
-rw-r--r--jstests/change_streams/lookup_post_image.js39
6 files changed, 271 insertions, 20 deletions
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js
index e7520d974b2..01d60bde295 100644
--- a/jstests/change_streams/change_stream.js
+++ b/jstests/change_streams/change_stream.js
@@ -19,6 +19,16 @@
assertInvalidChangeStreamNss("local");
}
+ // Test that a change stream can be opened on the admin database if {allChangesForCluster:true}
+ // is specified.
+ assertValidChangeStreamNss("admin", 1, {allChangesForCluster: true});
+ // Test that a change stream cannot be opened on the admin database if a collection is
+ // specified, even with {allChangesForCluster:true}.
+ assertInvalidChangeStreamNss("admin", "testcoll", {allChangesForCluster: true});
+ // Test that a change stream cannot be opened on a database other than admin if
+ // {allChangesForCluster:true} is specified.
+ assertInvalidChangeStreamNss(db.getName(), 1, {allChangesForCluster: true});
+
// Test that a change stream cannot be opened on 'system.' collections.
assertInvalidChangeStreamNss(db.getName(), "system.users");
assertInvalidChangeStreamNss(db.getName(), "system.profile");
diff --git a/jstests/change_streams/change_stream_whole_cluster.js b/jstests/change_streams/change_stream_whole_cluster.js
new file mode 100644
index 00000000000..b6a6c8a3483
--- /dev/null
+++ b/jstests/change_streams/change_stream_whole_cluster.js
@@ -0,0 +1,47 @@
+// Basic tests for $changeStream against all databases in the cluster.
+(function() {
+ "use strict";
+
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest and
+ // assert[Valid|Invalid]ChangeStreamNss.
+
+ const adminDB = db.getSiblingDB("admin");
+ const otherDB = db.getSiblingDB(`${db.getName()}_other`);
+
+ let cst = new ChangeStreamTest(adminDB);
+ let cursor = cst.startWatchingAllChangesForCluster();
+
+ assertCreateCollection(db, "t1");
+ // Test that if there are no changes, we return an empty batch.
+ assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor));
+
+ // Test that the change stream returns an inserted doc.
+ assert.writeOK(db.t1.insert({_id: 0, a: 1}));
+ let expected = {
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0, a: 1},
+ ns: {db: db.getName(), coll: "t1"},
+ operationType: "insert",
+ };
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+ // Test that the change stream returns another inserted doc in a different database.
+ assert.writeOK(otherDB.t2.insert({_id: 0, a: 2}));
+ expected = {
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0, a: 2},
+ ns: {db: otherDB.getName(), coll: "t2"},
+ operationType: "insert",
+ };
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+ // Dropping either database should invalidate the change stream.
+ assert.commandWorked(otherDB.dropDatabase());
+ expected = {operationType: "invalidate"};
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+ // Drop the remaining database and clean up the test.
+ assert.commandWorked(db.dropDatabase());
+ cst.cleanUp();
+}());
diff --git a/jstests/change_streams/change_stream_whole_cluster_invalidations.js b/jstests/change_streams/change_stream_whole_cluster_invalidations.js
new file mode 100644
index 00000000000..143d7e47b91
--- /dev/null
+++ b/jstests/change_streams/change_stream_whole_cluster_invalidations.js
@@ -0,0 +1,132 @@
+// Tests of invalidate entries for a $changeStream on a whole cluster.
+(function() {
+ "use strict";
+
+ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+ load('jstests/replsets/libs/two_phase_drops.js'); // For 'TwoPhaseDropCollectionTest'.
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+
+ // Define two databases. We will conduct our tests by creating one collection in each.
+ const testDB1 = db, testDB2 = db.getSiblingDB(`${db.getName()}_other`);
+ const adminDB = db.getSiblingDB("admin");
+
+ // Create one collection on each database.
+ let [db1Coll, db2Coll] =
+ [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, jsTestName()));
+
+ // Create a ChangeStreamTest on the 'admin' db. Cluster-wide change streams can only be opened
+ // on admin.
+ let cst = new ChangeStreamTest(adminDB);
+ let aggCursor = cst.startWatchingAllChangesForCluster();
+
+ // Generate oplog entries of type insert, update, and delete across both databases.
+ for (let coll of[db1Coll, db2Coll]) {
+ assert.writeOK(coll.insert({_id: 1}));
+ assert.writeOK(coll.update({_id: 1}, {$set: {a: 1}}));
+ assert.writeOK(coll.remove({_id: 1}));
+ }
+
+ // Drop the second database, which should invalidate the stream.
+ assert.commandWorked(testDB2.dropDatabase());
+
+ // We should get 7 oplog entries; three ops of type insert, update, delete from each database,
+ // and then an invalidate. The cursor should be closed.
+ for (let expectedDB of[testDB1, testDB2]) {
+ let change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "update", tojson(change));
+ assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "delete", tojson(change));
+ assert.eq(change.ns.db, expectedDB.getName(), tojson(change));
+ }
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{operationType: "invalidate"}],
+ expectInvalidate: true
+ });
+
+ // Test that a cluster-wide change stream cannot be resumed using a token from a collection
+ // which has been dropped.
+ db1Coll = assertDropAndRecreateCollection(testDB1, jsTestName());
+
+ // Get a valid resume token that the next change stream can use.
+ aggCursor = cst.startWatchingAllChangesForCluster();
+
+ assert.writeOK(db1Coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+
+ let change = cst.getOneChange(aggCursor, false);
+ const resumeToken = change._id;
+
+ // It should not possible to resume a change stream after a collection drop, even if the
+ // invalidate has not been received.
+ assertDropCollection(db1Coll, db1Coll.getName());
+ // Wait for two-phase drop to complete, so that the UUID no longer exists.
+ assert.soon(function() {
+ return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(testDB1,
+ db1Coll.getName());
+ });
+ assert.commandFailedWithCode(adminDB.runCommand({
+ aggregate: 1,
+ pipeline: [{$changeStream: {resumeAfter: resumeToken, allChangesForCluster: true}}],
+ cursor: {}
+ }),
+ 40615);
+
+ // Test that invalidation entries from any database invalidate the stream.
+ [db1Coll, db2Coll] =
+ [testDB1, testDB2].map((testDB) => assertDropAndRecreateCollection(testDB, jsTestName()));
+ let _idForTest = 0;
+ for (let collToInvalidate of[db1Coll, db2Coll]) {
+ // Start watching all changes in the cluster.
+ aggCursor = cst.startWatchingAllChangesForCluster();
+
+ let testDB = collToInvalidate.getDB();
+
+ // Insert into the collections on both databases, and verify the change stream is able to
+ // pick them up.
+ for (let collToWrite of[db1Coll, db2Coll]) {
+ assert.writeOK(collToWrite.insert({_id: _idForTest}));
+ change = cst.getOneChange(aggCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.documentKey._id, _idForTest);
+ assert.eq(change.ns.db, collToWrite.getDB().getName());
+ _idForTest++;
+ }
+
+ // Renaming the collection should invalidate the change stream.
+ // TODO SERVER-34088: cannot rename collections on mongoS.
+ assert.writeOK(collToInvalidate.renameCollection("renamed_coll"));
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{operationType: "invalidate"}],
+ expectInvalidate: true
+ });
+ collToInvalidate = testDB.getCollection("renamed_coll");
+
+ // Dropping a collection should invalidate the change stream.
+ aggCursor = cst.startWatchingAllChangesForCluster();
+ assertDropCollection(testDB, collToInvalidate.getName());
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{operationType: "invalidate"}],
+ expectInvalidate: true
+ });
+
+ // Dropping a 'system' collection should invalidate the change stream.
+ // Create a view to ensure that the 'system.views' collection exists.
+ assert.commandWorked(
+ testDB.runCommand({create: "view1", viewOn: collToInvalidate.getName(), pipeline: []}));
+ aggCursor = cst.startWatchingAllChangesForCluster();
+ assertDropCollection(testDB, "system.views");
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [{operationType: "invalidate"}],
+ expectInvalidate: true
+ });
+ }
+
+ cst.cleanUp();
+}());
diff --git a/jstests/change_streams/change_stream_whole_cluster_resumability.js b/jstests/change_streams/change_stream_whole_cluster_resumability.js
new file mode 100644
index 00000000000..df1041d0628
--- /dev/null
+++ b/jstests/change_streams/change_stream_whole_cluster_resumability.js
@@ -0,0 +1,60 @@
+// Basic tests for resuming a $changeStream that is open against all databases in a cluster.
+(function() {
+ "use strict";
+
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+
+ // Create two databases, with one collection in each.
+ const testDBs = [db, db.getSiblingDB(`${db.getName()}_other`)];
+ const[db1Coll, db2Coll] =
+ testDBs.map((db) => assertDropAndRecreateCollection(db, jsTestName()));
+ const adminDB = db.getSiblingDB("admin");
+
+ let cst = new ChangeStreamTest(adminDB);
+ let resumeCursor = cst.startWatchingAllChangesForCluster();
+
+ // Insert a document in the first database and save the resulting change stream.
+ assert.writeOK(db1Coll.insert({_id: 1}));
+ const firstInsertChangeDoc = cst.getOneChange(resumeCursor);
+ assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1});
+
+ // Test resume after the first insert.
+ resumeCursor = cst.startWatchingChanges({
+ pipeline:
+ [{$changeStream: {resumeAfter: firstInsertChangeDoc._id, allChangesForCluster: true}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
+
+ // Write the next document into the second database.
+ assert.writeOK(db2Coll.insert({_id: 2}));
+ const secondInsertChangeDoc = cst.getOneChange(resumeCursor);
+ assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2});
+
+ // Write the third document into the first database again.
+ assert.writeOK(db1Coll.insert({_id: 3}));
+ const thirdInsertChangeDoc = cst.getOneChange(resumeCursor);
+ assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3});
+
+ // Test resuming after the first insert again.
+ resumeCursor = cst.startWatchingChanges({
+ pipeline:
+ [{$changeStream: {resumeAfter: firstInsertChangeDoc._id, allChangesForCluster: true}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
+ assert.docEq(cst.getOneChange(resumeCursor), secondInsertChangeDoc);
+ assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
+
+ // Test resume after second insert.
+ resumeCursor = cst.startWatchingChanges({
+ pipeline:
+ [{$changeStream: {resumeAfter: secondInsertChangeDoc._id, allChangesForCluster: true}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
+ assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
+
+ cst.cleanUp();
+})();
diff --git a/jstests/change_streams/change_stream_whole_db.js b/jstests/change_streams/change_stream_whole_db.js
index 4b2cd749c3e..77ecfe932d1 100644
--- a/jstests/change_streams/change_stream_whole_db.js
+++ b/jstests/change_streams/change_stream_whole_db.js
@@ -7,8 +7,7 @@
// assert[Valid|Invalid]ChangeStreamNss.
load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
- // Test that a change stream cannot be opened on the "admin", "config", or "local" databases.
- // TODO SERVER-34086: $changeStream may run against 'admin' if 'allChangesForCluster' is true.
+ // Test that a single-database change stream cannot be opened on "admin", "config", or "local".
assertInvalidChangeStreamNss("admin", 1);
assertInvalidChangeStreamNss("config", 1);
if (!FixtureHelpers.isMongos(db)) {
diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js
index 7d21884864b..434360aff4c 100644
--- a/jstests/change_streams/lookup_post_image.js
+++ b/jstests/change_streams/lookup_post_image.js
@@ -10,17 +10,18 @@
load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
load("jstests/replsets/libs/two_phase_drops.js"); // For 'TwoPhaseDropCollectionTest'.
- let cst = new ChangeStreamTest(db);
const coll = assertDropAndRecreateCollection(db, "change_post_image");
- function testUpdateLookup(coll, collToWatch) {
+ function testUpdateLookup(coll, collToWatch, changeStreamDB = db, changeStreamSpec = {}) {
coll.drop();
+ const cst = new ChangeStreamTest(changeStreamDB);
+
jsTestLog("Testing change streams without 'fullDocument' specified");
// Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for
// an insert.
- let cursor =
- cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collToWatch});
+ let cursor = cst.startWatchingChanges(
+ {pipeline: [{$changeStream: changeStreamSpec}], collection: collToWatch});
assert.writeOK(coll.insert({_id: "fullDocument not specified"}));
let latestChange = cst.getOneChange(cursor);
assert.eq(latestChange.operationType, "insert");
@@ -45,8 +46,9 @@
// Test that specifying 'fullDocument' as 'default' does include a 'fullDocument' in the
// result for an insert.
+ const defaultFullDocSpec = Object.assign({fullDocument: "default"}, changeStreamSpec);
cursor = cst.startWatchingChanges(
- {collection: collToWatch, pipeline: [{$changeStream: {fullDocument: "default"}}]});
+ {collection: collToWatch, pipeline: [{$changeStream: defaultFullDocSpec}]});
assert.writeOK(coll.insert({_id: "fullDocument is default"}));
latestChange = cst.getOneChange(cursor);
assert.eq(latestChange.operationType, "insert");
@@ -71,8 +73,9 @@
// Test that specifying 'fullDocument' as 'updateLookup' does include a 'fullDocument' in
// the result for an insert.
+ const updateLookupSpec = Object.assign({fullDocument: "updateLookup"}, changeStreamSpec);
cursor = cst.startWatchingChanges(
- {collection: collToWatch, pipeline: [{$changeStream: {fullDocument: "updateLookup"}}]});
+ {collection: collToWatch, pipeline: [{$changeStream: updateLookupSpec}]});
assert.writeOK(coll.insert({_id: "fullDocument is lookup"}));
latestChange = cst.getOneChange(cursor);
assert.eq(latestChange.operationType, "insert");
@@ -98,10 +101,7 @@
// in a 'fullDocument' with a value of null.
cursor = cst.startWatchingChanges({
collection: collToWatch,
- pipeline: [
- {$changeStream: {fullDocument: "updateLookup"}},
- {$match: {operationType: "update"}}
- ]
+ pipeline: [{$changeStream: updateLookupSpec}, {$match: {operationType: "update"}}]
});
assert.writeOK(coll.update({_id: "fullDocument is lookup"}, {$set: {updatedAgain: true}}));
assert.writeOK(coll.remove({_id: "fullDocument is lookup"}));
@@ -123,10 +123,12 @@
assert.writeOK(coll.update({_id: "fullDocument is lookup 2"}, {$set: {updated: true}}));
// Open a $changeStream cursor with batchSize 0, so that no oplog entries are retrieved yet.
+ const resumeAfterDeleteAndUpdateLookupSpec = Object.assign(
+ {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}, changeStreamSpec);
cursor = cst.startWatchingChanges({
collection: collToWatch,
pipeline: [
- {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}},
+ {$changeStream: resumeAfterDeleteAndUpdateLookupSpec},
{$match: {operationType: {$ne: "delete"}}}
],
aggregateOptions: {cursor: {batchSize: 0}}
@@ -136,7 +138,7 @@
let cursorBeforeDrop = cst.startWatchingChanges({
collection: collToWatch,
pipeline: [
- {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}},
+ {$changeStream: resumeAfterDeleteAndUpdateLookupSpec},
{$match: {operationType: {$ne: "delete"}}}
],
aggregateOptions: {cursor: {batchSize: 0}}
@@ -177,10 +179,10 @@
assert.eq(latestChange.fullDocument, null);
// Test establishing new cursors with resume token on dropped collections fails.
- let res = db.runCommand({
+ let res = changeStreamDB.runCommand({
aggregate: collToWatch,
pipeline: [
- {$changeStream: {fullDocument: "updateLookup", resumeAfter: deleteDocResumePoint}},
+ {$changeStream: resumeAfterDeleteAndUpdateLookupSpec},
{$match: {operationType: "update"}}
],
cursor: {batchSize: 0}
@@ -207,8 +209,8 @@
// specified.
const collInvalidate = assertDropAndRecreateCollection(db, "collInvalidate");
cursor = cst.startWatchingChanges({
- collection: collInvalidate.getName(),
- pipeline: [{$changeStream: {fullDocument: "updateLookup"}}],
+ collection: isNumber(collToWatch) ? collToWatch : collInvalidate.getName(),
+ pipeline: [{$changeStream: updateLookupSpec}],
aggregateOptions: {cursor: {batchSize: 0}}
});
assert.writeOK(collInvalidate.insert({_id: "testing invalidate"}));
@@ -229,7 +231,7 @@
cursor = cst.startWatchingChanges({
collection: collToWatch,
- pipeline: [{$changeStream: {fullDocument: "updateLookup"}}],
+ pipeline: [{$changeStream: updateLookupSpec}],
});
assert.writeOK(coll.update({_id: "getMoreEnabled"}, {$set: {updated: true}}));
@@ -243,5 +245,6 @@
// Test update lookup with a change stream on a whole database.
testUpdateLookup(coll, 1);
- cst.cleanUp();
+ // Test update lookup with a change stream on the whole cluster.
+ testUpdateLookup(coll, 1, db.getSiblingDB("admin"), {allChangesForCluster: true});
}());