diff options
Diffstat (limited to 'jstests/replsets/change_stream_stepdown.js')
-rw-r--r-- | jstests/replsets/change_stream_stepdown.js | 258 |
1 files changed, 129 insertions, 129 deletions
diff --git a/jstests/replsets/change_stream_stepdown.js b/jstests/replsets/change_stream_stepdown.js index 2df2a11c8c8..1a6f6fb28cb 100644 --- a/jstests/replsets/change_stream_stepdown.js +++ b/jstests/replsets/change_stream_stepdown.js @@ -5,134 +5,134 @@ * @tags: [requires_wiredtiger] */ (function() { - "use strict"; - - load("jstests/libs/write_concern_util.js"); // for [stop|restart]ServerReplication. - - const name = "change_stream_stepdown"; - const replTest = new ReplSetTest({name: name, nodes: [{}, {}]}); - replTest.startSet(); - replTest.initiate(); - - const dbName = name; - const collName = "change_stream_stepdown"; - const changeStreamComment = collName + "_comment"; - - const primary = replTest.getPrimary(); - const secondary = replTest.getSecondary(); - const primaryDb = primary.getDB(dbName); - const secondaryDb = secondary.getDB(dbName); - const primaryColl = primaryDb[collName]; - - // Tell the secondary to stay secondary until we say otherwise. - assert.commandWorked(secondaryDb.adminCommand({replSetFreeze: 999999})); - - // Open a change stream. - let res = primaryDb.runCommand({ - aggregate: collName, - pipeline: [{$changeStream: {}}], - cursor: {}, - comment: changeStreamComment, - maxTimeMS: 5000 - }); - assert.commandWorked(res); - let cursorId = res.cursor.id; - - // Insert several documents on primary and let them majority commit. - assert.commandWorked( - primaryColl.insert([{_id: 1}, {_id: 2}, {_id: 3}], {writeConcern: {w: "majority"}})); - replTest.awaitReplication(); - - jsTestLog("Testing that changestream survives stepdown between find and getmore"); - // Step down. - assert.commandWorked(primaryDb.adminCommand({replSetStepDown: 60, force: true})); - replTest.waitForState(primary, ReplSetTest.State.SECONDARY); - - // Receive the first change event. This tests stepdown between find and getmore. - res = assert.commandWorked( - primaryDb.runCommand({getMore: cursorId, collection: collName, batchSize: 1})); - let changes = res.cursor.nextBatch; - assert.eq(changes.length, 1); - assert.eq(changes[0]["fullDocument"], {_id: 1}); - assert.eq(changes[0]["operationType"], "insert"); - - jsTestLog("Testing that changestream survives step-up"); - // Step back up and wait for primary. - assert.commandWorked(primaryDb.adminCommand({replSetFreeze: 0})); - replTest.getPrimary(); - - // Get the next one. This tests that changestreams survives a step-up. - res = assert.commandWorked( - primaryDb.runCommand({getMore: cursorId, collection: collName, batchSize: 1})); - changes = res.cursor.nextBatch; - assert.eq(changes.length, 1); - assert.eq(changes[0]["fullDocument"], {_id: 2}); - assert.eq(changes[0]["operationType"], "insert"); - - jsTestLog("Testing that changestream survives stepdown between two getmores"); - // Step down again. - assert.commandWorked(primaryDb.adminCommand({replSetStepDown: 60, force: true})); - replTest.waitForState(primary, ReplSetTest.State.SECONDARY); - - // Get the next one. This tests that changestreams survives a step down between getmores. - res = assert.commandWorked( - primaryDb.runCommand({getMore: cursorId, collection: collName, batchSize: 1})); - changes = res.cursor.nextBatch; - assert.eq(changes.length, 1); - assert.eq(changes[0]["fullDocument"], {_id: 3}); - assert.eq(changes[0]["operationType"], "insert"); - - // Step back up and wait for primary. - assert.commandWorked(primaryDb.adminCommand({replSetFreeze: 0})); - replTest.getPrimary(); - - jsTestLog("Testing that changestream waiting on old primary sees docs inserted on new primary"); - - replTest.awaitReplication(); // Ensure secondary is up to date and can win an election. - TestData.changeStreamComment = changeStreamComment; - TestData.secondaryHost = secondary.host; - TestData.dbName = dbName; - TestData.collName = collName; - let waitForShell = startParallelShell(function() { - // Wait for the getMore to be in progress. - assert.soon( - () => db.getSiblingDB("admin") - .aggregate([ - {'$currentOp': {}}, - { - '$match': { - op: 'getmore', - 'cursor.originatingCommand.comment': TestData.changeStreamComment - } +"use strict"; + +load("jstests/libs/write_concern_util.js"); // for [stop|restart]ServerReplication. + +const name = "change_stream_stepdown"; +const replTest = new ReplSetTest({name: name, nodes: [{}, {}]}); +replTest.startSet(); +replTest.initiate(); + +const dbName = name; +const collName = "change_stream_stepdown"; +const changeStreamComment = collName + "_comment"; + +const primary = replTest.getPrimary(); +const secondary = replTest.getSecondary(); +const primaryDb = primary.getDB(dbName); +const secondaryDb = secondary.getDB(dbName); +const primaryColl = primaryDb[collName]; + +// Tell the secondary to stay secondary until we say otherwise. +assert.commandWorked(secondaryDb.adminCommand({replSetFreeze: 999999})); + +// Open a change stream. +let res = primaryDb.runCommand({ + aggregate: collName, + pipeline: [{$changeStream: {}}], + cursor: {}, + comment: changeStreamComment, + maxTimeMS: 5000 +}); +assert.commandWorked(res); +let cursorId = res.cursor.id; + +// Insert several documents on primary and let them majority commit. +assert.commandWorked( + primaryColl.insert([{_id: 1}, {_id: 2}, {_id: 3}], {writeConcern: {w: "majority"}})); +replTest.awaitReplication(); + +jsTestLog("Testing that changestream survives stepdown between find and getmore"); +// Step down. +assert.commandWorked(primaryDb.adminCommand({replSetStepDown: 60, force: true})); +replTest.waitForState(primary, ReplSetTest.State.SECONDARY); + +// Receive the first change event. This tests stepdown between find and getmore. +res = assert.commandWorked( + primaryDb.runCommand({getMore: cursorId, collection: collName, batchSize: 1})); +let changes = res.cursor.nextBatch; +assert.eq(changes.length, 1); +assert.eq(changes[0]["fullDocument"], {_id: 1}); +assert.eq(changes[0]["operationType"], "insert"); + +jsTestLog("Testing that changestream survives step-up"); +// Step back up and wait for primary. +assert.commandWorked(primaryDb.adminCommand({replSetFreeze: 0})); +replTest.getPrimary(); + +// Get the next one. This tests that changestreams survives a step-up. +res = assert.commandWorked( + primaryDb.runCommand({getMore: cursorId, collection: collName, batchSize: 1})); +changes = res.cursor.nextBatch; +assert.eq(changes.length, 1); +assert.eq(changes[0]["fullDocument"], {_id: 2}); +assert.eq(changes[0]["operationType"], "insert"); + +jsTestLog("Testing that changestream survives stepdown between two getmores"); +// Step down again. +assert.commandWorked(primaryDb.adminCommand({replSetStepDown: 60, force: true})); +replTest.waitForState(primary, ReplSetTest.State.SECONDARY); + +// Get the next one. This tests that changestreams survives a step down between getmores. +res = assert.commandWorked( + primaryDb.runCommand({getMore: cursorId, collection: collName, batchSize: 1})); +changes = res.cursor.nextBatch; +assert.eq(changes.length, 1); +assert.eq(changes[0]["fullDocument"], {_id: 3}); +assert.eq(changes[0]["operationType"], "insert"); + +// Step back up and wait for primary. +assert.commandWorked(primaryDb.adminCommand({replSetFreeze: 0})); +replTest.getPrimary(); + +jsTestLog("Testing that changestream waiting on old primary sees docs inserted on new primary"); + +replTest.awaitReplication(); // Ensure secondary is up to date and can win an election. +TestData.changeStreamComment = changeStreamComment; +TestData.secondaryHost = secondary.host; +TestData.dbName = dbName; +TestData.collName = collName; +let waitForShell = startParallelShell(function() { + // Wait for the getMore to be in progress. + assert.soon( + () => db.getSiblingDB("admin") + .aggregate([ + {'$currentOp': {}}, + { + '$match': { + op: 'getmore', + 'cursor.originatingCommand.comment': TestData.changeStreamComment } - ]) - .itcount() == 1); - - const secondary = new Mongo(TestData.secondaryHost); - const secondaryDb = secondary.getDB(TestData.dbName); - // Step down the old primary and wait for new primary. - assert.commandWorked(secondaryDb.adminCommand({replSetFreeze: 0})); - assert.commandWorked(secondaryDb.adminCommand({replSetStepUp: 1, skipDryRun: true})); - jsTestLog("Waiting for new primary"); - assert.soon(() => secondaryDb.adminCommand({isMaster: 1}).ismaster); - - jsTestLog("Inserting document on new primary"); - assert.commandWorked(secondaryDb[TestData.collName].insert({_id: 4}), - {writeConcern: {w: "majority"}}); - }, primary.port); - - res = assert.commandWorked(primaryDb.runCommand({ - getMore: cursorId, - collection: collName, - batchSize: 1, - maxTimeMS: ReplSetTest.kDefaultTimeoutMS - })); - changes = res.cursor.nextBatch; - assert.eq(changes.length, 1); - assert.eq(changes[0]["fullDocument"], {_id: 4}); - assert.eq(changes[0]["operationType"], "insert"); - - waitForShell(); - - replTest.stopSet(); + } + ]) + .itcount() == 1); + + const secondary = new Mongo(TestData.secondaryHost); + const secondaryDb = secondary.getDB(TestData.dbName); + // Step down the old primary and wait for new primary. + assert.commandWorked(secondaryDb.adminCommand({replSetFreeze: 0})); + assert.commandWorked(secondaryDb.adminCommand({replSetStepUp: 1, skipDryRun: true})); + jsTestLog("Waiting for new primary"); + assert.soon(() => secondaryDb.adminCommand({isMaster: 1}).ismaster); + + jsTestLog("Inserting document on new primary"); + assert.commandWorked(secondaryDb[TestData.collName].insert({_id: 4}), + {writeConcern: {w: "majority"}}); +}, primary.port); + +res = assert.commandWorked(primaryDb.runCommand({ + getMore: cursorId, + collection: collName, + batchSize: 1, + maxTimeMS: ReplSetTest.kDefaultTimeoutMS +})); +changes = res.cursor.nextBatch; +assert.eq(changes.length, 1); +assert.eq(changes[0]["fullDocument"], {_id: 4}); +assert.eq(changes[0]["operationType"], "insert"); + +waitForShell(); + +replTest.stopSet(); })(); |