author      Ian Boros <ian.boros@10gen.com>    2018-04-26 16:59:28 -0400
committer   Ian Boros <ian.boros@10gen.com>    2018-06-06 12:17:01 -0400
commit      71fa3b81f9a3dbc682ccf087faf95e17780b21ff (patch)
tree        9282230599be0b5c526e113dfc72fa7c3d7cf3a0
parent      a0e09af21283097b897cce0261dba3c06e46f5d1 (diff)
download    mongo-71fa3b81f9a3dbc682ccf087faf95e17780b21ff.tar.gz
SERVER-31396 add tests for change streams being resilient to node failures in sharded cluster
(cherry picked from commit ee5a75a518806dbdd20460ef81db2b6a9717ece4)
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml       2
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml  2
-rw-r--r--  jstests/sharding/change_stream_resume_from_different_mongos.js                           98
-rw-r--r--  jstests/sharding/change_stream_shard_failover.js                                         115
-rw-r--r--  src/mongo/shell/assert.js                                                                14
5 files changed, 231 insertions(+), 0 deletions(-)
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index c2099123fec..3fbcb34f1f3 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -69,6 +69,8 @@ selector:
- jstests/sharding/change_streams_whole_db.js
- jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js
- jstests/sharding/resume_change_stream.js
+ - jstests/sharding/change_stream_resume_from_different_mongos.js
+ - jstests/sharding/change_stream_shard_failover.js
- jstests/sharding/transactions_prohibited_in_sharded_cluster.js
# Requires count command to be accurate on sharded clusters, introduced in v4.0.
- jstests/sharding/accurate_count_with_predicate.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
index d9925607801..719836db40d 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
@@ -77,6 +77,8 @@ selector:
- jstests/sharding/change_streams_whole_db.js
- jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js
- jstests/sharding/resume_change_stream.js
+ - jstests/sharding/change_stream_resume_from_different_mongos.js
+ - jstests/sharding/change_stream_shard_failover.js
- jstests/sharding/transactions_prohibited_in_sharded_cluster.js
# Requires count command to be accurate on sharded clusters, introduced in v4.0.
- jstests/sharding/accurate_count_with_predicate.js
diff --git a/jstests/sharding/change_stream_resume_from_different_mongos.js b/jstests/sharding/change_stream_resume_from_different_mongos.js
new file mode 100644
index 00000000000..893bb47ab78
--- /dev/null
+++ b/jstests/sharding/change_stream_resume_from_different_mongos.js
@@ -0,0 +1,98 @@
+// Test resuming a change stream on a mongos other than the one the change stream was started on.
+(function() {
+ "use strict";
+ // For supportsMajorityReadConcern().
+ load("jstests/multiVersion/libs/causal_consistency_helpers.js");
+ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+
+ if (!supportsMajorityReadConcern()) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+ }
+
+ const st = new ShardingTest({
+ shards: 2,
+ mongos: 2,
+ rs: {nodes: 3, setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}}
+ });
+
+ for (let key of Object.keys(ChangeStreamTest.WatchMode)) {
+ const watchMode = ChangeStreamTest.WatchMode[key];
+ jsTestLog("Running test for mode " + watchMode);
+
+ const s0DB = st.s0.getDB("test");
+ const s1DB = st.s1.getDB("test");
+ const coll = assertDropAndRecreateCollection(s0DB, "change_stream_failover");
+
+ const nDocs = 100;
+
+ // Split so that ids < nDocs / 2 land on one shard and ids >= nDocs / 2 on the other.
+ st.shardColl(coll,
+ {_id: 1}, // key
+ {_id: nDocs / 2}, // split
+ {_id: nDocs / 2 + 1}, // move
+ "test", // dbName
+ false // waitForDelete
+ );
+
+ // Open a change stream.
+ const cst = new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, s0DB));
+ let changeStream = cst.getChangeStream({watchMode: watchMode, coll: coll});
+
+ // Be sure we can read from the change stream. Write some documents that will end up on
+ // each shard. Use a bulk write to increase the chance that two of the writes get the same
+ // cluster time on each shard.
+ const kIds = [];
+ const bulk = coll.initializeUnorderedBulkOp();
+ for (let i = 0; i < nDocs / 2; i++) {
+ // Interleave elements which will end up on shard 0 with elements that will end up on
+ // shard 1.
+ kIds.push(i);
+ bulk.insert({_id: i});
+ kIds.push(i + nDocs / 2);
+ bulk.insert({_id: i + nDocs / 2});
+ }
+ assert.commandWorked(bulk.execute());
+
+ // Read from the change stream. The order of the documents isn't guaranteed because we
+ // performed a bulk write.
+ const firstChange = cst.getOneChange(changeStream);
+ const docsFoundInOrder = [firstChange];
+ for (let i = 0; i < nDocs - 1; i++) {
+ const change = cst.getOneChange(changeStream);
+ assert.docEq(change.ns, {db: s0DB.getName(), coll: coll.getName()});
+ assert.eq(change.operationType, "insert");
+
+ docsFoundInOrder.push(change);
+ }
+
+ // Assert that we found the documents we inserted (in any order).
+ assert.setEq(new Set(kIds), new Set(docsFoundInOrder.map(doc => doc.fullDocument._id)));
+ cst.cleanUp();
+
+ // Now resume using the resume token from the first change on a different mongos.
+ const otherCst =
+ new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, s1DB));
+
+ const resumeCursor = otherCst.getChangeStream(
+ {watchMode: watchMode, coll: coll, resumeAfter: firstChange._id});
+
+ // Get the resume tokens for each change that occurred.
+ const resumeTokens = [firstChange._id];
+ for (let i = 0; i < kIds.length - 1; i++) {
+ resumeTokens.push(otherCst.getOneChange(resumeCursor)._id);
+ }
+
+ // Check that resuming from each possible resume token works.
+ for (let i = 0; i < resumeTokens.length; i++) {
+ const cursor = otherCst.getChangeStream(
+ {watchMode: watchMode, coll: coll, resumeAfter: resumeTokens[i]});
+ otherCst.assertNextChangesEqual(
+ {cursor: cursor, expectedChanges: docsFoundInOrder.slice(i + 1)});
+ }
+ otherCst.cleanUp();
+ }
+
+ st.stop();
+}());
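The test above drives everything through the ChangeStreamTest helper from jstests/libs/change_stream_util.js. For readers unfamiliar with that helper, the same resume-on-a-different-mongos flow can be sketched with the plain shell API; the host names below are hypothetical and the snippet is illustrative only, not part of the patch.

// Connect to two different mongos processes of the same cluster (hosts are hypothetical).
const collOnS0 = new Mongo("mongos0.example.net:27017").getDB("test").change_stream_failover;
const collOnS1 = new Mongo("mongos1.example.net:27017").getDB("test").change_stream_failover;

// Open a stream through the first mongos and capture the resume token of the first event.
let stream = collOnS0.watch();
assert.writeOK(collOnS0.insert({_id: 0}));
assert.soon(() => stream.hasNext());
const token = stream.next()._id;
stream.close();

// Resume on the second mongos using the token obtained through the first one.
const resumed = collOnS1.watch([], {resumeAfter: token});
assert.writeOK(collOnS0.insert({_id: 1}));
assert.soon(() => resumed.hasNext());
assert.eq(resumed.next().fullDocument._id, 1);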
diff --git a/jstests/sharding/change_stream_shard_failover.js b/jstests/sharding/change_stream_shard_failover.js
new file mode 100644
index 00000000000..beb218f3584
--- /dev/null
+++ b/jstests/sharding/change_stream_shard_failover.js
@@ -0,0 +1,115 @@
+/**
+ * Test resuming a change stream on a node other than the one it was started on. Accomplishes this
+ * by triggering a stepdown.
+ */
+
+// Checking UUID consistency uses cached connections, which are not valid across restarts or
+// stepdowns.
+TestData.skipCheckingUUIDsConsistentAcrossCluster = true;
+
+(function() {
+ "use strict";
+ // For supportsMajorityReadConcern().
+ load("jstests/multiVersion/libs/causal_consistency_helpers.js");
+ load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest.
+ load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection.
+
+ if (!supportsMajorityReadConcern()) {
+ jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+ return;
+ }
+
+ const st = new ShardingTest({
+ shards: 2,
+ rs: {nodes: 2, setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}}
+ });
+
+ const sDB = st.s.getDB("test");
+ const kCollName = "change_stream_failover";
+
+ for (let key of Object.keys(ChangeStreamTest.WatchMode)) {
+ const watchMode = ChangeStreamTest.WatchMode[key];
+ jsTestLog("Running test for mode " + watchMode);
+
+ const coll = assertDropAndRecreateCollection(sDB, kCollName);
+
+ const nDocs = 100;
+
+ // Split so that ids < nDocs / 2 land on one shard and ids >= nDocs / 2 on the other.
+ st.shardColl(coll,
+ {_id: 1}, // key
+ {_id: nDocs / 2}, // split
+ {_id: nDocs / 2 + 1}, // move
+ "test", // dbName
+ false // waitForDelete
+ );
+
+ // Be sure we'll only read from the primaries.
+ st.s.setReadPref("primary");
+
+ // Open a changeStream.
+ const cst = new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, sDB));
+ let changeStream = cst.getChangeStream({watchMode: watchMode, coll: coll});
+
+ // Be sure we can read from the change stream. Write some documents that will end up on
+ // each shard. Use a bulk write to increase the chance that two of the writes get the same
+ // cluster time on each shard.
+ const bulk = coll.initializeUnorderedBulkOp();
+ const kIds = [];
+ for (let i = 0; i < nDocs / 2; i++) {
+ // Interleave elements which will end up on shard 0 with elements that will end up on
+ // shard 1.
+ kIds.push(i);
+ bulk.insert({_id: i});
+ kIds.push(i + nDocs / 2);
+ bulk.insert({_id: i + nDocs / 2});
+ }
+ // Use {w: "majority"} so that we're still guaranteed to be able to read after the
+ // failover.
+ assert.commandWorked(bulk.execute({w: "majority"}));
+
+ const firstChange = cst.getOneChange(changeStream);
+
+ // Make one of the primaries step down.
+ const oldPrimary = st.rs0.getPrimary();
+
+ const stepDownError = assert.throws(function() {
+ oldPrimary.adminCommand({replSetStepDown: 300, force: true});
+ });
+ assert(isNetworkError(stepDownError),
+ "replSetStepDown did not disconnect client; failed with " + tojson(stepDownError));
+
+ st.rs0.awaitNodesAgreeOnPrimary();
+ const newPrimary = st.rs0.getPrimary();
+ // Be sure the new primary is not the previous primary.
+ assert.neq(newPrimary.port, oldPrimary.port);
+
+ // Read the remaining documents from the original stream.
+ const docsFoundInOrder = [firstChange];
+ for (let i = 0; i < nDocs - 1; i++) {
+ const change = cst.getOneChange(changeStream);
+ assert.docEq(change.ns, {db: sDB.getName(), coll: coll.getName()});
+ assert.eq(change.operationType, "insert");
+
+ docsFoundInOrder.push(change);
+ }
+
+ // Assert that we found the documents we inserted (in any order).
+ assert.setEq(new Set(kIds), new Set(docsFoundInOrder.map(doc => doc.fullDocument._id)));
+
+ // Now resume using the resume token from the first change (which was read before the
+ // failover). The mongos should talk to the new primary.
+ const resumeCursor =
+ cst.getChangeStream({watchMode: watchMode, coll: coll, resumeAfter: firstChange._id});
+
+ // Be sure we can read the remaining changes in the same order as we read them initially.
+ cst.assertNextChangesEqual(
+ {cursor: resumeCursor, expectedChanges: docsFoundInOrder.splice(1)});
+ cst.cleanUp();
+
+ // Unfreeze the old primary so that it is allowed to stand for election again.
+ assert.commandWorked(oldPrimary.adminCommand({replSetFreeze: 0}));
+ }
+
+ st.stop();
+}());
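Outside the test harness, the behavior exercised here is what lets an application survive a shard primary failover: catch the transient error from the cursor, then re-open the stream from the last resume token it processed. A minimal sketch of that pattern, assuming a collection handle obtained through a mongos; the collection name and event count below are made up for illustration.

const coll = db.getSiblingDB("test").change_stream_failover;  // hypothetical handle via mongos
const expectedEvents = 10;                                    // hypothetical number of events to consume
let lastToken = null;
let stream = coll.watch();
for (let processed = 0; processed < expectedEvents;) {
    try {
        if (!stream.hasNext()) {
            continue;  // hasNext() blocks up to the await time, so this is not a busy loop
        }
        const change = stream.next();
        lastToken = change._id;  // remember the token before acting on the event
        processed++;
    } catch (e) {
        if (!isNetworkError(e)) {
            throw e;
        }
        // The connection dropped mid-stream (e.g. a stepdown); resume from the last token seen.
        stream = (lastToken === null) ? coll.watch() : coll.watch([], {resumeAfter: lastToken});
    }
}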
diff --git a/src/mongo/shell/assert.js b/src/mongo/shell/assert.js
index d27d072fb8d..dbd1b01c2cf 100644
--- a/src/mongo/shell/assert.js
+++ b/src/mongo/shell/assert.js
@@ -197,6 +197,20 @@ assert = (function() {
msg, "[" + tojson(aSorted) + "] != [" + tojson(bSorted) + "] are not equal"));
};
+ assert.setEq = function(aSet, bSet, msg) {
+ const failAssertion = function() {
+ doassert(_buildAssertionMessage(msg, tojson(aSet) + " != " + tojson(bSet)));
+ };
+ if (aSet.size !== bSet.size) {
+ failAssertion();
+ }
+ for (let a of aSet) {
+ if (!bSet.has(a)) {
+ failAssertion();
+ }
+ }
+ };
+
assert.eq.automsg = function(a, b) {
assert.eq(eval(a), eval(b), "[" + a + "] != [" + b + "]");
};
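The new assert.setEq helper added above fails the assertion unless the two Sets have the same size and the same members; membership is checked with Set.prototype.has, so primitives compare by value while objects compare by reference. A small usage sketch (not part of the patch):

// Passes: same members; insertion order does not matter.
assert.setEq(new Set([1, 2, 3]), new Set([3, 2, 1]));

// Would throw via doassert(): equal sizes but different membership.
// assert.setEq(new Set([1, 2]), new Set([1, 3]), "sets should match");

// Typical use from the new tests: compare the inserted _ids against the _ids
// observed on the change stream, ignoring order.
const changes = [{fullDocument: {_id: 50}}, {fullDocument: {_id: 0}}];
assert.setEq(new Set([0, 50]), new Set(changes.map(doc => doc.fullDocument._id)));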