summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js')
-rw-r--r--jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js148
1 file changed, 74 insertions(+), 74 deletions(-)
diff --git a/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js b/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js
index 6ac410870a9..e8cdf1dc722 100644
--- a/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js
+++ b/jstests/noPassthrough/change_streams_resume_same_clustertime_different_uuid.js
@@ -5,94 +5,94 @@
* @tags: [requires_sharding, uses_change_streams]
*/
(function() {
- "use strict";
+"use strict";
- load("jstests/libs/fixture_helpers.js"); // For runCommandOnEachPrimary.
+load("jstests/libs/fixture_helpers.js"); // For runCommandOnEachPrimary.
- // Asserts that the expected operation type and documentKey are found on the change stream
- // cursor. Returns the change stream document.
- function assertWriteVisible({cursor, opType, docKey}) {
- assert.soon(() => cursor.hasNext());
- const changeDoc = cursor.next();
- assert.eq(opType, changeDoc.operationType, changeDoc);
- assert.eq(docKey, changeDoc.documentKey, changeDoc);
- return changeDoc;
- }
+// Asserts that the expected operation type and documentKey are found on the change stream
+// cursor. Returns the change stream document.
+function assertWriteVisible({cursor, opType, docKey}) {
+ assert.soon(() => cursor.hasNext());
+ const changeDoc = cursor.next();
+ assert.eq(opType, changeDoc.operationType, changeDoc);
+ assert.eq(docKey, changeDoc.documentKey, changeDoc);
+ return changeDoc;
+}
- // Create a new cluster with 2 shards. Disable periodic no-ops to ensure that we have control
- // over the ordering of events across the cluster.
- const st = new ShardingTest({
- shards: 2,
- rs: {nodes: 1, setParameter: {writePeriodicNoops: false, periodicNoopIntervalSecs: 1}}
- });
+// Create a new cluster with 2 shards. Disable periodic no-ops to ensure that we have control
+// over the ordering of events across the cluster.
+const st = new ShardingTest({
+ shards: 2,
+ rs: {nodes: 1, setParameter: {writePeriodicNoops: false, periodicNoopIntervalSecs: 1}}
+});
- // Create two databases. We will place one of these on each shard.
- const mongosDB0 = st.s.getDB(`${jsTestName()}_0`);
- const mongosDB1 = st.s.getDB(`${jsTestName()}_1`);
- const adminDB = st.s.getDB("admin");
+// Create two databases. We will place one of these on each shard.
+const mongosDB0 = st.s.getDB(`${jsTestName()}_0`);
+const mongosDB1 = st.s.getDB(`${jsTestName()}_1`);
+const adminDB = st.s.getDB("admin");
- // Enable sharding on mongosDB0 and ensure its primary is shard0.
- assert.commandWorked(mongosDB0.adminCommand({enableSharding: mongosDB0.getName()}));
- st.ensurePrimaryShard(mongosDB0.getName(), st.rs0.getURL());
+// Enable sharding on mongosDB0 and ensure its primary is shard0.
+assert.commandWorked(mongosDB0.adminCommand({enableSharding: mongosDB0.getName()}));
+st.ensurePrimaryShard(mongosDB0.getName(), st.rs0.getURL());
- // Enable sharding on mongosDB1 and ensure its primary is shard1.
- assert.commandWorked(mongosDB1.adminCommand({enableSharding: mongosDB1.getName()}));
- st.ensurePrimaryShard(mongosDB1.getName(), st.rs1.getURL());
+// Enable sharding on mongosDB1 and ensure its primary is shard1.
+assert.commandWorked(mongosDB1.adminCommand({enableSharding: mongosDB1.getName()}));
+st.ensurePrimaryShard(mongosDB1.getName(), st.rs1.getURL());
- // Open a connection to a different collection on each shard. We use direct connections to
- // ensure that the oplog timestamps across the shards overlap.
- const coll0 = st.rs0.getPrimary().getCollection(`${mongosDB0.getName()}.test`);
- const coll1 = st.rs1.getPrimary().getCollection(`${mongosDB1.getName()}.test`);
+// Open a connection to a different collection on each shard. We use direct connections to
+// ensure that the oplog timestamps across the shards overlap.
+const coll0 = st.rs0.getPrimary().getCollection(`${mongosDB0.getName()}.test`);
+const coll1 = st.rs1.getPrimary().getCollection(`${mongosDB1.getName()}.test`);
- // Open a change stream on the test cluster. We will capture events in 'changeList'.
- const changeStreamCursor = adminDB.aggregate([{$changeStream: {allChangesForCluster: true}}]);
- const changeList = [];
+// Open a change stream on the test cluster. We will capture events in 'changeList'.
+const changeStreamCursor = adminDB.aggregate([{$changeStream: {allChangesForCluster: true}}]);
+const changeList = [];
- // Insert ten documents on each shard, alternating between the two collections.
- for (let i = 0; i < 20; ++i) {
- const coll = (i % 2 ? coll1 : coll0);
- assert.commandWorked(coll.insert({shard: (i % 2)}));
- }
+// Insert ten documents on each shard, alternating between the two collections.
+for (let i = 0; i < 20; ++i) {
+ const coll = (i % 2 ? coll1 : coll0);
+ assert.commandWorked(coll.insert({shard: (i % 2)}));
+}
- // Verify that each shard now has ten total documents present in the associated collection.
- assert.eq(st.rs0.getPrimary().getCollection(coll0.getFullName()).count(), 10);
- assert.eq(st.rs1.getPrimary().getCollection(coll1.getFullName()).count(), 10);
+// Verify that each shard now has ten total documents present in the associated collection.
+assert.eq(st.rs0.getPrimary().getCollection(coll0.getFullName()).count(), 10);
+assert.eq(st.rs1.getPrimary().getCollection(coll1.getFullName()).count(), 10);
- // Re-enable 'writePeriodicNoops' to ensure that all change stream events are returned.
- FixtureHelpers.runCommandOnEachPrimary(
- {db: adminDB, cmdObj: {setParameter: 1, writePeriodicNoops: true}});
+// Re-enable 'writePeriodicNoops' to ensure that all change stream events are returned.
+FixtureHelpers.runCommandOnEachPrimary(
+ {db: adminDB, cmdObj: {setParameter: 1, writePeriodicNoops: true}});
- // Read the stream of events, capture them in 'changeList', and confirm that all events occurred
- // at or later than the clusterTime of the first event. Unfortunately, we cannot guarantee that
- // corresponding events occurred at the same clusterTime on both shards; we expect, however,
- // that this will be true in the vast majority of runs, and so there is value in testing.
- for (let i = 0; i < 20; ++i) {
- assert.soon(() => changeStreamCursor.hasNext());
- changeList.push(changeStreamCursor.next());
- }
- const clusterTime = changeList[0].clusterTime;
- for (let event of changeList) {
- assert.gte(event.clusterTime, clusterTime);
- }
+// Read the stream of events, capture them in 'changeList', and confirm that all events occurred
+// at or later than the clusterTime of the first event. Unfortunately, we cannot guarantee that
+// corresponding events occurred at the same clusterTime on both shards; we expect, however,
+// that this will be true in the vast majority of runs, and so there is value in testing.
+for (let i = 0; i < 20; ++i) {
+ assert.soon(() => changeStreamCursor.hasNext());
+ changeList.push(changeStreamCursor.next());
+}
+const clusterTime = changeList[0].clusterTime;
+for (let event of changeList) {
+ assert.gte(event.clusterTime, clusterTime);
+}
- // Test that resuming from each event returns the expected set of subsequent documents.
- for (let i = 0; i < changeList.length; ++i) {
- const resumeCursor = adminDB.aggregate(
- [{$changeStream: {allChangesForCluster: true, resumeAfter: changeList[i]._id}}]);
+// Test that resuming from each event returns the expected set of subsequent documents.
+for (let i = 0; i < changeList.length; ++i) {
+ const resumeCursor = adminDB.aggregate(
+ [{$changeStream: {allChangesForCluster: true, resumeAfter: changeList[i]._id}}]);
- // Confirm that the first event in the resumed stream matches the next event recorded in
- // 'changeList' from the original stream. The order of the events should be stable across
- // resumes from any point.
- for (let x = (i + 1); x < changeList.length; ++x) {
- const expectedChangeDoc = changeList[x];
- assertWriteVisible({
- cursor: resumeCursor,
- opType: expectedChangeDoc.operationType,
- docKey: expectedChangeDoc.documentKey
- });
- }
- resumeCursor.close();
+ // Confirm that the first event in the resumed stream matches the next event recorded in
+ // 'changeList' from the original stream. The order of the events should be stable across
+ // resumes from any point.
+ for (let x = (i + 1); x < changeList.length; ++x) {
+ const expectedChangeDoc = changeList[x];
+ assertWriteVisible({
+ cursor: resumeCursor,
+ opType: expectedChangeDoc.operationType,
+ docKey: expectedChangeDoc.documentKey
+ });
}
+ resumeCursor.close();
+}
- st.stop();
+st.stop();
})();