diff options
Diffstat (limited to 'jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js')
-rw-r--r-- | jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js | 148 |
1 files changed, 74 insertions, 74 deletions
diff --git a/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js b/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js index 6ac410870a9..e8cdf1dc722 100644 --- a/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js +++ b/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js @@ -5,94 +5,94 @@ * @tags: [requires_sharding, uses_change_streams] */ (function() { - "use strict"; +"use strict"; - load("jstests/libs/fixture_helpers.js"); // For runCommandOnEachPrimary. +load("jstests/libs/fixture_helpers.js"); // For runCommandOnEachPrimary. - // Asserts that the expected operation type and documentKey are found on the change stream - // cursor. Returns the change stream document. - function assertWriteVisible({cursor, opType, docKey}) { - assert.soon(() => cursor.hasNext()); - const changeDoc = cursor.next(); - assert.eq(opType, changeDoc.operationType, changeDoc); - assert.eq(docKey, changeDoc.documentKey, changeDoc); - return changeDoc; - } +// Asserts that the expected operation type and documentKey are found on the change stream +// cursor. Returns the change stream document. +function assertWriteVisible({cursor, opType, docKey}) { + assert.soon(() => cursor.hasNext()); + const changeDoc = cursor.next(); + assert.eq(opType, changeDoc.operationType, changeDoc); + assert.eq(docKey, changeDoc.documentKey, changeDoc); + return changeDoc; +} - // Create a new cluster with 2 shards. Disable periodic no-ops to ensure that we have control - // over the ordering of events across the cluster. - const st = new ShardingTest({ - shards: 2, - rs: {nodes: 1, setParameter: {writePeriodicNoops: false, periodicNoopIntervalSecs: 1}} - }); +// Create a new cluster with 2 shards. Disable periodic no-ops to ensure that we have control +// over the ordering of events across the cluster. +const st = new ShardingTest({ + shards: 2, + rs: {nodes: 1, setParameter: {writePeriodicNoops: false, periodicNoopIntervalSecs: 1}} +}); - // Create two databases. We will place one of these on each shard. - const mongosDB0 = st.s.getDB(`${jsTestName()}_0`); - const mongosDB1 = st.s.getDB(`${jsTestName()}_1`); - const adminDB = st.s.getDB("admin"); +// Create two databases. We will place one of these on each shard. +const mongosDB0 = st.s.getDB(`${jsTestName()}_0`); +const mongosDB1 = st.s.getDB(`${jsTestName()}_1`); +const adminDB = st.s.getDB("admin"); - // Enable sharding on mongosDB0 and ensure its primary is shard0. - assert.commandWorked(mongosDB0.adminCommand({enableSharding: mongosDB0.getName()})); - st.ensurePrimaryShard(mongosDB0.getName(), st.rs0.getURL()); +// Enable sharding on mongosDB0 and ensure its primary is shard0. +assert.commandWorked(mongosDB0.adminCommand({enableSharding: mongosDB0.getName()})); +st.ensurePrimaryShard(mongosDB0.getName(), st.rs0.getURL()); - // Enable sharding on mongosDB1 and ensure its primary is shard1. - assert.commandWorked(mongosDB1.adminCommand({enableSharding: mongosDB1.getName()})); - st.ensurePrimaryShard(mongosDB1.getName(), st.rs1.getURL()); +// Enable sharding on mongosDB1 and ensure its primary is shard1. +assert.commandWorked(mongosDB1.adminCommand({enableSharding: mongosDB1.getName()})); +st.ensurePrimaryShard(mongosDB1.getName(), st.rs1.getURL()); - // Open a connection to a different collection on each shard. We use direct connections to - // ensure that the oplog timestamps across the shards overlap. - const coll0 = st.rs0.getPrimary().getCollection(`${mongosDB0.getName()}.test`); - const coll1 = st.rs1.getPrimary().getCollection(`${mongosDB1.getName()}.test`); +// Open a connection to a different collection on each shard. We use direct connections to +// ensure that the oplog timestamps across the shards overlap. +const coll0 = st.rs0.getPrimary().getCollection(`${mongosDB0.getName()}.test`); +const coll1 = st.rs1.getPrimary().getCollection(`${mongosDB1.getName()}.test`); - // Open a change stream on the test cluster. We will capture events in 'changeList'. - const changeStreamCursor = adminDB.aggregate([{$changeStream: {allChangesForCluster: true}}]); - const changeList = []; +// Open a change stream on the test cluster. We will capture events in 'changeList'. +const changeStreamCursor = adminDB.aggregate([{$changeStream: {allChangesForCluster: true}}]); +const changeList = []; - // Insert ten documents on each shard, alternating between the two collections. - for (let i = 0; i < 20; ++i) { - const coll = (i % 2 ? coll1 : coll0); - assert.commandWorked(coll.insert({shard: (i % 2)})); - } +// Insert ten documents on each shard, alternating between the two collections. +for (let i = 0; i < 20; ++i) { + const coll = (i % 2 ? coll1 : coll0); + assert.commandWorked(coll.insert({shard: (i % 2)})); +} - // Verify that each shard now has ten total documents present in the associated collection. - assert.eq(st.rs0.getPrimary().getCollection(coll0.getFullName()).count(), 10); - assert.eq(st.rs1.getPrimary().getCollection(coll1.getFullName()).count(), 10); +// Verify that each shard now has ten total documents present in the associated collection. +assert.eq(st.rs0.getPrimary().getCollection(coll0.getFullName()).count(), 10); +assert.eq(st.rs1.getPrimary().getCollection(coll1.getFullName()).count(), 10); - // Re-enable 'writePeriodicNoops' to ensure that all change stream events are returned. - FixtureHelpers.runCommandOnEachPrimary( - {db: adminDB, cmdObj: {setParameter: 1, writePeriodicNoops: true}}); +// Re-enable 'writePeriodicNoops' to ensure that all change stream events are returned. +FixtureHelpers.runCommandOnEachPrimary( + {db: adminDB, cmdObj: {setParameter: 1, writePeriodicNoops: true}}); - // Read the stream of events, capture them in 'changeList', and confirm that all events occurred - // at or later than the clusterTime of the first event. Unfortunately, we cannot guarantee that - // corresponding events occurred at the same clusterTime on both shards; we expect, however, - // that this will be true in the vast majority of runs, and so there is value in testing. - for (let i = 0; i < 20; ++i) { - assert.soon(() => changeStreamCursor.hasNext()); - changeList.push(changeStreamCursor.next()); - } - const clusterTime = changeList[0].clusterTime; - for (let event of changeList) { - assert.gte(event.clusterTime, clusterTime); - } +// Read the stream of events, capture them in 'changeList', and confirm that all events occurred +// at or later than the clusterTime of the first event. Unfortunately, we cannot guarantee that +// corresponding events occurred at the same clusterTime on both shards; we expect, however, +// that this will be true in the vast majority of runs, and so there is value in testing. +for (let i = 0; i < 20; ++i) { + assert.soon(() => changeStreamCursor.hasNext()); + changeList.push(changeStreamCursor.next()); +} +const clusterTime = changeList[0].clusterTime; +for (let event of changeList) { + assert.gte(event.clusterTime, clusterTime); +} - // Test that resuming from each event returns the expected set of subsequent documents. - for (let i = 0; i < changeList.length; ++i) { - const resumeCursor = adminDB.aggregate( - [{$changeStream: {allChangesForCluster: true, resumeAfter: changeList[i]._id}}]); +// Test that resuming from each event returns the expected set of subsequent documents. +for (let i = 0; i < changeList.length; ++i) { + const resumeCursor = adminDB.aggregate( + [{$changeStream: {allChangesForCluster: true, resumeAfter: changeList[i]._id}}]); - // Confirm that the first event in the resumed stream matches the next event recorded in - // 'changeList' from the original stream. The order of the events should be stable across - // resumes from any point. - for (let x = (i + 1); x < changeList.length; ++x) { - const expectedChangeDoc = changeList[x]; - assertWriteVisible({ - cursor: resumeCursor, - opType: expectedChangeDoc.operationType, - docKey: expectedChangeDoc.documentKey - }); - } - resumeCursor.close(); + // Confirm that the first event in the resumed stream matches the next event recorded in + // 'changeList' from the original stream. The order of the events should be stable across + // resumes from any point. + for (let x = (i + 1); x < changeList.length; ++x) { + const expectedChangeDoc = changeList[x]; + assertWriteVisible({ + cursor: resumeCursor, + opType: expectedChangeDoc.operationType, + docKey: expectedChangeDoc.documentKey + }); } + resumeCursor.close(); +} - st.stop(); +st.stop(); })(); |