diff options
author | Ian Boros <ian.boros@10gen.com> | 2018-04-24 14:55:10 -0400 |
---|---|---|
committer | Ian Boros <ian.boros@10gen.com> | 2018-04-25 11:11:40 -0400 |
commit | 48bd5b26de59873952ed110c1ce6e1eb4afbdfce (patch) | |
tree | 9170cc28cb1ef63be33b2e8862f532324e42f8d7 | |
parent | db80e9fbd16df2ef3b357bad4c4c7d9212e57dcf (diff) | |
download | mongo-48bd5b26de59873952ed110c1ce6e1eb4afbdfce.tar.gz |
SERVER-31395 Test resuming changeStream against a node other than the original
-rw-r--r-- | jstests/libs/change_stream_util.js | 51 | ||||
-rw-r--r-- | jstests/noPassthrough/change_stream_failover.js | 89 |
2 files changed, 140 insertions, 0 deletions
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index f81d0870946..4221adbaa30 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -227,6 +227,36 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { } }; + + /** + * Returns the document to be used for the value of a $changeStream stage, given a watchMode + * of type ChangeStreamTest.WatchMode and optional resumeAfter value. + */ + self.getChangeStreamStage = function(watchMode, resumeAfter) { + const changeStreamDoc = {}; + if (resumeAfter) { + changeStreamDoc.resumeAfter = resumeAfter; + } + + if (watchMode == ChangeStreamTest.WatchMode.kCluster) { + changeStreamDoc.allChangesForCluster = true; + } + return changeStreamDoc; + }; + + /** + * Create a change stream of the given watch mode (see ChangeStreamTest.WatchMode) on the given + * collection. Will resume from a given point if resumeAfter is specified. + */ + self.getChangeStream = function({watchMode, coll, resumeAfter}) { + return self.startWatchingChanges({ + pipeline: [{$changeStream: self.getChangeStreamStage(watchMode, resumeAfter)}], + collection: (watchMode == ChangeStreamTest.WatchMode.kCollection ? coll : 1), + // Use a batch size of 0 to prevent any notifications from being returned in the first + // batch. These would be ignored by ChangeStreamTest.getOneChange(). + aggregateOptions: {cursor: {batchSize: 0}}, + }); + }; } /** @@ -259,6 +289,27 @@ ChangeStreamTest.assertChangeStreamThrowsCode = function assertChangeStreamThrow }; /** + * Static method for determining which database to run the change stream aggregation on based on + * the watchMode. + */ +ChangeStreamTest.getDBForChangeStream = function(watchMode, dbObj) { + if (watchMode == ChangeStreamTest.WatchMode.kCluster) { + return dbObj.getSiblingDB("admin"); + } + return dbObj; +}; + +/** + * Used in getChangeStream() and getChangeStreamStage() helpers, for specifying which type of + * changeStream to open. + */ +ChangeStreamTest.WatchMode = Object.freeze({ + kCollection: 1, + kDb: 2, + kCluster: 3, +}); + +/** * A set of functions to help validate the behaviour of $changeStreams for a given namespace. */ function assertChangeStreamNssBehaviour(dbName, collName = "test", options, assertFunc) { diff --git a/jstests/noPassthrough/change_stream_failover.js b/jstests/noPassthrough/change_stream_failover.js new file mode 100644 index 00000000000..120fe8819fa --- /dev/null +++ b/jstests/noPassthrough/change_stream_failover.js @@ -0,0 +1,89 @@ +// Test resuming a change stream on a node other than the one it was started on. Accomplishes this +// by triggering a stepdown. +(function() { + "use strict"; + load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. + load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority. + + const rst = new ReplSetTest({nodes: 3}); + if (!startSetIfSupportsReadMajority(rst)) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + rst.stopSet(); + return; + } + + rst.initiate(); + + for (let key of Object.keys(ChangeStreamTest.WatchMode)) { + const watchMode = ChangeStreamTest.WatchMode[key]; + jsTestLog("Running test for mode " + watchMode); + + const primary = rst.getPrimary(); + const primaryDB = primary.getDB("test"); + const coll = assertDropAndRecreateCollection(primaryDB, "change_stream_failover"); + + // Be sure we'll only read from the primary. + primary.setReadPref("primary"); + + // Open a changeStream on the primary. + const cst = + new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, primaryDB)); + + let changeStream = cst.getChangeStream({watchMode: watchMode, coll: coll}); + + // Be sure we can read from the change stream. Use {w: "majority"} so that we're still + // guaranteed to be able to read after the failover. + assert.writeOK(coll.insert({_id: 0}, {writeConcern: {w: "majority"}})); + assert.writeOK(coll.insert({_id: 1}, {writeConcern: {w: "majority"}})); + assert.writeOK(coll.insert({_id: 2}, {writeConcern: {w: "majority"}})); + + const firstChange = cst.getOneChange(changeStream); + assert.docEq(firstChange.fullDocument, {_id: 0}); + + // Make the primary step down + assert.throws(function() { + primaryDB.adminCommand({replSetStepDown: 30}); + }); + + // Now wait for another primary to be elected. + const newPrimary = rst.getPrimary(); + // Be sure we got a different node that the previous primary. + assert.neq(newPrimary.port, primary.port); + + cst.assertNextChangesEqual({ + cursor: changeStream, + expectedChanges: [{ + documentKey: {_id: 1}, + fullDocument: {_id: 1}, + ns: {db: primaryDB.getName(), coll: coll.getName()}, + operationType: "insert", + }] + }); + + // Now resume using the resume token from the first change (before the failover). + const resumeCursor = + cst.getChangeStream({watchMode: watchMode, coll: coll, resumeAfter: firstChange._id}); + + // Be sure we can read the 2nd and 3rd changes. + cst.assertNextChangesEqual({ + cursor: resumeCursor, + expectedChanges: [ + { + documentKey: {_id: 1}, + fullDocument: {_id: 1}, + ns: {db: primaryDB.getName(), coll: coll.getName()}, + operationType: "insert", + }, + { + documentKey: {_id: 2}, + fullDocument: {_id: 2}, + ns: {db: primaryDB.getName(), coll: coll.getName()}, + operationType: "insert", + } + ] + }); + } + + rst.stopSet(); +}()); |