diff options
Diffstat (limited to 'jstests/noPassthrough/change_stream_failover.js')
-rw-r--r-- | jstests/noPassthrough/change_stream_failover.js | 161 |
1 files changed, 80 insertions, 81 deletions
diff --git a/jstests/noPassthrough/change_stream_failover.js b/jstests/noPassthrough/change_stream_failover.js index 8168c7722de..b8ec132fdd8 100644 --- a/jstests/noPassthrough/change_stream_failover.js +++ b/jstests/noPassthrough/change_stream_failover.js @@ -3,90 +3,89 @@ // This test uses the WiredTiger storage engine, which does not support running without journaling. // @tags: [requires_replication,requires_journaling] (function() { - "use strict"; - load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. - load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. - - const rst = new ReplSetTest({nodes: 3}); - if (!startSetIfSupportsReadMajority(rst)) { - jsTestLog("Skipping test since storage engine doesn't support majority read concern."); - rst.stopSet(); - return; - } - - rst.initiate(); - - for (let key of Object.keys(ChangeStreamWatchMode)) { - const watchMode = ChangeStreamWatchMode[key]; - jsTestLog("Running test for mode " + watchMode); - - const primary = rst.getPrimary(); - const primaryDB = primary.getDB("test"); - const coll = assertDropAndRecreateCollection(primaryDB, "change_stream_failover"); - - // Be sure we'll only read from the primary. - primary.setReadPref("primary"); - - // Open a changeStream on the primary. - const cst = - new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, primaryDB)); - - let changeStream = cst.getChangeStream({watchMode: watchMode, coll: coll}); - - // Be sure we can read from the change stream. Use {w: "majority"} so that we're still - // guaranteed to be able to read after the failover. - assert.writeOK(coll.insert({_id: 0}, {writeConcern: {w: "majority"}})); - assert.writeOK(coll.insert({_id: 1}, {writeConcern: {w: "majority"}})); - assert.writeOK(coll.insert({_id: 2}, {writeConcern: {w: "majority"}})); - - const firstChange = cst.getOneChange(changeStream); - assert.docEq(firstChange.fullDocument, {_id: 0}); - - // Make the primary step down - assert.commandWorked(primaryDB.adminCommand({replSetStepDown: 30})); - - // Now wait for another primary to be elected. - const newPrimary = rst.getPrimary(); - // Be sure we got a different node that the previous primary. - assert.neq(newPrimary.port, primary.port); - - cst.assertNextChangesEqual({ - cursor: changeStream, - expectedChanges: [{ +"use strict"; +load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. +load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. +load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. + +const rst = new ReplSetTest({nodes: 3}); +if (!startSetIfSupportsReadMajority(rst)) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + rst.stopSet(); + return; +} + +rst.initiate(); + +for (let key of Object.keys(ChangeStreamWatchMode)) { + const watchMode = ChangeStreamWatchMode[key]; + jsTestLog("Running test for mode " + watchMode); + + const primary = rst.getPrimary(); + const primaryDB = primary.getDB("test"); + const coll = assertDropAndRecreateCollection(primaryDB, "change_stream_failover"); + + // Be sure we'll only read from the primary. + primary.setReadPref("primary"); + + // Open a changeStream on the primary. + const cst = new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, primaryDB)); + + let changeStream = cst.getChangeStream({watchMode: watchMode, coll: coll}); + + // Be sure we can read from the change stream. Use {w: "majority"} so that we're still + // guaranteed to be able to read after the failover. + assert.writeOK(coll.insert({_id: 0}, {writeConcern: {w: "majority"}})); + assert.writeOK(coll.insert({_id: 1}, {writeConcern: {w: "majority"}})); + assert.writeOK(coll.insert({_id: 2}, {writeConcern: {w: "majority"}})); + + const firstChange = cst.getOneChange(changeStream); + assert.docEq(firstChange.fullDocument, {_id: 0}); + + // Make the primary step down + assert.commandWorked(primaryDB.adminCommand({replSetStepDown: 30})); + + // Now wait for another primary to be elected. + const newPrimary = rst.getPrimary(); + // Be sure we got a different node that the previous primary. + assert.neq(newPrimary.port, primary.port); + + cst.assertNextChangesEqual({ + cursor: changeStream, + expectedChanges: [{ + documentKey: {_id: 1}, + fullDocument: {_id: 1}, + ns: {db: primaryDB.getName(), coll: coll.getName()}, + operationType: "insert", + }] + }); + + // Now resume using the resume token from the first change (before the failover). + const resumeCursor = + cst.getChangeStream({watchMode: watchMode, coll: coll, resumeAfter: firstChange._id}); + + // Be sure we can read the 2nd and 3rd changes. + cst.assertNextChangesEqual({ + cursor: resumeCursor, + expectedChanges: [ + { documentKey: {_id: 1}, fullDocument: {_id: 1}, ns: {db: primaryDB.getName(), coll: coll.getName()}, operationType: "insert", - }] - }); - - // Now resume using the resume token from the first change (before the failover). - const resumeCursor = - cst.getChangeStream({watchMode: watchMode, coll: coll, resumeAfter: firstChange._id}); - - // Be sure we can read the 2nd and 3rd changes. - cst.assertNextChangesEqual({ - cursor: resumeCursor, - expectedChanges: [ - { - documentKey: {_id: 1}, - fullDocument: {_id: 1}, - ns: {db: primaryDB.getName(), coll: coll.getName()}, - operationType: "insert", - }, - { - documentKey: {_id: 2}, - fullDocument: {_id: 2}, - ns: {db: primaryDB.getName(), coll: coll.getName()}, - operationType: "insert", - } - ] - }); - - // Unfreeze the original primary so that it can stand for election again. - assert.commandWorked(primaryDB.adminCommand({replSetFreeze: 0})); - } + }, + { + documentKey: {_id: 2}, + fullDocument: {_id: 2}, + ns: {db: primaryDB.getName(), coll: coll.getName()}, + operationType: "insert", + } + ] + }); - rst.stopSet(); + // Unfreeze the original primary so that it can stand for election again. + assert.commandWorked(primaryDB.adminCommand({replSetFreeze: 0})); +} + +rst.stopSet(); }()); |