summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorIan Boros <ian.boros@10gen.com>2018-04-24 14:55:10 -0400
committerIan Boros <ian.boros@10gen.com>2018-04-25 11:11:40 -0400
commit48bd5b26de59873952ed110c1ce6e1eb4afbdfce (patch)
tree9170cc28cb1ef63be33b2e8862f532324e42f8d7
parentdb80e9fbd16df2ef3b357bad4c4c7d9212e57dcf (diff)
downloadmongo-48bd5b26de59873952ed110c1ce6e1eb4afbdfce.tar.gz
SERVER-31395 Test resuming changeStream against a node other than the original
-rw-r--r--jstests/libs/change_stream_util.js51
-rw-r--r--jstests/noPassthrough/change_stream_failover.js89
2 files changed, 140 insertions, 0 deletions
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index f81d0870946..4221adbaa30 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -227,6 +227,36 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
}
};
+
+ /**
+ * Returns the document to be used for the value of a $changeStream stage, given a watchMode
+ * of type ChangeStreamTest.WatchMode and optional resumeAfter value.
+ */
+ self.getChangeStreamStage = function(watchMode, resumeAfter) {
+ const changeStreamDoc = {};
+ if (resumeAfter) {
+ changeStreamDoc.resumeAfter = resumeAfter;
+ }
+
+ if (watchMode == ChangeStreamTest.WatchMode.kCluster) {
+ changeStreamDoc.allChangesForCluster = true;
+ }
+ return changeStreamDoc;
+ };
+
+ /**
+ * Create a change stream of the given watch mode (see ChangeStreamTest.WatchMode) on the given
+ * collection. Will resume from a given point if resumeAfter is specified.
+ */
+ self.getChangeStream = function({watchMode, coll, resumeAfter}) {
+ return self.startWatchingChanges({
+ pipeline: [{$changeStream: self.getChangeStreamStage(watchMode, resumeAfter)}],
+ collection: (watchMode == ChangeStreamTest.WatchMode.kCollection ? coll : 1),
+ // Use a batch size of 0 to prevent any notifications from being returned in the first
+ // batch. These would be ignored by ChangeStreamTest.getOneChange().
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
+ };
}
/**
@@ -259,6 +289,27 @@ ChangeStreamTest.assertChangeStreamThrowsCode = function assertChangeStreamThrow
};
/**
+ * Static method for determining which database to run the change stream aggregation on based on
+ * the watchMode.
+ */
+ChangeStreamTest.getDBForChangeStream = function(watchMode, dbObj) {
+ if (watchMode == ChangeStreamTest.WatchMode.kCluster) {
+ return dbObj.getSiblingDB("admin");
+ }
+ return dbObj;
+};
+
+/**
+ * Used in getChangeStream() and getChangeStreamStage() helpers, for specifying which type of
+ * changeStream to open.
+ */
+ChangeStreamTest.WatchMode = Object.freeze({
+ kCollection: 1,
+ kDb: 2,
+ kCluster: 3,
+});
+
+/**
* A set of functions to help validate the behaviour of $changeStreams for a given namespace.
*/
function assertChangeStreamNssBehaviour(dbName, collName = "test", options, assertFunc) {
diff --git a/jstests/noPassthrough/change_stream_failover.js b/jstests/noPassthrough/change_stream_failover.js
new file mode 100644
index 00000000000..120fe8819fa
--- /dev/null
+++ b/jstests/noPassthrough/change_stream_failover.js
@@ -0,0 +1,89 @@
+// Test resuming a change stream on a node other than the one it was started on. Accomplishes this
+// by triggering a stepdown.
+(function() {
+ "use strict";
+ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+ load("jstests/replsets/rslib.js"); // For startSetIfSupportsReadMajority.
+
+ const rst = new ReplSetTest({nodes: 3});
+ if (!startSetIfSupportsReadMajority(rst)) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ rst.stopSet();
+ return;
+ }
+
+ rst.initiate();
+
+ for (let key of Object.keys(ChangeStreamTest.WatchMode)) {
+ const watchMode = ChangeStreamTest.WatchMode[key];
+ jsTestLog("Running test for mode " + watchMode);
+
+ const primary = rst.getPrimary();
+ const primaryDB = primary.getDB("test");
+ const coll = assertDropAndRecreateCollection(primaryDB, "change_stream_failover");
+
+ // Be sure we'll only read from the primary.
+ primary.setReadPref("primary");
+
+ // Open a changeStream on the primary.
+ const cst =
+ new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, primaryDB));
+
+ let changeStream = cst.getChangeStream({watchMode: watchMode, coll: coll});
+
+ // Be sure we can read from the change stream. Use {w: "majority"} so that we're still
+ // guaranteed to be able to read after the failover.
+ assert.writeOK(coll.insert({_id: 0}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(coll.insert({_id: 1}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(coll.insert({_id: 2}, {writeConcern: {w: "majority"}}));
+
+ const firstChange = cst.getOneChange(changeStream);
+ assert.docEq(firstChange.fullDocument, {_id: 0});
+
+ // Make the primary step down
+ assert.throws(function() {
+ primaryDB.adminCommand({replSetStepDown: 30});
+ });
+
+ // Now wait for another primary to be elected.
+ const newPrimary = rst.getPrimary();
+ // Be sure we got a different node that the previous primary.
+ assert.neq(newPrimary.port, primary.port);
+
+ cst.assertNextChangesEqual({
+ cursor: changeStream,
+ expectedChanges: [{
+ documentKey: {_id: 1},
+ fullDocument: {_id: 1},
+ ns: {db: primaryDB.getName(), coll: coll.getName()},
+ operationType: "insert",
+ }]
+ });
+
+ // Now resume using the resume token from the first change (before the failover).
+ const resumeCursor =
+ cst.getChangeStream({watchMode: watchMode, coll: coll, resumeAfter: firstChange._id});
+
+ // Be sure we can read the 2nd and 3rd changes.
+ cst.assertNextChangesEqual({
+ cursor: resumeCursor,
+ expectedChanges: [
+ {
+ documentKey: {_id: 1},
+ fullDocument: {_id: 1},
+ ns: {db: primaryDB.getName(), coll: coll.getName()},
+ operationType: "insert",
+ },
+ {
+ documentKey: {_id: 2},
+ fullDocument: {_id: 2},
+ ns: {db: primaryDB.getName(), coll: coll.getName()},
+ operationType: "insert",
+ }
+ ]
+ });
+ }
+
+ rst.stopSet();
+}());