summaryrefslogtreecommitdiff
path: root/jstests/change_streams/whole_db_resumability.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/change_streams/whole_db_resumability.js')
-rw-r--r--jstests/change_streams/whole_db_resumability.js362
1 files changed, 181 insertions, 181 deletions
diff --git a/jstests/change_streams/whole_db_resumability.js b/jstests/change_streams/whole_db_resumability.js
index b71495355fa..697f72ddcf9 100644
--- a/jstests/change_streams/whole_db_resumability.js
+++ b/jstests/change_streams/whole_db_resumability.js
@@ -3,199 +3,199 @@
// invalidated by a database drop.
// @tags: [do_not_run_in_whole_cluster_passthrough]
(function() {
- "use strict";
-
- load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
- load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
- load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
-
- // Drop and recreate the collections to be used in this set of tests.
- const testDB = db.getSiblingDB(jsTestName());
- let coll = assertDropAndRecreateCollection(testDB, "resume_coll");
- const otherColl = assertDropAndRecreateCollection(testDB, "resume_coll_other");
-
- let cst = new ChangeStreamTest(testDB);
- let resumeCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
-
- // Insert a single document to each collection and save the resume token from the first insert.
- assert.writeOK(coll.insert({_id: 1}));
- assert.writeOK(otherColl.insert({_id: 2}));
- const firstInsertChangeDoc = cst.getOneChange(resumeCursor);
- assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1});
- assert.eq(firstInsertChangeDoc.ns, {db: testDB.getName(), coll: coll.getName()});
-
- // Test resuming the change stream after the first insert should pick up the insert on the
- // second collection.
- resumeCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}],
- collection: 1,
- aggregateOptions: {cursor: {batchSize: 0}},
+"use strict";
+
+load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
+
+// Drop and recreate the collections to be used in this set of tests.
+const testDB = db.getSiblingDB(jsTestName());
+let coll = assertDropAndRecreateCollection(testDB, "resume_coll");
+const otherColl = assertDropAndRecreateCollection(testDB, "resume_coll_other");
+
+let cst = new ChangeStreamTest(testDB);
+let resumeCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
+
+// Insert a single document to each collection and save the resume token from the first insert.
+assert.writeOK(coll.insert({_id: 1}));
+assert.writeOK(otherColl.insert({_id: 2}));
+const firstInsertChangeDoc = cst.getOneChange(resumeCursor);
+assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1});
+assert.eq(firstInsertChangeDoc.ns, {db: testDB.getName(), coll: coll.getName()});
+
+// Test resuming the change stream after the first insert should pick up the insert on the
+// second collection.
+resumeCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+});
+
+const secondInsertChangeDoc = cst.getOneChange(resumeCursor);
+assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2});
+assert.eq(secondInsertChangeDoc.ns, {db: testDB.getName(), coll: otherColl.getName()});
+
+// Insert a third document to the first collection and test that the change stream picks it up.
+assert.writeOK(coll.insert({_id: 3}));
+const thirdInsertChangeDoc = cst.getOneChange(resumeCursor);
+assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3});
+assert.eq(thirdInsertChangeDoc.ns, {db: testDB.getName(), coll: coll.getName()});
+
+// Test resuming after the first insert again.
+resumeCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+});
+assert.docEq(cst.getOneChange(resumeCursor), secondInsertChangeDoc);
+assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
+
+// Test resume after second insert.
+resumeCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: secondInsertChangeDoc._id}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+});
+assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
+
+// Rename the collection and attempt to resume from the 'rename' notification. Skip this
+// test when running on a sharded collection, since these cannot be renamed.
+if (!FixtureHelpers.isSharded(coll)) {
+ assertDropAndRecreateCollection(coll.getDB(), coll.getName());
+ const renameColl = coll.getDB().getCollection("rename_coll");
+ assertDropCollection(renameColl.getDB(), renameColl.getName());
+
+ resumeCursor = cst.startWatchingChanges({collection: 1, pipeline: [{$changeStream: {}}]});
+ assert.writeOK(coll.renameCollection(renameColl.getName()));
+
+ const renameChanges = cst.assertNextChangesEqual({
+ cursor: resumeCursor,
+ expectedChanges: [
+ {
+ operationType: "rename",
+ ns: {db: coll.getDB().getName(), coll: coll.getName()},
+ to: {db: renameColl.getDB().getName(), coll: renameColl.getName()}
+ },
+ ]
});
+ const resumeTokenRename = renameChanges[0]._id;
- const secondInsertChangeDoc = cst.getOneChange(resumeCursor);
- assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2});
- assert.eq(secondInsertChangeDoc.ns, {db: testDB.getName(), coll: otherColl.getName()});
+ // Insert into the renamed collection.
+ assert.writeOK(renameColl.insert({_id: "after rename"}));
- // Insert a third document to the first collection and test that the change stream picks it up.
- assert.writeOK(coll.insert({_id: 3}));
- const thirdInsertChangeDoc = cst.getOneChange(resumeCursor);
- assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3});
- assert.eq(thirdInsertChangeDoc.ns, {db: testDB.getName(), coll: coll.getName()});
+ // Resume from the rename notification using 'resumeAfter' and verify that the change stream
+ // returns the next insert.
+ let expectedInsert = {
+ operationType: "insert",
+ ns: {db: renameColl.getDB().getName(), coll: renameColl.getName()},
+ fullDocument: {_id: "after rename"},
+ documentKey: {_id: "after rename"}
+ };
+ resumeCursor = cst.startWatchingChanges(
+ {collection: 1, pipeline: [{$changeStream: {resumeAfter: resumeTokenRename}}]});
+ cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert});
- // Test resuming after the first insert again.
- resumeCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}],
- collection: 1,
- aggregateOptions: {cursor: {batchSize: 0}},
- });
- assert.docEq(cst.getOneChange(resumeCursor), secondInsertChangeDoc);
- assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
+ // Resume from the rename notification using 'startAfter' and verify that the change stream
+ // returns the next insert.
+ expectedInsert = {
+ operationType: "insert",
+ ns: {db: renameColl.getDB().getName(), coll: renameColl.getName()},
+ fullDocument: {_id: "after rename"},
+ documentKey: {_id: "after rename"}
+ };
+ resumeCursor = cst.startWatchingChanges(
+ {collection: 1, pipeline: [{$changeStream: {startAfter: resumeTokenRename}}]});
+ cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert});
- // Test resume after second insert.
- resumeCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {resumeAfter: secondInsertChangeDoc._id}}],
- collection: 1,
- aggregateOptions: {cursor: {batchSize: 0}},
- });
- assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
-
- // Rename the collection and attempt to resume from the 'rename' notification. Skip this
- // test when running on a sharded collection, since these cannot be renamed.
- if (!FixtureHelpers.isSharded(coll)) {
- assertDropAndRecreateCollection(coll.getDB(), coll.getName());
- const renameColl = coll.getDB().getCollection("rename_coll");
- assertDropCollection(renameColl.getDB(), renameColl.getName());
-
- resumeCursor = cst.startWatchingChanges({collection: 1, pipeline: [{$changeStream: {}}]});
- assert.writeOK(coll.renameCollection(renameColl.getName()));
-
- const renameChanges = cst.assertNextChangesEqual({
- cursor: resumeCursor,
- expectedChanges: [
- {
- operationType: "rename",
- ns: {db: coll.getDB().getName(), coll: coll.getName()},
- to: {db: renameColl.getDB().getName(), coll: renameColl.getName()}
- },
- ]
- });
- const resumeTokenRename = renameChanges[0]._id;
-
- // Insert into the renamed collection.
- assert.writeOK(renameColl.insert({_id: "after rename"}));
-
- // Resume from the rename notification using 'resumeAfter' and verify that the change stream
- // returns the next insert.
- let expectedInsert = {
- operationType: "insert",
- ns: {db: renameColl.getDB().getName(), coll: renameColl.getName()},
- fullDocument: {_id: "after rename"},
- documentKey: {_id: "after rename"}
- };
- resumeCursor = cst.startWatchingChanges(
- {collection: 1, pipeline: [{$changeStream: {resumeAfter: resumeTokenRename}}]});
- cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert});
-
- // Resume from the rename notification using 'startAfter' and verify that the change stream
- // returns the next insert.
- expectedInsert = {
- operationType: "insert",
- ns: {db: renameColl.getDB().getName(), coll: renameColl.getName()},
- fullDocument: {_id: "after rename"},
- documentKey: {_id: "after rename"}
- };
- resumeCursor = cst.startWatchingChanges(
- {collection: 1, pipeline: [{$changeStream: {startAfter: resumeTokenRename}}]});
- cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert});
-
- // Rename back to the original collection for reliability of the collection drops when
- // dropping the database.
- assert.writeOK(renameColl.renameCollection(coll.getName()));
- }
-
- // Explicitly drop one collection to ensure reliability of the order of notifications from the
- // dropDatabase command.
- resumeCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
- assertDropCollection(testDB, otherColl.getName());
- const firstCollDrop = cst.getOneChange(resumeCursor);
- assert.eq(firstCollDrop.operationType, "drop", tojson(firstCollDrop));
- assert.eq(firstCollDrop.ns, {db: testDB.getName(), coll: otherColl.getName()});
-
- // Dropping a database should generate a 'drop' notification for each collection, a
- // 'dropDatabase' notification, and finally an 'invalidate'.
- assert.commandWorked(testDB.dropDatabase());
- const dropDbChanges = cst.assertDatabaseDrop({cursor: resumeCursor, db: testDB});
- const secondCollDrop = dropDbChanges[0];
- // For sharded passthrough suites, we know that the last entry will be a 'dropDatabase' however
- // there may be multiple collection drops in 'dropDbChanges' depending on the number of involved
- // shards.
- const resumeTokenDropDb = dropDbChanges[dropDbChanges.length - 1]._id;
- const resumeTokenInvalidate =
- cst.assertNextChangesEqual(
- {cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]})[0]
- ._id;
-
- // Test resuming from the first collection drop and the second collection drop as a result of
+ // Rename back to the original collection for reliability of the collection drops when
// dropping the database.
- [firstCollDrop, secondCollDrop].forEach(token => {
- resumeCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {resumeAfter: token._id}}],
- collection: 1,
- aggregateOptions: {cursor: {batchSize: 0}},
- });
- cst.assertDatabaseDrop({cursor: resumeCursor, db: testDB});
- cst.assertNextChangesEqual(
- {cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]});
- });
-
- // Recreate the test collection.
- assert.writeOK(coll.insert({_id: "after recreate"}));
-
- // Test resuming from the 'dropDatabase' entry using 'resumeAfter'.
- resumeCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {resumeAfter: resumeTokenDropDb}}],
- collection: 1,
- aggregateOptions: {cursor: {batchSize: 0}},
- });
+ assert.writeOK(renameColl.renameCollection(coll.getName()));
+}
+
+// Explicitly drop one collection to ensure reliability of the order of notifications from the
+// dropDatabase command.
+resumeCursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: 1});
+assertDropCollection(testDB, otherColl.getName());
+const firstCollDrop = cst.getOneChange(resumeCursor);
+assert.eq(firstCollDrop.operationType, "drop", tojson(firstCollDrop));
+assert.eq(firstCollDrop.ns, {db: testDB.getName(), coll: otherColl.getName()});
+
+// Dropping a database should generate a 'drop' notification for each collection, a
+// 'dropDatabase' notification, and finally an 'invalidate'.
+assert.commandWorked(testDB.dropDatabase());
+const dropDbChanges = cst.assertDatabaseDrop({cursor: resumeCursor, db: testDB});
+const secondCollDrop = dropDbChanges[0];
+// For sharded passthrough suites, we know that the last entry will be a 'dropDatabase' however
+// there may be multiple collection drops in 'dropDbChanges' depending on the number of involved
+// shards.
+const resumeTokenDropDb = dropDbChanges[dropDbChanges.length - 1]._id;
+const resumeTokenInvalidate =
cst.assertNextChangesEqual(
- {cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]});
-
- // Test resuming from the 'invalidate' entry using 'resumeAfter'.
- assert.commandFailedWithCode(db.runCommand({
- aggregate: 1,
- pipeline: [{$changeStream: {resumeAfter: resumeTokenInvalidate}}],
- cursor: {},
- collation: {locale: "simple"},
- }),
- ErrorCodes.InvalidResumeToken);
+ {cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]})[0]
+ ._id;
- // Test resuming from the 'dropDatabase' entry using 'startAfter'.
+// Test resuming from the first collection drop and the second collection drop as a result of
+// dropping the database.
+[firstCollDrop, secondCollDrop].forEach(token => {
resumeCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {startAfter: resumeTokenDropDb}}],
+ pipeline: [{$changeStream: {resumeAfter: token._id}}],
collection: 1,
aggregateOptions: {cursor: {batchSize: 0}},
});
+ cst.assertDatabaseDrop({cursor: resumeCursor, db: testDB});
cst.assertNextChangesEqual(
{cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]});
-
- // Test resuming from the 'invalidate' entry using 'startAfter' and verifies it picks up the
- // insert after recreating the db/collection.
- const expectedInsert = {
- operationType: "insert",
- ns: {db: testDB.getName(), coll: coll.getName()},
- fullDocument: {_id: "after recreate"},
- documentKey: {_id: "after recreate"}
- };
- resumeCursor = cst.startWatchingChanges({
- pipeline: [{$changeStream: {startAfter: resumeTokenInvalidate}}],
- collection: 1,
- aggregateOptions: {cursor: {batchSize: 0}},
- });
- cst.consumeDropUpTo({
- cursor: resumeCursor,
- dropType: "dropDatabase",
- expectedNext: expectedInsert,
- });
-
- cst.cleanUp();
+});
+
+// Recreate the test collection.
+assert.writeOK(coll.insert({_id: "after recreate"}));
+
+// Test resuming from the 'dropDatabase' entry using 'resumeAfter'.
+resumeCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenDropDb}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+});
+cst.assertNextChangesEqual(
+ {cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]});
+
+// Test resuming from the 'invalidate' entry using 'resumeAfter'.
+assert.commandFailedWithCode(db.runCommand({
+ aggregate: 1,
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenInvalidate}}],
+ cursor: {},
+ collation: {locale: "simple"},
+}),
+ ErrorCodes.InvalidResumeToken);
+
+// Test resuming from the 'dropDatabase' entry using 'startAfter'.
+resumeCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {startAfter: resumeTokenDropDb}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+});
+cst.assertNextChangesEqual(
+ {cursor: resumeCursor, expectedChanges: [{operationType: "invalidate"}]});
+
+// Test resuming from the 'invalidate' entry using 'startAfter' and verifies it picks up the
+// insert after recreating the db/collection.
+const expectedInsert = {
+ operationType: "insert",
+ ns: {db: testDB.getName(), coll: coll.getName()},
+ fullDocument: {_id: "after recreate"},
+ documentKey: {_id: "after recreate"}
+};
+resumeCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {startAfter: resumeTokenInvalidate}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+});
+cst.consumeDropUpTo({
+ cursor: resumeCursor,
+ dropType: "dropDatabase",
+ expectedNext: expectedInsert,
+});
+
+cst.cleanUp();
})();