diff options
author | Ian Boros <ian.boros@10gen.com> | 2018-04-26 16:59:28 -0400 |
---|---|---|
committer | Ian Boros <ian.boros@10gen.com> | 2018-06-06 12:17:01 -0400 |
commit | 71fa3b81f9a3dbc682ccf087faf95e17780b21ff (patch) | |
tree | 9282230599be0b5c526e113dfc72fa7c3d7cf3a0 | |
parent | a0e09af21283097b897cce0261dba3c06e46f5d1 (diff) | |
download | mongo-71fa3b81f9a3dbc682ccf087faf95e17780b21ff.tar.gz |
SERVER-31396 add tests for change streams being resilient to node failures in sharded cluster
(cherry picked from commit ee5a75a518806dbdd20460ef81db2b6a9717ece4)
5 files changed, 231 insertions, 0 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index c2099123fec..3fbcb34f1f3 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -69,6 +69,8 @@ selector:
   - jstests/sharding/change_streams_whole_db.js
   - jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js
   - jstests/sharding/resume_change_stream.js
+  - jstests/sharding/change_stream_resume_from_different_mongos.js
+  - jstests/sharding/change_stream_shard_failover.js
   - jstests/sharding/transactions_prohibited_in_sharded_cluster.js
   # Requires count command to be accurate on sharded clusters, introduced in v4.0.
   - jstests/sharding/accurate_count_with_predicate.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
index d9925607801..719836db40d 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
@@ -77,6 +77,8 @@ selector:
   - jstests/sharding/change_streams_whole_db.js
   - jstests/sharding/lookup_change_stream_post_image_compound_shard_key.js
   - jstests/sharding/resume_change_stream.js
+  - jstests/sharding/change_stream_resume_from_different_mongos.js
+  - jstests/sharding/change_stream_shard_failover.js
   - jstests/sharding/transactions_prohibited_in_sharded_cluster.js
   # Requires count command to be accurate on sharded clusters, introduced in v4.0.
   - jstests/sharding/accurate_count_with_predicate.js
diff --git a/jstests/sharding/change_stream_resume_from_different_mongos.js b/jstests/sharding/change_stream_resume_from_different_mongos.js
new file mode 100644
index 00000000000..893bb47ab78
--- /dev/null
+++ b/jstests/sharding/change_stream_resume_from_different_mongos.js
@@ -0,0 +1,98 @@
+// Test resuming a change stream on a mongos other than the one the change stream was started on.
+(function() {
+    "use strict";
+    // For supportsMajorityReadConcern().
+    load("jstests/multiVersion/libs/causal_consistency_helpers.js");
+    load("jstests/libs/change_stream_util.js");        // For ChangeStreamTest.
+    load("jstests/libs/collection_drop_recreate.js");  // For assert[Drop|Create]Collection.
+
+    if (!supportsMajorityReadConcern()) {
+        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+        return;
+    }
+
+    const st = new ShardingTest({
+        shards: 2,
+        mongos: 2,
+        rs: {nodes: 3, setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}}
+    });
+
+    for (let key of Object.keys(ChangeStreamTest.WatchMode)) {
+        const watchMode = ChangeStreamTest.WatchMode[key];
+        jsTestLog("Running test for mode " + watchMode);
+
+        const s0DB = st.s0.getDB("test");
+        const s1DB = st.s1.getDB("test");
+        const coll = assertDropAndRecreateCollection(s0DB, "change_stream_failover");
+
+        const nDocs = 100;
+
+        // Split so ids < nDocs / 2 are for one shard, ids >= nDocs / 2 for another.
+        st.shardColl(coll,
+                     {_id: 1},              // key
+                     {_id: nDocs / 2},      // split
+                     {_id: nDocs / 2 + 1},  // move
+                     "test",                // dbName
+                     false                  // waitForDelete
+                     );
+
+        // Open a change stream.
+        const cst = new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, s0DB));
+        let changeStream = cst.getChangeStream({watchMode: watchMode, coll: coll});
+
+        // Be sure we can read from the change stream. Write some documents that will end up on
+        // each shard. Use a bulk write to increase the chance that two of the writes get the same
+        // cluster time on each shard.
+        const kIds = [];
+        const bulk = coll.initializeUnorderedBulkOp();
+        for (let i = 0; i < nDocs / 2; i++) {
+            // Interleave elements which will end up on shard 0 with elements that will end up on
+            // shard 1.
+            kIds.push(i);
+            bulk.insert({_id: i});
+            kIds.push(i + nDocs / 2);
+            bulk.insert({_id: i + nDocs / 2});
+        }
+        assert.commandWorked(bulk.execute());
+
+        // Read from the change stream. The order of the documents isn't guaranteed because we
+        // performed a bulk write.
+        const firstChange = cst.getOneChange(changeStream);
+        const docsFoundInOrder = [firstChange];
+        for (let i = 0; i < nDocs - 1; i++) {
+            const change = cst.getOneChange(changeStream);
+            assert.docEq(change.ns, {db: s0DB.getName(), coll: coll.getName()});
+            assert.eq(change.operationType, "insert");
+
+            docsFoundInOrder.push(change);
+        }
+
+        // Assert that we found the documents we inserted (in any order).
+        assert.setEq(new Set(kIds), new Set(docsFoundInOrder.map(doc => doc.fullDocument._id)));
+        cst.cleanUp();
+
+        // Now resume using the resume token from the first change on a different mongos.
+        const otherCst =
+            new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, s1DB));
+
+        const resumeCursor = otherCst.getChangeStream(
+            {watchMode: watchMode, coll: coll, resumeAfter: firstChange._id});
+
+        // Get the resume tokens for each change that occurred.
+        const resumeTokens = [firstChange._id];
+        for (let i = 0; i < kIds.length - 1; i++) {
+            resumeTokens.push(otherCst.getOneChange(resumeCursor)._id);
+        }
+
+        // Resume from every token in turn; slice() (not splice()) keeps docsFoundInOrder intact.
+        for (let i = 0; i < resumeTokens.length; i++) {
+            const cursor = otherCst.getChangeStream(
+                {watchMode: watchMode, coll: coll, resumeAfter: resumeTokens[i]});
+            otherCst.assertNextChangesEqual(
+                {cursor: cursor, expectedChanges: docsFoundInOrder.slice(i + 1)});
+        }
+        otherCst.cleanUp();
+    }
+
+    st.stop();
+}());
diff --git a/jstests/sharding/change_stream_shard_failover.js b/jstests/sharding/change_stream_shard_failover.js
new file mode 100644
index 00000000000..beb218f3584
--- /dev/null
+++ b/jstests/sharding/change_stream_shard_failover.js
@@ -0,0 +1,115 @@
+/**
+ * Test resuming a change stream on a node other than the one it was started on. Accomplishes this
+ * by triggering a stepdown.
+ */
+
+// Checking UUID consistency uses cached connections, which are not valid across restarts or
+// stepdowns.
+TestData.skipCheckingUUIDsConsistentAcrossCluster = true;
+
+(function() {
+    "use strict";
+    // For supportsMajorityReadConcern().
+    load("jstests/multiVersion/libs/causal_consistency_helpers.js");
+    load("jstests/libs/change_stream_util.js");        // For ChangeStreamTest.
+    load("jstests/libs/collection_drop_recreate.js");  // For assert[Drop|Create]Collection.
+
+    if (!supportsMajorityReadConcern()) {
+        jsTestLog("Skipping test since storage engine doesn't support majority read concern.");
+        return;
+    }
+
+    const st = new ShardingTest({
+        shards: 2,
+        rs: {nodes: 2, setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}}
+    });
+
+    const sDB = st.s.getDB("test");
+    const kCollName = "change_stream_failover";
+
+    for (let key of Object.keys(ChangeStreamTest.WatchMode)) {
+        const watchMode = ChangeStreamTest.WatchMode[key];
+        jsTestLog("Running test for mode " + watchMode);
+
+        const coll = assertDropAndRecreateCollection(sDB, kCollName);
+
+        const nDocs = 100;
+
+        // Split so ids < nDocs / 2 are for one shard, ids >= nDocs / 2 for another.
+        st.shardColl(coll,
+                     {_id: 1},              // key
+                     {_id: nDocs / 2},      // split
+                     {_id: nDocs / 2 + 1},  // move
+                     "test",                // dbName
+                     false                  // waitForDelete
+                     );
+
+        // Be sure we'll only read from the primaries.
+        st.s.setReadPref("primary");
+
+        // Open a changeStream.
+        const cst = new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, sDB));
+        let changeStream = cst.getChangeStream({watchMode: watchMode, coll: coll});
+
+        // Be sure we can read from the change stream. Write some documents that will end up on
+        // each shard. Use a bulk write to increase the chance that two of the writes get the same
+        // cluster time on each shard.
+        const bulk = coll.initializeUnorderedBulkOp();
+        const kIds = [];
+        for (let i = 0; i < nDocs / 2; i++) {
+            // Interleave elements which will end up on shard 0 with elements that will end up on
+            // shard 1.
+            kIds.push(i);
+            bulk.insert({_id: i});
+            kIds.push(i + nDocs / 2);
+            bulk.insert({_id: i + nDocs / 2});
+        }
+        // Use {w: "majority"} so that we're still guaranteed to be able to read after the
+        // failover.
+        assert.commandWorked(bulk.execute({w: "majority"}));
+
+        const firstChange = cst.getOneChange(changeStream);
+
+        // Make one of the primaries step down.
+        const oldPrimary = st.rs0.getPrimary();
+
+        const stepDownError = assert.throws(function() {
+            oldPrimary.adminCommand({replSetStepDown: 300, force: true});
+        });
+        assert(isNetworkError(stepDownError),
+               "replSetStepDown did not disconnect client; failed with " + tojson(stepDownError));
+
+        st.rs0.awaitNodesAgreeOnPrimary();
+        const newPrimary = st.rs0.getPrimary();
+        // Be sure the new primary is not the previous primary.
+        assert.neq(newPrimary.port, oldPrimary.port);
+
+        // Read the remaining documents from the original stream.
+        const docsFoundInOrder = [firstChange];
+        for (let i = 0; i < nDocs - 1; i++) {
+            const change = cst.getOneChange(changeStream);
+            assert.docEq(change.ns, {db: sDB.getName(), coll: coll.getName()});
+            assert.eq(change.operationType, "insert");
+
+            docsFoundInOrder.push(change);
+        }
+
+        // Assert that we found the documents we inserted (in any order).
+        assert.setEq(new Set(kIds), new Set(docsFoundInOrder.map(doc => doc.fullDocument._id)));
+
+        // Now resume using the resume token from the first change (which was read before the
+        // failover). The mongos should talk to the new primary.
+        const resumeCursor =
+            cst.getChangeStream({watchMode: watchMode, coll: coll, resumeAfter: firstChange._id});
+
+        // Be sure we can read the remaining changes in the same order as we read them initially.
+        cst.assertNextChangesEqual(
+            {cursor: resumeCursor, expectedChanges: docsFoundInOrder.slice(1)});
+        cst.cleanUp();
+
+        // Reset the original primary's election timeout.
+        assert.commandWorked(oldPrimary.adminCommand({replSetFreeze: 0}));
+    }
+
+    st.stop();
+}());
diff --git a/src/mongo/shell/assert.js b/src/mongo/shell/assert.js
index d27d072fb8d..dbd1b01c2cf 100644
--- a/src/mongo/shell/assert.js
+++ b/src/mongo/shell/assert.js
@@ -197,6 +197,20 @@ assert = (function() {
                 msg, "[" + tojson(aSorted) + "] != [" + tojson(bSorted) + "] are not equal"));
     };
 
+    assert.setEq = function(aSet, bSet, msg) {
+        const failAssertion = function() {
+            doassert(_buildAssertionMessage(msg, tojson(aSet) + " != " + tojson(bSet)));
+        };
+        if (aSet.size !== bSet.size) {
+            failAssertion();
+        }
+        for (let a of aSet) {
+            if (!bSet.has(a)) {
+                failAssertion();
+            }
+        }
+    };
+
     assert.eq.automsg = function(a, b) {
         assert.eq(eval(a), eval(b), "[" + a + "] != [" + b + "]");
     };