diff options
author | jannaerin <golden.janna@gmail.com> | 2021-04-26 20:23:44 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-05 18:13:49 +0000 |
commit | 946fb51b19e4943a069b8af1442ba5daeef61fa2 (patch) | |
tree | 2b428d27a1bd2beed1fc80ee8480424bfe2331ef /jstests | |
parent | 6701ac9e1cb8f42ae479d70f0fa6d1fa2b8bc995 (diff) | |
download | mongo-946fb51b19e4943a069b8af1442ba5daeef61fa2.tar.gz |
SERVER-54982 Test resumability of changes streams together with reshardCollection
Diffstat (limited to 'jstests')
-rw-r--r-- | jstests/sharding/libs/resharding_test_fixture.js | 17 | ||||
-rw-r--r-- | jstests/sharding/resharding_change_streams_resumability.js | 170 |
2 files changed, 186 insertions, 1 deletions
diff --git a/jstests/sharding/libs/resharding_test_fixture.js b/jstests/sharding/libs/resharding_test_fixture.js index ba167f78fef..36bcfd52675 100644 --- a/jstests/sharding/libs/resharding_test_fixture.js +++ b/jstests/sharding/libs/resharding_test_fixture.js @@ -30,6 +30,8 @@ var ReshardingTest = class { minimumOperationDurationMS: minimumOperationDurationMS = undefined, criticalSectionTimeoutMS: criticalSectionTimeoutMS = 24 * 60 * 60 * 1000 /* 1 day */, commitImplicitly: commitImplicitly = true, + periodicNoopIntervalSecs: periodicNoopIntervalSecs = undefined, + writePeriodicNoops: writePeriodicNoops = undefined, } = {}) { // The @private JSDoc comments cause VS Code to not display the corresponding properties and // methods in its autocomplete list. This makes it simpler for test authors to know what the @@ -50,6 +52,10 @@ var ReshardingTest = class { this._criticalSectionTimeoutMS = criticalSectionTimeoutMS; /** @private */ this._commitImplicitly = commitImplicitly; + /** @private */ + this._periodicNoopIntervalSecs = periodicNoopIntervalSecs; + /** @private */ + this._writePeriodicNoops = writePeriodicNoops; // Properties set by setup(). /** @private */ @@ -97,6 +103,15 @@ var ReshardingTest = class { this._criticalSectionTimeoutMS; } + let rsConfig = {setParameter: {featureFlagResharding: true}}; + if (this._periodicNoopIntervalSecs !== undefined) { + rsConfig.setParameter.periodicNoopIntervalSecs = this._periodicNoopIntervalSecs; + } + + if (this._writePeriodicNoops !== undefined) { + rsConfig.setParameter.writePeriodicNoops = this._writePeriodicNoops; + } + this._st = new ShardingTest({ mongos: 1, mongosOptions: {setParameter: {featureFlagResharding: true}}, @@ -104,7 +119,7 @@ var ReshardingTest = class { configOptions: config, shards: this._numShards, rs: {nodes: 2}, - rsOptions: {setParameter: {featureFlagResharding: true}}, + rsOptions: rsConfig, manualAddShard: true, }); diff --git a/jstests/sharding/resharding_change_streams_resumability.js b/jstests/sharding/resharding_change_streams_resumability.js new file mode 100644 index 00000000000..577f4974109 --- /dev/null +++ b/jstests/sharding/resharding_change_streams_resumability.js @@ -0,0 +1,170 @@ +// Tests that change streams on a collection can be resumed during and after the given collection is +// resharded. +// +// @tags: [ +// requires_majority_read_concern, +// sbe_incompatible, +// uses_change_streams, +// requires_fcv_49, +// uses_atclustertime, +// ] +(function() { +"use strict"; + +load('jstests/libs/change_stream_util.js'); +load("jstests/libs/discover_topology.js"); +load("jstests/sharding/libs/resharding_test_fixture.js"); + +// Use a higher frequency for periodic noops to speed up the test. +const reshardingTest = new ReshardingTest({ + numDonors: 2, + numRecipients: 1, + reshardInPlace: false, + periodicNoopIntervalSecs: 1, + writePeriodicNoops: true +}); +reshardingTest.setup(); + +const kDbName = "reshardingDb"; +const collName = "coll"; +const ns = kDbName + "." + collName; + +const donorShardNames = reshardingTest.donorShardNames; +const sourceCollection = reshardingTest.createShardedCollection({ + ns: ns, + shardKeyPattern: {oldKey: 1}, + chunks: [ + {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donorShardNames[0]}, + {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donorShardNames[1]} + ], + primaryShardName: donorShardNames[0] +}); + +const mongos = sourceCollection.getMongo(); +const reshardingDb = mongos.getDB(kDbName); + +const cst = new ChangeStreamTest(reshardingDb); + +// Open a change streams cursor on the collection that will be resharded. +let changeStreamsCursor = + cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collName}); +assert.eq([], changeStreamsCursor.firstBatch, "Expected cursor not to have changes, but it did"); + +const expectedChanges = [ + { + documentKey: {_id: 0, oldKey: 0}, + fullDocument: {_id: 0, oldKey: 0}, + ns: {db: kDbName, coll: collName}, + operationType: "insert", + }, + { + documentKey: {oldKey: 1, _id: 1}, + fullDocument: {_id: 1, oldKey: 1}, + ns: {db: kDbName, coll: collName}, + operationType: "insert", + }, + { + documentKey: {oldKey: 2, _id: 2}, + fullDocument: {_id: 2, oldKey: 2}, + ns: {db: kDbName, coll: collName}, + operationType: "insert", + }, + { + documentKey: {newKey: 3, _id: 3}, + fullDocument: {_id: 3, newKey: 3, oldKey: 3}, + ns: {db: kDbName, coll: collName}, + operationType: "insert", + }, + { + documentKey: {newKey: 4, _id: 4}, + fullDocument: {_id: 4, newKey: 4, oldKey: 4}, + ns: {db: kDbName, coll: collName}, + operationType: "insert", + } +]; +const preReshardCollectionChange = expectedChanges[0]; +const midReshardCollectionChanges = expectedChanges.slice(1, 3); +const postReshardCollectionChanges = expectedChanges.slice(3); + +// Verify that the cursor sees changes before the collection is resharded. +assert.commandWorked(sourceCollection.insert({_id: 0, oldKey: 0})); +const preReshardCollectionResumeToken = + cst.assertNextChangesEqual( + {cursor: changeStreamsCursor, expectedChanges: [preReshardCollectionChange]})[0] + ._id; + +const recipientShardNames = reshardingTest.recipientShardNames; +let midReshardCollectionResumeToken; +let changeStreamsCursor2; +reshardingTest.withReshardingInBackground( // + { + // If a donor is also a recipient, the donor state machine will run renameCollection with + // {dropTarget : true} rather than running drop and letting the recipient state machine run + // rename at the end of the resharding operation. So, we ensure that only one of the donor + // shards will also be a recipient shard order to verify that neither the rename with + // {dropTarget : true} nor the drop command are picked up by the change streams cursor. + newShardKeyPattern: {newKey: 1}, + newChunks: [ + {min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientShardNames[0]}, + ], + }, + () => { + // Wait until participants are aware of the resharding operation. + reshardingTest.awaitCloneTimestampChosen(); + + // Open another change streams cursor while the collection is being resharded. + changeStreamsCursor2 = + cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collName}); + + assert.commandWorked(sourceCollection.insert({_id: 1, oldKey: 1})); + assert.commandWorked(sourceCollection.insert({_id: 2, oldKey: 2})); + + // Assert that both the cursors see the two new inserts. + cst.assertNextChangesEqual( + {cursor: changeStreamsCursor, expectedChanges: midReshardCollectionChanges}); + cst.assertNextChangesEqual( + {cursor: changeStreamsCursor2, expectedChanges: midReshardCollectionChanges}); + + // Check that we can resume from the token returned before resharding began. + let resumedCursor = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: preReshardCollectionResumeToken}}], + collection: collName + }); + midReshardCollectionResumeToken = + cst.assertNextChangesEqual( + {cursor: resumedCursor, expectedChanges: midReshardCollectionChanges})[1] + ._id; + }); + +assert.commandWorked(sourceCollection.insert({_id: 3, newKey: 3, oldKey: 3})); + +// Assert that both the cursor opened before resharding started and the one opened during +// resharding see the insert after resharding has finished. +cst.assertNextChangesEqual( + {cursor: changeStreamsCursor, expectedChanges: [postReshardCollectionChanges[0]]}); +cst.assertNextChangesEqual( + {cursor: changeStreamsCursor2, expectedChanges: [postReshardCollectionChanges[0]]}); + +// Check that we can resume from both the token returned before resharding began and the token +// returned during resharding. +let resumedCursorFromPreOperation = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: preReshardCollectionResumeToken}}], + collection: collName +}); +let midAndPostReshardCollectionChanges = + midReshardCollectionChanges.concat(postReshardCollectionChanges); + +let resumedCursorFromMidOperation = cst.startWatchingChanges({ + pipeline: [{$changeStream: {resumeAfter: midReshardCollectionResumeToken}}], + collection: collName +}); + +assert.commandWorked(sourceCollection.insert({_id: 4, newKey: 4, oldKey: 4})); + +cst.assertNextChangesEqual( + {cursor: resumedCursorFromPreOperation, expectedChanges: midAndPostReshardCollectionChanges}); +cst.assertNextChangesEqual( + {cursor: resumedCursorFromMidOperation, expectedChanges: postReshardCollectionChanges}); + +reshardingTest.teardown(); +})(); |