diff options
Diffstat (limited to 'jstests/sharding/resume_change_stream.js')
-rw-r--r-- | jstests/sharding/resume_change_stream.js | 356 |
1 files changed, 178 insertions, 178 deletions
diff --git a/jstests/sharding/resume_change_stream.js b/jstests/sharding/resume_change_stream.js index 9b5e33d0173..19c53012fda 100644 --- a/jstests/sharding/resume_change_stream.js +++ b/jstests/sharding/resume_change_stream.js @@ -2,211 +2,211 @@ // We need to use a readConcern in this test, which requires read commands. // @tags: [requires_find_command, uses_change_streams] (function() { - "use strict"; +"use strict"; + +load('jstests/replsets/rslib.js'); // For getLatestOp. +load('jstests/libs/change_stream_util.js'); // For ChangeStreamTest. + +// For supportsMajorityReadConcern. +load('jstests/multiVersion/libs/causal_consistency_helpers.js'); + +// This test only works on storage engines that support committed reads, skip it if the +// configured engine doesn't support it. +if (!supportsMajorityReadConcern()) { + jsTestLog("Skipping test since storage engine doesn't support majority read concern."); + return; +} + +const oplogSize = 1; // size in MB +const st = new ShardingTest({ + shards: 2, + rs: { + nodes: 1, + oplogSize: oplogSize, + enableMajorityReadConcern: '', + // Use the noop writer with a higher frequency for periodic noops to speed up the test. + setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true} + } +}); - load('jstests/replsets/rslib.js'); // For getLatestOp. - load('jstests/libs/change_stream_util.js'); // For ChangeStreamTest. +const mongosDB = st.s0.getDB(jsTestName()); +const mongosColl = mongosDB[jsTestName()]; - // For supportsMajorityReadConcern. - load('jstests/multiVersion/libs/causal_consistency_helpers.js'); +let cst = new ChangeStreamTest(mongosDB); - // This test only works on storage engines that support committed reads, skip it if the - // configured engine doesn't support it. - if (!supportsMajorityReadConcern()) { - jsTestLog("Skipping test since storage engine doesn't support majority read concern."); - return; - } +function testResume(mongosColl, collToWatch) { + mongosColl.drop(); - const oplogSize = 1; // size in MB - const st = new ShardingTest({ - shards: 2, - rs: { - nodes: 1, - oplogSize: oplogSize, - enableMajorityReadConcern: '', - // Use the noop writer with a higher frequency for periodic noops to speed up the test. - setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true} - } - }); + // Enable sharding on the test DB and ensure its primary is st.shard0.shardName. + assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); + st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL()); - const mongosDB = st.s0.getDB(jsTestName()); - const mongosColl = mongosDB[jsTestName()]; + // Shard the test collection on _id. + assert.commandWorked( + mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); - let cst = new ChangeStreamTest(mongosDB); + // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey]. + assert.commandWorked( + mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}})); - function testResume(mongosColl, collToWatch) { - mongosColl.drop(); + // Move the [0, MaxKey] chunk to st.shard1.shardName. + assert.commandWorked(mongosDB.adminCommand( + {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()})); - // Enable sharding on the test DB and ensure its primary is st.shard0.shardName. - assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); - st.ensurePrimaryShard(mongosDB.getName(), st.rs0.getURL()); + // Write a document to each chunk. + assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}})); - // Shard the test collection on _id. - assert.commandWorked( - mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); + let changeStream = cst.startWatchingChanges( + {pipeline: [{$changeStream: {}}], collection: collToWatch, includeToken: true}); - // Split the collection into 2 chunks: [MinKey, 0), [0, MaxKey]. - assert.commandWorked( - mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {_id: 0}})); + // We awaited the replication of the first writes, so the change stream shouldn't return + // them. + assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}})); - // Move the [0, MaxKey] chunk to st.shard1.shardName. - assert.commandWorked(mongosDB.adminCommand( - {moveChunk: mongosColl.getFullName(), find: {_id: 1}, to: st.rs1.getURL()})); + // Record current time to resume a change stream later in the test. + const resumeTimeFirstUpdate = mongosDB.runCommand({isMaster: 1}).$clusterTime.clusterTime; - // Write a document to each chunk. - assert.writeOK(mongosColl.insert({_id: -1}, {writeConcern: {w: "majority"}})); - assert.writeOK(mongosColl.insert({_id: 1}, {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}})); - let changeStream = cst.startWatchingChanges( - {pipeline: [{$changeStream: {}}], collection: collToWatch, includeToken: true}); + // Test that we see the two writes, and remember their resume tokens. + let next = cst.getOneChange(changeStream); + assert.eq(next.operationType, "update"); + assert.eq(next.documentKey._id, -1); + const resumeTokenFromFirstUpdateOnShard0 = next._id; - // We awaited the replication of the first writes, so the change stream shouldn't return - // them. - assert.writeOK(mongosColl.update({_id: -1}, {$set: {updated: true}})); + next = cst.getOneChange(changeStream); + assert.eq(next.operationType, "update"); + assert.eq(next.documentKey._id, 1); + const resumeTokenFromFirstUpdateOnShard1 = next._id; - // Record current time to resume a change stream later in the test. - const resumeTimeFirstUpdate = mongosDB.runCommand({isMaster: 1}).$clusterTime.clusterTime; + // Write some additional documents, then test that it's possible to resume after the first + // update. + assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}})); - assert.writeOK(mongosColl.update({_id: 1}, {$set: {updated: true}})); + changeStream = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}], + collection: collToWatch + }); - // Test that we see the two writes, and remember their resume tokens. - let next = cst.getOneChange(changeStream); - assert.eq(next.operationType, "update"); - assert.eq(next.documentKey._id, -1); - const resumeTokenFromFirstUpdateOnShard0 = next._id; + for (let nextExpectedId of [1, -2, 2]) { + assert.eq(cst.getOneChange(changeStream).documentKey._id, nextExpectedId); + } - next = cst.getOneChange(changeStream); - assert.eq(next.operationType, "update"); - assert.eq(next.documentKey._id, 1); - const resumeTokenFromFirstUpdateOnShard1 = next._id; + // Test that the stream can't resume if the resume token is no longer present in the oplog. + + // Roll over the entire oplog on the shard with the resume token for the first update. + const shardWithResumeToken = st.rs1.getPrimary(); // Resume from shard 1. + const mostRecentOplogEntry = getLatestOp(shardWithResumeToken); + assert.neq(mostRecentOplogEntry, null); + const largeStr = new Array(4 * 1024 * oplogSize).join('abcdefghi'); + let i = 0; + + function oplogIsRolledOver() { + // The oplog has rolled over if the op that used to be newest is now older than the + // oplog's current oldest entry. Said another way, the oplog is rolled over when + // everything in the oplog is newer than what used to be the newest entry. + return bsonWoCompare( + mostRecentOplogEntry.ts, + getLeastRecentOp({server: shardWithResumeToken, readConcern: "majority"}).ts) < + 0; + } - // Write some additional documents, then test that it's possible to resume after the first - // update. - assert.writeOK(mongosColl.insert({_id: -2}, {writeConcern: {w: "majority"}})); - assert.writeOK(mongosColl.insert({_id: 2}, {writeConcern: {w: "majority"}})); + while (!oplogIsRolledOver()) { + let idVal = 100 + (i++); + assert.writeOK( + mongosColl.insert({_id: idVal, long_str: largeStr}, {writeConcern: {w: "majority"}})); + sleep(100); + } - changeStream = cst.startWatchingChanges({ - pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}], - collection: collToWatch - }); + ChangeStreamTest.assertChangeStreamThrowsCode({ + db: mongosDB, + collName: collToWatch, + pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard1}}], + expectedCode: 40576 + }); - for (let nextExpectedId of[1, -2, 2]) { - assert.eq(cst.getOneChange(changeStream).documentKey._id, nextExpectedId); - } - - // Test that the stream can't resume if the resume token is no longer present in the oplog. - - // Roll over the entire oplog on the shard with the resume token for the first update. - const shardWithResumeToken = st.rs1.getPrimary(); // Resume from shard 1. - const mostRecentOplogEntry = getLatestOp(shardWithResumeToken); - assert.neq(mostRecentOplogEntry, null); - const largeStr = new Array(4 * 1024 * oplogSize).join('abcdefghi'); - let i = 0; - - function oplogIsRolledOver() { - // The oplog has rolled over if the op that used to be newest is now older than the - // oplog's current oldest entry. Said another way, the oplog is rolled over when - // everything in the oplog is newer than what used to be the newest entry. - return bsonWoCompare(mostRecentOplogEntry.ts, getLeastRecentOp({ - server: shardWithResumeToken, - readConcern: "majority" - }).ts) < 0; - } - - while (!oplogIsRolledOver()) { - let idVal = 100 + (i++); - assert.writeOK(mongosColl.insert({_id: idVal, long_str: largeStr}, - {writeConcern: {w: "majority"}})); - sleep(100); - } - - ChangeStreamTest.assertChangeStreamThrowsCode({ - db: mongosDB, - collName: collToWatch, - pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard1}}], - expectedCode: 40576 - }); + ChangeStreamTest.assertChangeStreamThrowsCode({ + db: mongosDB, + collName: collToWatch, + pipeline: [{$changeStream: {startAtOperationTime: resumeTimeFirstUpdate}}], + expectedCode: 40576 + }); - ChangeStreamTest.assertChangeStreamThrowsCode({ - db: mongosDB, - collName: collToWatch, - pipeline: [{$changeStream: {startAtOperationTime: resumeTimeFirstUpdate}}], - expectedCode: 40576 - }); + // Test that the change stream can't resume if the resume token *is* present in the oplog, + // but one of the shards has rolled over its oplog enough that it doesn't have a long enough + // history to resume. Since we just rolled over the oplog on shard 1, we know that + // 'resumeTokenFromFirstUpdateOnShard0' is still present on shard 0, but shard 1 doesn't + // have any changes earlier than that, so won't be able to resume. + ChangeStreamTest.assertChangeStreamThrowsCode({ + db: mongosDB, + collName: collToWatch, + pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}], + expectedCode: 40576 + }); - // Test that the change stream can't resume if the resume token *is* present in the oplog, - // but one of the shards has rolled over its oplog enough that it doesn't have a long enough - // history to resume. Since we just rolled over the oplog on shard 1, we know that - // 'resumeTokenFromFirstUpdateOnShard0' is still present on shard 0, but shard 1 doesn't - // have any changes earlier than that, so won't be able to resume. - ChangeStreamTest.assertChangeStreamThrowsCode({ - db: mongosDB, - collName: collToWatch, - pipeline: [{$changeStream: {resumeAfter: resumeTokenFromFirstUpdateOnShard0}}], - expectedCode: 40576 - }); + // Drop the collection. + assert(mongosColl.drop()); + + // Shard the test collection on shardKey. + assert.commandWorked( + mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}})); + + // Split the collection into 2 chunks: [MinKey, 50), [50, MaxKey]. + assert.commandWorked( + mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {shardKey: 50}})); + + // Move the [50, MaxKey] chunk to st.shard1.shardName. + assert.commandWorked(mongosDB.adminCommand( + {moveChunk: mongosColl.getFullName(), find: {shardKey: 51}, to: st.rs1.getURL()})); + + const numberOfDocs = 100; + + // Insert test documents. + for (let counter = 0; counter < numberOfDocs / 5; ++counter) { + assert.writeOK(mongosColl.insert({_id: "abcd" + counter, shardKey: counter * 5 + 0}, + {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({_id: "Abcd" + counter, shardKey: counter * 5 + 1}, + {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({_id: "aBcd" + counter, shardKey: counter * 5 + 2}, + {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({_id: "abCd" + counter, shardKey: counter * 5 + 3}, + {writeConcern: {w: "majority"}})); + assert.writeOK(mongosColl.insert({_id: "abcD" + counter, shardKey: counter * 5 + 4}, + {writeConcern: {w: "majority"}})); + } - // Drop the collection. - assert(mongosColl.drop()); - - // Shard the test collection on shardKey. - assert.commandWorked( - mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {shardKey: 1}})); - - // Split the collection into 2 chunks: [MinKey, 50), [50, MaxKey]. - assert.commandWorked( - mongosDB.adminCommand({split: mongosColl.getFullName(), middle: {shardKey: 50}})); - - // Move the [50, MaxKey] chunk to st.shard1.shardName. - assert.commandWorked(mongosDB.adminCommand( - {moveChunk: mongosColl.getFullName(), find: {shardKey: 51}, to: st.rs1.getURL()})); - - const numberOfDocs = 100; - - // Insert test documents. - for (let counter = 0; counter < numberOfDocs / 5; ++counter) { - assert.writeOK(mongosColl.insert({_id: "abcd" + counter, shardKey: counter * 5 + 0}, - {writeConcern: {w: "majority"}})); - assert.writeOK(mongosColl.insert({_id: "Abcd" + counter, shardKey: counter * 5 + 1}, - {writeConcern: {w: "majority"}})); - assert.writeOK(mongosColl.insert({_id: "aBcd" + counter, shardKey: counter * 5 + 2}, - {writeConcern: {w: "majority"}})); - assert.writeOK(mongosColl.insert({_id: "abCd" + counter, shardKey: counter * 5 + 3}, - {writeConcern: {w: "majority"}})); - assert.writeOK(mongosColl.insert({_id: "abcD" + counter, shardKey: counter * 5 + 4}, - {writeConcern: {w: "majority"}})); - } - - let allChangesCursor = cst.startWatchingChanges( - {pipeline: [{$changeStream: {}}], collection: collToWatch, includeToken: true}); - - // Perform the multi-update that will induce timestamp collisions - assert.writeOK(mongosColl.update({}, {$set: {updated: true}}, {multi: true})); - - // Loop over documents and open inner change streams resuming from a specified position. - // Note we skip the last document as it does not have the next document so we would - // hang indefinitely. - for (let counter = 0; counter < numberOfDocs - 1; ++counter) { - let next = cst.getOneChange(allChangesCursor); - - const resumeToken = next._id; - const caseInsensitive = {locale: "en_US", strength: 2}; - let resumedCaseInsensitiveCursor = cst.startWatchingChanges({ - pipeline: [{$changeStream: {resumeAfter: resumeToken}}], - collection: collToWatch, - aggregateOptions: {collation: caseInsensitive} - }); - cst.getOneChange(resumedCaseInsensitiveCursor); - } + let allChangesCursor = cst.startWatchingChanges( + {pipeline: [{$changeStream: {}}], collection: collToWatch, includeToken: true}); + + // Perform the multi-update that will induce timestamp collisions + assert.writeOK(mongosColl.update({}, {$set: {updated: true}}, {multi: true})); + + // Loop over documents and open inner change streams resuming from a specified position. + // Note we skip the last document as it does not have the next document so we would + // hang indefinitely. + for (let counter = 0; counter < numberOfDocs - 1; ++counter) { + let next = cst.getOneChange(allChangesCursor); + + const resumeToken = next._id; + const caseInsensitive = {locale: "en_US", strength: 2}; + let resumedCaseInsensitiveCursor = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: resumeToken}}], + collection: collToWatch, + aggregateOptions: {collation: caseInsensitive} + }); + cst.getOneChange(resumedCaseInsensitiveCursor); } +} - // Test change stream on a single collection. - testResume(mongosColl, mongosColl.getName()); +// Test change stream on a single collection. +testResume(mongosColl, mongosColl.getName()); - // Test change stream on all collections. - testResume(mongosColl, 1); +// Test change stream on all collections. +testResume(mongosColl, 1); - cst.cleanUp(); +cst.cleanUp(); - st.stop(); +st.stop(); })(); |