diff options
author | clang-format-7.0.1 <adam.martin@10gen.com> | 2019-07-26 18:20:35 -0400 |
---|---|---|
committer | ADAM David Alan Martin <adam.martin@10gen.com> | 2019-07-27 11:02:23 -0400 |
commit | 134a4083953270e8a11430395357fb70a29047ad (patch) | |
tree | dd428e1230e31d92b20b393dfdc17ffe7fa79cb6 /jstests/sharding/change_stream_shard_failover.js | |
parent | 1e46b5049003f427047e723ea5fab15b5a9253ca (diff) | |
download | mongo-134a4083953270e8a11430395357fb70a29047ad.tar.gz |
SERVER-41772 Apply clang-format 7.0.1 to the codebase
Diffstat (limited to 'jstests/sharding/change_stream_shard_failover.js')
-rw-r--r-- | jstests/sharding/change_stream_shard_failover.js | 192 |
1 files changed, 96 insertions, 96 deletions
diff --git a/jstests/sharding/change_stream_shard_failover.js b/jstests/sharding/change_stream_shard_failover.js index f4b3007bd30..f5675aedd04 100644 --- a/jstests/sharding/change_stream_shard_failover.js +++ b/jstests/sharding/change_stream_shard_failover.js @@ -9,104 +9,104 @@ TestData.skipCheckingUUIDsConsistentAcrossCluster = true; (function() { - "use strict"; - // For supportsMajorityReadConcern(). - load("jstests/multiVersion/libs/causal_consistency_helpers.js"); - load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. - load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - - if (!supportsMajorityReadConcern()) { - jsTestLog("Skipping test since storage engine doesn't support majority read concern."); - return; +"use strict"; +// For supportsMajorityReadConcern(). +load("jstests/multiVersion/libs/causal_consistency_helpers.js"); +load("jstests/libs/change_stream_util.js"); // For ChangeStreamTest. +load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. + +if (!supportsMajorityReadConcern()) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + return; +} + +const st = new ShardingTest({ + shards: 2, + rs: {nodes: 2, setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}} +}); + +const sDB = st.s.getDB("test"); +const kCollName = "change_stream_failover"; + +for (let key of Object.keys(ChangeStreamWatchMode)) { + const watchMode = ChangeStreamWatchMode[key]; + jsTestLog("Running test for mode " + watchMode); + + const coll = assertDropAndRecreateCollection(sDB, kCollName); + + const nDocs = 100; + + // Split so ids < nDocs / 2 are for one shard, ids >= nDocs / 2 + 1 for another. + st.shardColl( + coll, + {_id: 1}, // key + {_id: nDocs / 2}, // split + {_id: nDocs / 2 + 1}, // move + "test", // dbName + false // waitForDelete + ); + + // Be sure we'll only read from the primaries. + st.s.setReadPref("primary"); + + // Open a changeStream. + const cst = new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, sDB)); + let changeStream = cst.getChangeStream({watchMode: watchMode, coll: coll}); + + // Be sure we can read from the change stream. Write some documents that will end up on + // each shard. Use a bulk write to increase the chance that two of the writes get the same + // cluster time on each shard. + const bulk = coll.initializeUnorderedBulkOp(); + const kIds = []; + for (let i = 0; i < nDocs / 2; i++) { + // Interleave elements which will end up on shard 0 with elements that will end up on + // shard 1. + kIds.push(i); + bulk.insert({_id: i}); + kIds.push(i + nDocs / 2); + bulk.insert({_id: i + nDocs / 2}); } + // Use {w: "majority"} so that we're still guaranteed to be able to read after the + // failover. + assert.commandWorked(bulk.execute({w: "majority"})); - const st = new ShardingTest({ - shards: 2, - rs: {nodes: 2, setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}} - }); - - const sDB = st.s.getDB("test"); - const kCollName = "change_stream_failover"; - - for (let key of Object.keys(ChangeStreamWatchMode)) { - const watchMode = ChangeStreamWatchMode[key]; - jsTestLog("Running test for mode " + watchMode); - - const coll = assertDropAndRecreateCollection(sDB, kCollName); - - const nDocs = 100; - - // Split so ids < nDocs / 2 are for one shard, ids >= nDocs / 2 + 1 for another. - st.shardColl(coll, - {_id: 1}, // key - {_id: nDocs / 2}, // split - {_id: nDocs / 2 + 1}, // move - "test", // dbName - false // waitForDelete - ); - - // Be sure we'll only read from the primaries. - st.s.setReadPref("primary"); - - // Open a changeStream. - const cst = new ChangeStreamTest(ChangeStreamTest.getDBForChangeStream(watchMode, sDB)); - let changeStream = cst.getChangeStream({watchMode: watchMode, coll: coll}); - - // Be sure we can read from the change stream. Write some documents that will end up on - // each shard. Use a bulk write to increase the chance that two of the writes get the same - // cluster time on each shard. - const bulk = coll.initializeUnorderedBulkOp(); - const kIds = []; - for (let i = 0; i < nDocs / 2; i++) { - // Interleave elements which will end up on shard 0 with elements that will end up on - // shard 1. - kIds.push(i); - bulk.insert({_id: i}); - kIds.push(i + nDocs / 2); - bulk.insert({_id: i + nDocs / 2}); - } - // Use {w: "majority"} so that we're still guaranteed to be able to read after the - // failover. - assert.commandWorked(bulk.execute({w: "majority"})); - - const firstChange = cst.getOneChange(changeStream); - - // Make one of the primaries step down. - const oldPrimary = st.rs0.getPrimary(); - - assert.commandWorked(oldPrimary.adminCommand({replSetStepDown: 300, force: true})); - - st.rs0.awaitNodesAgreeOnPrimary(); - const newPrimary = st.rs0.getPrimary(); - // Be sure the new primary is not the previous primary. - assert.neq(newPrimary.port, oldPrimary.port); - - // Read the remaining documents from the original stream. - const docsFoundInOrder = [firstChange]; - for (let i = 0; i < nDocs - 1; i++) { - const change = cst.getOneChange(changeStream); - assert.docEq(change.ns, {db: sDB.getName(), coll: coll.getName()}); - assert.eq(change.operationType, "insert"); - - docsFoundInOrder.push(change); - } - - // Assert that we found the documents we inserted (in any order). - assert.setEq(new Set(kIds), new Set(docsFoundInOrder.map(doc => doc.fullDocument._id))); - - // Now resume using the resume token from the first change (which was read before the - // failover). The mongos should talk to the new primary. - const resumeCursor = - cst.getChangeStream({watchMode: watchMode, coll: coll, resumeAfter: firstChange._id}); - - // Be sure we can read the remaining changes in the same order as we read them initially. - cst.assertNextChangesEqual( - {cursor: resumeCursor, expectedChanges: docsFoundInOrder.splice(1)}); - cst.cleanUp(); - - // Reset the original primary's election timeout. - assert.commandWorked(oldPrimary.adminCommand({replSetFreeze: 0})); + const firstChange = cst.getOneChange(changeStream); + + // Make one of the primaries step down. + const oldPrimary = st.rs0.getPrimary(); + + assert.commandWorked(oldPrimary.adminCommand({replSetStepDown: 300, force: true})); + + st.rs0.awaitNodesAgreeOnPrimary(); + const newPrimary = st.rs0.getPrimary(); + // Be sure the new primary is not the previous primary. + assert.neq(newPrimary.port, oldPrimary.port); + + // Read the remaining documents from the original stream. + const docsFoundInOrder = [firstChange]; + for (let i = 0; i < nDocs - 1; i++) { + const change = cst.getOneChange(changeStream); + assert.docEq(change.ns, {db: sDB.getName(), coll: coll.getName()}); + assert.eq(change.operationType, "insert"); + + docsFoundInOrder.push(change); } - st.stop(); + // Assert that we found the documents we inserted (in any order). + assert.setEq(new Set(kIds), new Set(docsFoundInOrder.map(doc => doc.fullDocument._id))); + + // Now resume using the resume token from the first change (which was read before the + // failover). The mongos should talk to the new primary. + const resumeCursor = + cst.getChangeStream({watchMode: watchMode, coll: coll, resumeAfter: firstChange._id}); + + // Be sure we can read the remaining changes in the same order as we read them initially. + cst.assertNextChangesEqual({cursor: resumeCursor, expectedChanges: docsFoundInOrder.splice(1)}); + cst.cleanUp(); + + // Reset the original primary's election timeout. + assert.commandWorked(oldPrimary.adminCommand({replSetFreeze: 0})); +} + +st.stop(); }()); |