summaryrefslogtreecommitdiff
path: root/jstests/change_streams/whole_cluster_resumability.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/change_streams/whole_cluster_resumability.js')
-rw-r--r--jstests/change_streams/whole_cluster_resumability.js290
1 files changed, 144 insertions, 146 deletions
diff --git a/jstests/change_streams/whole_cluster_resumability.js b/jstests/change_streams/whole_cluster_resumability.js
index 4a907315fd5..270f6c465db 100644
--- a/jstests/change_streams/whole_cluster_resumability.js
+++ b/jstests/change_streams/whole_cluster_resumability.js
@@ -1,169 +1,167 @@
// Basic tests for resuming a $changeStream that is open against all databases in a cluster.
(function() {
- "use strict";
+"use strict";
+
+load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
+
+// Create two databases, with one collection in each.
+const testDBs = [db.getSiblingDB(jsTestName()), db.getSiblingDB(jsTestName() + "_other")];
+let [db1Coll, db2Coll] = testDBs.map((db) => assertDropAndRecreateCollection(db, "test"));
+const adminDB = db.getSiblingDB("admin");
+
+let cst = new ChangeStreamTest(adminDB);
+let resumeCursor = cst.startWatchingAllChangesForCluster();
+
+// Insert a document in the first database and save the resulting change stream.
+assert.writeOK(db1Coll.insert({_id: 1}));
+const firstInsertChangeDoc = cst.getOneChange(resumeCursor);
+assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1});
+
+// Test resume after the first insert.
+resumeCursor = cst.startWatchingChanges({
+ pipeline:
+ [{$changeStream: {resumeAfter: firstInsertChangeDoc._id, allChangesForCluster: true}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+});
+
+// Write the next document into the second database.
+assert.writeOK(db2Coll.insert({_id: 2}));
+const secondInsertChangeDoc = cst.getOneChange(resumeCursor);
+assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2});
+
+// Write the third document into the first database again.
+assert.writeOK(db1Coll.insert({_id: 3}));
+const thirdInsertChangeDoc = cst.getOneChange(resumeCursor);
+assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3});
+
+// Test resuming after the first insert again.
+resumeCursor = cst.startWatchingChanges({
+ pipeline:
+ [{$changeStream: {resumeAfter: firstInsertChangeDoc._id, allChangesForCluster: true}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+});
+assert.docEq(cst.getOneChange(resumeCursor), secondInsertChangeDoc);
+assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
+
+// Test resume after second insert.
+resumeCursor = cst.startWatchingChanges({
+ pipeline:
+ [{$changeStream: {resumeAfter: secondInsertChangeDoc._id, allChangesForCluster: true}}],
+ collection: 1,
+ aggregateOptions: {cursor: {batchSize: 0}},
+});
+assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
+
+// Rename the collection and obtain a resume token from the 'rename' notification. Skip this
+// test when running on a sharded collection, since these cannot be renamed.
+if (!FixtureHelpers.isSharded(db1Coll)) {
+ assertDropAndRecreateCollection(db1Coll.getDB(), db1Coll.getName());
+ const renameColl = db1Coll.getDB().getCollection("rename_coll");
+ assertDropCollection(renameColl.getDB(), renameColl.getName());
- load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
- load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
- load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers.
-
- // Create two databases, with one collection in each.
- const testDBs = [db.getSiblingDB(jsTestName()), db.getSiblingDB(jsTestName() + "_other")];
- let [db1Coll, db2Coll] = testDBs.map((db) => assertDropAndRecreateCollection(db, "test"));
- const adminDB = db.getSiblingDB("admin");
-
- let cst = new ChangeStreamTest(adminDB);
- let resumeCursor = cst.startWatchingAllChangesForCluster();
-
- // Insert a document in the first database and save the resulting change stream.
- assert.writeOK(db1Coll.insert({_id: 1}));
- const firstInsertChangeDoc = cst.getOneChange(resumeCursor);
- assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1});
-
- // Test resume after the first insert.
resumeCursor = cst.startWatchingChanges({
- pipeline:
- [{$changeStream: {resumeAfter: firstInsertChangeDoc._id, allChangesForCluster: true}}],
collection: 1,
- aggregateOptions: {cursor: {batchSize: 0}},
+ pipeline: [{$changeStream: {allChangesForCluster: true}}],
+ aggregateOptions: {cursor: {batchSize: 0}}
});
+ assert.writeOK(db1Coll.renameCollection(renameColl.getName()));
- // Write the next document into the second database.
- assert.writeOK(db2Coll.insert({_id: 2}));
- const secondInsertChangeDoc = cst.getOneChange(resumeCursor);
- assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2});
-
- // Write the third document into the first database again.
- assert.writeOK(db1Coll.insert({_id: 3}));
- const thirdInsertChangeDoc = cst.getOneChange(resumeCursor);
- assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3});
-
- // Test resuming after the first insert again.
- resumeCursor = cst.startWatchingChanges({
- pipeline:
- [{$changeStream: {resumeAfter: firstInsertChangeDoc._id, allChangesForCluster: true}}],
- collection: 1,
- aggregateOptions: {cursor: {batchSize: 0}},
+ const renameChanges = cst.assertNextChangesEqual({
+ cursor: resumeCursor,
+ expectedChanges: [
+ {
+ operationType: "rename",
+ ns: {db: db1Coll.getDB().getName(), coll: db1Coll.getName()},
+ to: {db: renameColl.getDB().getName(), coll: renameColl.getName()}
+ },
+ ]
});
- assert.docEq(cst.getOneChange(resumeCursor), secondInsertChangeDoc);
- assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
+ const resumeTokenRename = renameChanges[0]._id;
- // Test resume after second insert.
- resumeCursor = cst.startWatchingChanges({
- pipeline:
- [{$changeStream: {resumeAfter: secondInsertChangeDoc._id, allChangesForCluster: true}}],
- collection: 1,
- aggregateOptions: {cursor: {batchSize: 0}},
- });
- assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
-
- // Rename the collection and obtain a resume token from the 'rename' notification. Skip this
- // test when running on a sharded collection, since these cannot be renamed.
- if (!FixtureHelpers.isSharded(db1Coll)) {
- assertDropAndRecreateCollection(db1Coll.getDB(), db1Coll.getName());
- const renameColl = db1Coll.getDB().getCollection("rename_coll");
- assertDropCollection(renameColl.getDB(), renameColl.getName());
-
- resumeCursor = cst.startWatchingChanges({
- collection: 1,
- pipeline: [{$changeStream: {allChangesForCluster: true}}],
- aggregateOptions: {cursor: {batchSize: 0}}
- });
- assert.writeOK(db1Coll.renameCollection(renameColl.getName()));
-
- const renameChanges = cst.assertNextChangesEqual({
- cursor: resumeCursor,
- expectedChanges: [
- {
- operationType: "rename",
- ns: {db: db1Coll.getDB().getName(), coll: db1Coll.getName()},
- to: {db: renameColl.getDB().getName(), coll: renameColl.getName()}
- },
- ]
- });
- const resumeTokenRename = renameChanges[0]._id;
-
- // Insert into the renamed collection.
- assert.writeOK(renameColl.insert({_id: "after rename"}));
-
- // Resume from the rename notification using 'resumeAfter' and verify that the change stream
- // returns the next insert.
- let expectedInsert = {
- operationType: "insert",
- ns: {db: renameColl.getDB().getName(), coll: renameColl.getName()},
- fullDocument: {_id: "after rename"},
- documentKey: {_id: "after rename"}
- };
- resumeCursor = cst.startWatchingChanges({
- collection: 1,
- pipeline:
- [{$changeStream: {resumeAfter: resumeTokenRename, allChangesForCluster: true}}],
- aggregateOptions: {cursor: {batchSize: 0}}
- });
- cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert});
-
- // Resume from the rename notification using 'startAfter' and verify that the change stream
- // returns the next insert.
- expectedInsert = {
- operationType: "insert",
- ns: {db: renameColl.getDB().getName(), coll: renameColl.getName()},
- fullDocument: {_id: "after rename"},
- documentKey: {_id: "after rename"}
- };
- resumeCursor = cst.startWatchingChanges({
- collection: 1,
- pipeline:
- [{$changeStream: {startAfter: resumeTokenRename, allChangesForCluster: true}}],
- aggregateOptions: {cursor: {batchSize: 0}}
- });
- cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert});
-
- // Rename back to the original collection for reliability of the collection drops when
- // dropping the database.
- assert.writeOK(renameColl.renameCollection(db1Coll.getName()));
- }
-
- // Dropping a database should generate a 'drop' notification for the collection followed by a
- // 'dropDatabase' notification.
- resumeCursor = cst.startWatchingAllChangesForCluster();
- assert.commandWorked(testDBs[0].dropDatabase());
- const dropDbChanges = cst.assertDatabaseDrop({cursor: resumeCursor, db: testDBs[0]});
- const resumeTokenDbDrop = dropDbChanges[dropDbChanges.length - 1]._id;
-
- // Recreate the collection and insert a document.
- assert.writeOK(db1Coll.insert({_id: "after recreate"}));
+ // Insert into the renamed collection.
+ assert.writeOK(renameColl.insert({_id: "after rename"}));
+ // Resume from the rename notification using 'resumeAfter' and verify that the change stream
+ // returns the next insert.
let expectedInsert = {
operationType: "insert",
- ns: {db: testDBs[0].getName(), coll: db1Coll.getName()},
- fullDocument: {_id: "after recreate"},
- documentKey: {_id: "after recreate"}
+ ns: {db: renameColl.getDB().getName(), coll: renameColl.getName()},
+ fullDocument: {_id: "after rename"},
+ documentKey: {_id: "after rename"}
};
-
- // Resume from the database drop using 'resumeAfter', and verify the change stream picks up
- // the insert.
resumeCursor = cst.startWatchingChanges({
collection: 1,
- pipeline: [{$changeStream: {resumeAfter: resumeTokenDbDrop, allChangesForCluster: true}}],
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenRename, allChangesForCluster: true}}],
aggregateOptions: {cursor: {batchSize: 0}}
});
- cst.consumeDropUpTo({
- cursor: resumeCursor,
- dropType: "dropDatabase",
- expectedNext: expectedInsert,
- });
+ cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert});
- // Resume from the database drop using 'startAfter', and verify the change stream picks up the
- // insert.
+ // Resume from the rename notification using 'startAfter' and verify that the change stream
+ // returns the next insert.
+ expectedInsert = {
+ operationType: "insert",
+ ns: {db: renameColl.getDB().getName(), coll: renameColl.getName()},
+ fullDocument: {_id: "after rename"},
+ documentKey: {_id: "after rename"}
+ };
resumeCursor = cst.startWatchingChanges({
collection: 1,
- pipeline: [{$changeStream: {startAfter: resumeTokenDbDrop, allChangesForCluster: true}}],
+ pipeline: [{$changeStream: {startAfter: resumeTokenRename, allChangesForCluster: true}}],
aggregateOptions: {cursor: {batchSize: 0}}
});
- cst.consumeDropUpTo({
- cursor: resumeCursor,
- dropType: "dropDatabase",
- expectedNext: expectedInsert,
- });
-
- cst.cleanUp();
+ cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: expectedInsert});
+
+ // Rename back to the original collection for reliability of the collection drops when
+ // dropping the database.
+ assert.writeOK(renameColl.renameCollection(db1Coll.getName()));
+}
+
+// Dropping a database should generate a 'drop' notification for the collection followed by a
+// 'dropDatabase' notification.
+resumeCursor = cst.startWatchingAllChangesForCluster();
+assert.commandWorked(testDBs[0].dropDatabase());
+const dropDbChanges = cst.assertDatabaseDrop({cursor: resumeCursor, db: testDBs[0]});
+const resumeTokenDbDrop = dropDbChanges[dropDbChanges.length - 1]._id;
+
+// Recreate the collection and insert a document.
+assert.writeOK(db1Coll.insert({_id: "after recreate"}));
+
+let expectedInsert = {
+ operationType: "insert",
+ ns: {db: testDBs[0].getName(), coll: db1Coll.getName()},
+ fullDocument: {_id: "after recreate"},
+ documentKey: {_id: "after recreate"}
+};
+
+// Resume from the database drop using 'resumeAfter', and verify the change stream picks up
+// the insert.
+resumeCursor = cst.startWatchingChanges({
+ collection: 1,
+ pipeline: [{$changeStream: {resumeAfter: resumeTokenDbDrop, allChangesForCluster: true}}],
+ aggregateOptions: {cursor: {batchSize: 0}}
+});
+cst.consumeDropUpTo({
+ cursor: resumeCursor,
+ dropType: "dropDatabase",
+ expectedNext: expectedInsert,
+});
+
+// Resume from the database drop using 'startAfter', and verify the change stream picks up the
+// insert.
+resumeCursor = cst.startWatchingChanges({
+ collection: 1,
+ pipeline: [{$changeStream: {startAfter: resumeTokenDbDrop, allChangesForCluster: true}}],
+ aggregateOptions: {cursor: {batchSize: 0}}
+});
+cst.consumeDropUpTo({
+ cursor: resumeCursor,
+ dropType: "dropDatabase",
+ expectedNext: expectedInsert,
+});
+
+cst.cleanUp();
})();