summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/change_stream_failover.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/noPassthrough/change_stream_failover.js')
-rw-r--r--jstests/noPassthrough/change_stream_failover.js161
1 files changed, 80 insertions, 81 deletions
diff --git a/jstests/noPassthrough/change_stream_failover.js b/jstests/noPassthrough/change_stream_failover.js
index 8168c7722de..b8ec132fdd8 100644
--- a/jstests/noPassthrough/change_stream_failover.js
+++ b/jstests/noPassthrough/change_stream_failover.js
@@ -3,90 +3,89 @@
// This test uses the WiredTiger storage engine, which does not support running without journaling.
// @tags: [requires_replication,requires_journaling]
(function() {
- "use strict";
- load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
- load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
- load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
-
- const rst = new ReplSetTest({nodes: 3});
- if (!startSetIfSupportsReadMajority(rst)) {
- jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
- rst.stopSet();
- return;
- }
-
- rst.initiate();
-
- for (let key of Object.keys(ChangeStreamWatchMode)) {
- const watchMode = ChangeStreamWatchMode[key];
- jsTestLog("Running test for mode " + watchMode);
-
- const primary = rst.getPrimary();
- const primaryDB = primary.getDB("test");
- const coll = assertDropAndRecreateCollection(primaryDB, "change_stream_failover");
-
- // Be sure we'll only read from the primary.
- primary.setReadPref("primary");
-
- // Open a changeStream on the primary.
- const cst =
- new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, primaryDB));
-
- let changeStream = cst.getChangeStream({watchMode: watchMode, coll: coll});
-
- // Be sure we can read from the change stream. Use {w: "majority"} so that we're still
- // guaranteed to be able to read after the failover.
- assert.writeOK(coll.insert({_id: 0}, {writeConcern: {w: "majority"}}));
- assert.writeOK(coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
- assert.writeOK(coll.insert({_id: 2}, {writeConcern: {w: "majority"}}));
-
- const firstChange = cst.getOneChange(changeStream);
- assert.docEq(firstChange.fullDocument, {_id: 0});
-
- // Make the primary step down
- assert.commandWorked(primaryDB.adminCommand({replSetStepDown: 30}));
-
- // Now wait for another primary to be elected.
- const newPrimary = rst.getPrimary();
- // Be sure we got a different node that the previous primary.
- assert.neq(newPrimary.port, primary.port);
-
- cst.assertNextChangesEqual({
- cursor: changeStream,
- expectedChanges: [{
+"use strict";
+load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
+
+const rst = new ReplSetTest({nodes: 3});
+if (!startSetIfSupportsReadMajority(rst)) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ rst.stopSet();
+ return;
+}
+
+rst.initiate();
+
+for (let key of Object.keys(ChangeStreamWatchMode)) {
+ const watchMode = ChangeStreamWatchMode[key];
+ jsTestLog("Running test for mode " + watchMode);
+
+ const primary = rst.getPrimary();
+ const primaryDB = primary.getDB("test");
+ const coll = assertDropAndRecreateCollection(primaryDB, "change_stream_failover");
+
+ // Be sure we'll only read from the primary.
+ primary.setReadPref("primary");
+
+ // Open a changeStream on the primary.
+ const cst = new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, primaryDB));
+
+ let changeStream = cst.getChangeStream({watchMode: watchMode, coll: coll});
+
+ // Be sure we can read from the change stream. Use {w: "majority"} so that we're still
+ // guaranteed to be able to read after the failover.
+ assert.writeOK(coll.insert({_id: 0}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(coll.insert({_id: 2}, {writeConcern: {w: "majority"}}));
+
+ const firstChange = cst.getOneChange(changeStream);
+ assert.docEq(firstChange.fullDocument, {_id: 0});
+
+ // Make the primary step down
+ assert.commandWorked(primaryDB.adminCommand({replSetStepDown: 30}));
+
+ // Now wait for another primary to be elected.
+ const newPrimary = rst.getPrimary();
+ // Be sure we got a different node that the previous primary.
+ assert.neq(newPrimary.port, primary.port);
+
+ cst.assertNextChangesEqual({
+ cursor: changeStream,
+ expectedChanges: [{
+ documentKey: {_id: 1},
+ fullDocument: {_id: 1},
+ ns: {db: primaryDB.getName(), coll: coll.getName()},
+ operationType: "insert",
+ }]
+ });
+
+ // Now resume using the resume token from the first change (before the failover).
+ const resumeCursor =
+ cst.getChangeStream({watchMode: watchMode, coll: coll, resumeAfter: firstChange._id});
+
+ // Be sure we can read the 2nd and 3rd changes.
+ cst.assertNextChangesEqual({
+ cursor: resumeCursor,
+ expectedChanges: [
+ {
documentKey: {_id: 1},
fullDocument: {_id: 1},
ns: {db: primaryDB.getName(), coll: coll.getName()},
operationType: "insert",
- }]
- });
-
- // Now resume using the resume token from the first change (before the failover).
- const resumeCursor =
- cst.getChangeStream({watchMode: watchMode, coll: coll, resumeAfter: firstChange._id});
-
- // Be sure we can read the 2nd and 3rd changes.
- cst.assertNextChangesEqual({
- cursor: resumeCursor,
- expectedChanges: [
- {
- documentKey: {_id: 1},
- fullDocument: {_id: 1},
- ns: {db: primaryDB.getName(), coll: coll.getName()},
- operationType: "insert",
- },
- {
- documentKey: {_id: 2},
- fullDocument: {_id: 2},
- ns: {db: primaryDB.getName(), coll: coll.getName()},
- operationType: "insert",
- }
- ]
- });
-
- // Unfreeze the original primary so that it can stand for election again.
- assert.commandWorked(primaryDB.adminCommand({replSetFreeze: 0}));
- }
+ },
+ {
+ documentKey: {_id: 2},
+ fullDocument: {_id: 2},
+ ns: {db: primaryDB.getName(), coll: coll.getName()},
+ operationType: "insert",
+ }
+ ]
+ });
- rst.stopSet();
+ // Unfreeze the original primary so that it can stand for election again.
+ assert.commandWorked(primaryDB.adminCommand({replSetFreeze: 0}));
+}
+
+rst.stopSet();
}());