summary refs log tree commit diff
path: root/jstests/change_streams
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/change_streams')
-rw-r--r--  jstests/change_streams/metadata_notifications.js                 10
-rw-r--r--  jstests/change_streams/whole_cluster.js                           8
-rw-r--r--  jstests/change_streams/whole_cluster_metadata_notifications.js    7
-rw-r--r--  jstests/change_streams/whole_cluster_resumability.js             46
-rw-r--r--  jstests/change_streams/whole_db.js                               18
-rw-r--r--  jstests/change_streams/whole_db_metadata_notifications.js       29
-rw-r--r--  jstests/change_streams/whole_db_resumability.js                  80
7 files changed, 167 insertions, 31 deletions
diff --git a/jstests/change_streams/metadata_notifications.js b/jstests/change_streams/metadata_notifications.js
index ea7559de77b..ef17b2b4249 100644
--- a/jstests/change_streams/metadata_notifications.js
+++ b/jstests/change_streams/metadata_notifications.js
@@ -1,4 +1,7 @@
// Tests of $changeStream notifications for metadata operations.
+// Do not run in whole-cluster passthrough since this test assumes that the change stream will be
+// invalidated by a database drop.
+// @tags: [do_not_run_in_whole_cluster_passthrough]
(function() {
"use strict";
@@ -27,6 +30,13 @@
assert.neq(change.id, 0);
assert.eq(change.nextBatch.length, 0, tojson(change.nextBatch));
+ // Dropping the empty database should not generate any notification for the change stream, since
+ // the collection does not exist yet.
+ assert.commandWorked(db.dropDatabase());
+ change = cst.getNextBatch(cursor);
+ assert.neq(change.id, 0);
+ assert.eq(change.nextBatch.length, 0, tojson(change.nextBatch));
+
// After collection creation, we expect to see oplog entries for each subsequent operation.
let coll = assertCreateCollection(db, collName);
assert.writeOK(coll.insert({_id: 0}));
diff --git a/jstests/change_streams/whole_cluster.js b/jstests/change_streams/whole_cluster.js
index 9d154220b6e..894e90fc31d 100644
--- a/jstests/change_streams/whole_cluster.js
+++ b/jstests/change_streams/whole_cluster.js
@@ -119,14 +119,12 @@
assertDropCollection(db.getSiblingDB(dbName), "test");
});
- // Dropping a database should generate drop entries for each collection followed by an
- // invalidate.
- // TODO SERVER-35029: This test should not invalidate the stream once there's support for
- // returning a notification for the dropDatabase command.
+ // Dropping a database should generate drop entries for each collection followed by a database
+ // drop.
assert.commandWorked(otherDB.dropDatabase());
expected = [
{operationType: "drop", ns: {db: otherDB.getName(), coll: "t2"}},
- {operationType: "invalidate"}
+ {operationType: "dropDatabase", ns: {db: otherDB.getName()}},
];
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
diff --git a/jstests/change_streams/whole_cluster_metadata_notifications.js b/jstests/change_streams/whole_cluster_metadata_notifications.js
index 7c8e9c59f8f..471f31282ff 100644
--- a/jstests/change_streams/whole_cluster_metadata_notifications.js
+++ b/jstests/change_streams/whole_cluster_metadata_notifications.js
@@ -31,7 +31,8 @@
assert.writeOK(coll.remove({_id: 1}));
}
- // Drop the second database, which should invalidate the stream.
+ // Drop the second database, which should generate a 'drop' entry for the collection followed
+ // by a 'dropDatabase' entry.
assert.commandWorked(testDB2.dropDatabase());
// We should get 6 oplog entries; three ops of type insert, update, delete from each database.
@@ -50,10 +51,8 @@
cursor: aggCursor,
expectedChanges: [
{operationType: "drop", ns: {db: testDB2.getName(), coll: db2Coll.getName()}},
- // TODO SERVER-35029: Return an entry for a database drop, instead of "invalidate".
- {operationType: "invalidate"}
+ {operationType: "dropDatabase", ns: {db: testDB2.getName()}},
],
- expectInvalidate: true
});
// Test that a cluster-wide change stream can be resumed using a token from a collection which
diff --git a/jstests/change_streams/whole_cluster_resumability.js b/jstests/change_streams/whole_cluster_resumability.js
index 9058c58010a..ad9145ba74a 100644
--- a/jstests/change_streams/whole_cluster_resumability.js
+++ b/jstests/change_streams/whole_cluster_resumability.js
@@ -6,8 +6,8 @@
load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
// Create two databases, with one collection in each.
- const testDBs = [db, db.getSiblingDB(jsTestName() + "_other")];
- const[db1Coll, db2Coll] = testDBs.map((db) => assertDropAndRecreateCollection(db, "test"));
+ const testDBs = [db.getSiblingDB(jsTestName()), db.getSiblingDB(jsTestName() + "_other")];
+ let [db1Coll, db2Coll] = testDBs.map((db) => assertDropAndRecreateCollection(db, "test"));
const adminDB = db.getSiblingDB("admin");
let cst = new ChangeStreamTest(adminDB);
@@ -55,5 +55,47 @@
});
assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
+ // Dropping a database should generate a 'drop' notification for the collection followed by a
+ // 'dropDatabase' notification.
+ assert.commandWorked(testDBs[0].dropDatabase());
+ const dropDbChanges = cst.assertNextChangesEqual({
+ cursor: resumeCursor,
+ expectedChanges: [
+ {operationType: "drop", ns: {db: testDBs[0].getName(), coll: db1Coll.getName()}},
+ {operationType: "dropDatabase", ns: {db: testDBs[0].getName()}}
+ ]
+ });
+
+ // Recreate the database and verify that the change stream picks up another insert.
+ assert.writeOK(db1Coll.insert({_id: 5}));
+
+ let change = cst.getOneChange(resumeCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.ns, {db: testDBs[0].getName(), coll: db1Coll.getName()}, tojson(change));
+ assert.eq(change.fullDocument, {_id: 5}, tojson(change));
+
+ // Resume the change stream from the 'drop' entry.
+ resumeCursor = cst.startWatchingChanges({
+ pipeline:
+ [{$changeStream: {resumeAfter: dropDbChanges[0]._id, allChangesForCluster: true}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
+ change = cst.getOneChange(resumeCursor);
+ assert.eq(change.operationType, "dropDatabase", tojson(change));
+ assert.eq(change.ns, {db: testDBs[0].getName()}, tojson(change));
+
+ // Resume the change stream from the 'dropDatabase' entry.
+ resumeCursor = cst.startWatchingChanges({
+ pipeline:
+ [{$changeStream: {resumeAfter: dropDbChanges[1]._id, allChangesForCluster: true}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
+ change = cst.getOneChange(resumeCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.ns, {db: testDBs[0].getName(), coll: db1Coll.getName()}, tojson(change));
+ assert.eq(change.fullDocument, {_id: 5}, tojson(change));
+
cst.cleanUp();
})();
diff --git a/jstests/change_streams/whole_db.js b/jstests/change_streams/whole_db.js
index 8c287e8493f..82240dacd4b 100644
--- a/jstests/change_streams/whole_db.js
+++ b/jstests/change_streams/whole_db.js
@@ -1,4 +1,7 @@
// Basic tests for $changeStream against all collections in a database.
+// Do not run in whole-cluster passthrough since this test assumes that the change stream will be
+// invalidated by a database drop.
+// @tags: [do_not_run_in_whole_cluster_passthrough]
(function() {
"use strict";
@@ -8,6 +11,7 @@
load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
db = db.getSiblingDB(jsTestName());
+ assert.commandWorked(db.dropDatabase());
// Test that a single-database change stream cannot be opened on "admin", "config", or "local".
assertInvalidChangeStreamNss("admin", 1);
@@ -63,19 +67,5 @@
cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
});
- // Dropping the database should generate collection drop entries followed by an invalidate. Note
- // that the order of collection drops is not guaranteed so only check the database name.
- assert.commandWorked(db.dropDatabase());
- let change = cst.getOneChange(cursor);
- assert.eq(change.operationType, "drop", tojson(change));
- assert.eq(change.ns.db, db.getName(), tojson(change));
- change = cst.getOneChange(cursor);
- assert.eq(change.operationType, "drop", tojson(change));
- assert.eq(change.ns.db, db.getName(), tojson(change));
-
- // TODO SERVER-35029: Expect to see a 'dropDatabase' entry before the invalidate.
- change = cst.getOneChange(cursor, true);
- assert.eq(change.operationType, "invalidate", tojson(change));
-
cst.cleanUp();
}());
diff --git a/jstests/change_streams/whole_db_metadata_notifications.js b/jstests/change_streams/whole_db_metadata_notifications.js
index 6fa694aa1e8..0cb15d57680 100644
--- a/jstests/change_streams/whole_db_metadata_notifications.js
+++ b/jstests/change_streams/whole_db_metadata_notifications.js
@@ -1,4 +1,7 @@
// Tests of metadata notifications for a $changeStream on a whole database.
+// Do not run in whole-cluster passthrough since this test assumes that the change stream will be
+// invalidated by a database drop.
+// @tags: [do_not_run_in_whole_cluster_passthrough]
(function() {
"use strict";
@@ -159,6 +162,15 @@
assert.eq(0, cst.getNextBatch(aggCursor).nextBatch.length);
assert.writeOK(coll.insert({_id: 2}));
assert.eq(cst.getOneChange(aggCursor).operationType, "insert");
+
+ // Drop the new collection to avoid an additional 'drop' notification when the database is
+ // dropped.
+ assertDropCollection(testDB, "renamed_coll");
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges:
+ [{operationType: "drop", ns: {db: testDB.getName(), coll: "renamed_coll"}}],
+ });
}
// Dropping a collection should return a 'drop' entry.
@@ -228,6 +240,23 @@
// Drop the "system.views" collection to avoid view catalog errors in subsequent tests.
assertDropCollection(testDB, "system.views");
assertDropCollection(testDB, "non_system_collection");
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [
+ {operationType: "drop", ns: {db: testDB.getName(), coll: "non_system_collection"}},
+ ]
+ });
+
+ // Dropping the database should generate a 'dropDatabase' notification followed by an
+ // 'invalidate'.
+ assert.commandWorked(testDB.dropDatabase());
+ cst.assertNextChangesEqual({
+ cursor: aggCursor,
+ expectedChanges: [
+ {operationType: "dropDatabase", ns: {db: testDB.getName()}},
+ {operationType: "invalidate"}
+ ]
+ });
cst.cleanUp();
}());
diff --git a/jstests/change_streams/whole_db_resumability.js b/jstests/change_streams/whole_db_resumability.js
index 682d0546cb7..25f03943112 100644
--- a/jstests/change_streams/whole_db_resumability.js
+++ b/jstests/change_streams/whole_db_resumability.js
@@ -1,4 +1,7 @@
// Basic tests for resuming a $changeStream that is open against all collections in a database.
+// Do not run in whole-cluster passthrough since this test assumes that the change stream will be
+// invalidated by a database drop.
+// @tags: [do_not_run_in_whole_cluster_passthrough]
(function() {
"use strict";
@@ -6,10 +9,11 @@
load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
// Drop and recreate the collections to be used in this set of tests.
- const coll = assertDropAndRecreateCollection(db, jsTestName() + "resume_coll");
- const otherColl = assertDropAndRecreateCollection(db, jsTestName() + "resume_coll_other");
+ const testDB = db.getSiblingDB(jsTestName());
+ let coll = assertDropAndRecreateCollection(testDB, "resume_coll");
+ const otherColl = assertDropAndRecreateCollection(testDB, "resume_coll_other");
- let cst = new ChangeStreamTest(db);
+ let cst = new ChangeStreamTest(testDB);
let resumeCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
// Insert a single document to each collection and save the resume token from the first insert.
@@ -17,7 +21,7 @@
assert.writeOK(otherColl.insert({_id: 2}));
const firstInsertChangeDoc = cst.getOneChange(resumeCursor);
assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1});
- assert.eq(firstInsertChangeDoc.ns, {db: "test", coll: coll.getName()});
+ assert.eq(firstInsertChangeDoc.ns, {db: testDB.getName(), coll: coll.getName()});
// Test resuming the change stream after the first insert should pick up the insert on the
// second collection.
@@ -29,13 +33,13 @@
const secondInsertChangeDoc = cst.getOneChange(resumeCursor);
assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2});
- assert.eq(secondInsertChangeDoc.ns, {db: "test", coll: otherColl.getName()});
+ assert.eq(secondInsertChangeDoc.ns, {db: testDB.getName(), coll: otherColl.getName()});
// Insert a third document to the first collection and test that the change stream picks it up.
assert.writeOK(coll.insert({_id: 3}));
const thirdInsertChangeDoc = cst.getOneChange(resumeCursor);
assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3});
- assert.eq(thirdInsertChangeDoc.ns, {db: "test", coll: coll.getName()});
+ assert.eq(thirdInsertChangeDoc.ns, {db: testDB.getName(), coll: coll.getName()});
// Test resuming after the first insert again.
resumeCursor = cst.startWatchingChanges({
@@ -54,5 +58,69 @@
});
assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
+ // Explicitly drop one collection to ensure reliability of the order of notifications from the
+ // dropDatabase command.
+ assertDropCollection(testDB, otherColl.getName());
+ const firstCollDrop = cst.getOneChange(resumeCursor);
+ assert.eq(firstCollDrop.operationType, "drop", tojson(firstCollDrop));
+ assert.eq(firstCollDrop.ns, {db: testDB.getName(), coll: otherColl.getName()});
+
+ // Dropping a database should generate a 'drop' notification for each collection, a
+ // 'dropDatabase' notification, and finally an 'invalidate'.
+ assert.commandWorked(testDB.dropDatabase());
+ const expectedChangesAfterFirstDrop = [
+ {operationType: "drop", ns: {db: testDB.getName(), coll: coll.getName()}},
+ {operationType: "dropDatabase", ns: {db: testDB.getName()}},
+ {operationType: "invalidate"}
+ ];
+ const dropDbChanges = cst.assertNextChangesEqual(
+ {cursor: resumeCursor, expectedChanges: expectedChangesAfterFirstDrop});
+
+ // Resume from the first collection drop.
+ resumeCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: firstCollDrop._id}}],
+ collection: 1,
+ });
+ cst.assertNextChangesEqual(
+ {cursor: resumeCursor, expectedChanges: expectedChangesAfterFirstDrop});
+
+ // Resume from the second collection drop.
+ resumeCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: dropDbChanges[0]._id}}],
+ collection: 1,
+ });
+ cst.assertNextChangesEqual(
+ {cursor: resumeCursor, expectedChanges: expectedChangesAfterFirstDrop.slice(1)});
+
+ // Recreate the test collection.
+ coll = assertCreateCollection(testDB, coll.getName());
+ assert.writeOK(coll.insert({_id: 0}));
+
+ // Test resuming from the 'dropDatabase' entry.
+ // TODO SERVER-34789: Resuming from the 'dropDatabase' should return a single invalidate
+ // notification.
+ resumeCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: dropDbChanges[1]._id}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
+ let change = cst.getOneChange(resumeCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.fullDocument, {_id: 0}, tojson(change));
+ assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()}, tojson(change));
+
+ // Test resuming from the 'invalidate' entry.
+ // TODO SERVER-34789: Resuming from an invalidate should error or return an invalidate
+ // notification.
+ resumeCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: dropDbChanges[2]._id}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
+ change = cst.getOneChange(resumeCursor);
+ assert.eq(change.operationType, "insert", tojson(change));
+ assert.eq(change.fullDocument, {_id: 0}, tojson(change));
+ assert.eq(change.ns, {db: testDB.getName(), coll: coll.getName()}, tojson(change));
+
cst.cleanUp();
})();