author     jannaerin <golden.janna@gmail.com>              2021-04-26 20:23:44 +0000
committer  Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-05-05 18:13:49 +0000
commit     946fb51b19e4943a069b8af1442ba5daeef61fa2 (patch)
tree       2b428d27a1bd2beed1fc80ee8480424bfe2331ef /jstests
parent     6701ac9e1cb8f42ae479d70f0fa6d1fa2b8bc995 (diff)
SERVER-54982 Test resumability of change streams together with reshardCollection
Diffstat (limited to 'jstests')
-rw-r--r--  jstests/sharding/libs/resharding_test_fixture.js            |  17
-rw-r--r--  jstests/sharding/resharding_change_streams_resumability.js  | 170
2 files changed, 186 insertions(+), 1 deletion(-)
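The commit exercises change stream resumability: a cursor records a resume token (the `_id` of a change event), and a new stream opened with `resumeAfter` picks up from that point. For context, a minimal shell-style sketch of that mechanism, independent of the resharding fixture below and assuming a hypothetical `test.coll` namespace:

    // Open a change stream, capture the resume token of an observed event,
    // then resume a second stream from that token.
    const coll = db.getSiblingDB("test").coll;  // hypothetical namespace
    const cursor = coll.watch();
    assert.commandWorked(coll.insert({_id: 0}));
    assert.soon(() => cursor.hasNext());
    const resumeToken = cursor.next()._id;
    // A stream resumed after the token sees only subsequent changes.
    const resumed = coll.watch([], {resumeAfter: resumeToken});
    assert.commandWorked(coll.insert({_id: 1}));
    assert.soon(() => resumed.hasNext());
    assert.eq(1, resumed.next().documentKey._id);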
diff --git a/jstests/sharding/libs/resharding_test_fixture.js b/jstests/sharding/libs/resharding_test_fixture.js
index ba167f78fef..36bcfd52675 100644
--- a/jstests/sharding/libs/resharding_test_fixture.js
+++ b/jstests/sharding/libs/resharding_test_fixture.js
@@ -30,6 +30,8 @@ var ReshardingTest = class {
minimumOperationDurationMS: minimumOperationDurationMS = undefined,
criticalSectionTimeoutMS: criticalSectionTimeoutMS = 24 * 60 * 60 * 1000 /* 1 day */,
commitImplicitly: commitImplicitly = true,
+ periodicNoopIntervalSecs: periodicNoopIntervalSecs = undefined,
+ writePeriodicNoops: writePeriodicNoops = undefined,
} = {}) {
// The @private JSDoc comments cause VS Code to not display the corresponding properties and
// methods in its autocomplete list. This makes it simpler for test authors to know what the
@@ -50,6 +52,10 @@ var ReshardingTest = class {
this._criticalSectionTimeoutMS = criticalSectionTimeoutMS;
/** @private */
this._commitImplicitly = commitImplicitly;
+ /** @private */
+ this._periodicNoopIntervalSecs = periodicNoopIntervalSecs;
+ /** @private */
+ this._writePeriodicNoops = writePeriodicNoops;
// Properties set by setup().
/** @private */
@@ -97,6 +103,15 @@ var ReshardingTest = class {
this._criticalSectionTimeoutMS;
}
+ let rsConfig = {setParameter: {featureFlagResharding: true}};
+ if (this._periodicNoopIntervalSecs !== undefined) {
+ rsConfig.setParameter.periodicNoopIntervalSecs = this._periodicNoopIntervalSecs;
+ }
+
+ if (this._writePeriodicNoops !== undefined) {
+ rsConfig.setParameter.writePeriodicNoops = this._writePeriodicNoops;
+ }
+
this._st = new ShardingTest({
mongos: 1,
mongosOptions: {setParameter: {featureFlagResharding: true}},
@@ -104,7 +119,7 @@ var ReshardingTest = class {
configOptions: config,
shards: this._numShards,
rs: {nodes: 2},
- rsOptions: {setParameter: {featureFlagResharding: true}},
+ rsOptions: rsConfig,
manualAddShard: true,
});
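The fixture change above plumbs two existing mongod server parameters, writePeriodicNoops and periodicNoopIntervalSecs, into the shard replica set nodes' startup options. Periodic no-op writes keep the oplog, and therefore change stream resume tokens, advancing on shards that receive no user writes, so a one-second interval keeps a sharded change stream from stalling during the test. A rough sketch of passing the same parameters to a standalone node via the jstest MongoRunner helper (not part of this commit):

    // Start a mongod with frequent periodic no-op writes enabled.
    const conn = MongoRunner.runMongod(
        {setParameter: {writePeriodicNoops: true, periodicNoopIntervalSecs: 1}});
    // ... run a workload that relies on the oplog advancing while idle ...
    MongoRunner.stopMongod(conn);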
diff --git a/jstests/sharding/resharding_change_streams_resumability.js b/jstests/sharding/resharding_change_streams_resumability.js
new file mode 100644
index 00000000000..577f4974109
--- /dev/null
+++ b/jstests/sharding/resharding_change_streams_resumability.js
@@ -0,0 +1,170 @@
+// Tests that change streams on a collection can be resumed during and after the given collection is
+// resharded.
+//
+// @tags: [
+// requires_majority_read_concern,
+// sbe_incompatible,
+// uses_change_streams,
+// requires_fcv_49,
+// uses_atclustertime,
+// ]
+(function() {
+"use strict";
+
+load('jstests/libs/change_stream_util.js');
+load("jstests/libs/discover_topology.js");
+load("jstests/sharding/libs/resharding_test_fixture.js");
+
+// Use a higher frequency for periodic noops to speed up the test.
+const reshardingTest = new ReshardingTest({
+ numDonors: 2,
+ numRecipients: 1,
+ reshardInPlace: false,
+ periodicNoopIntervalSecs: 1,
+ writePeriodicNoops: true
+});
+reshardingTest.setup();
+
+const kDbName = "reshardingDb";
+const collName = "coll";
+const ns = kDbName + "." + collName;
+
+const donorShardNames = reshardingTest.donorShardNames;
+const sourceCollection = reshardingTest.createShardedCollection({
+ ns: ns,
+ shardKeyPattern: {oldKey: 1},
+ chunks: [
+ {min: {oldKey: MinKey}, max: {oldKey: 0}, shard: donorShardNames[0]},
+ {min: {oldKey: 0}, max: {oldKey: MaxKey}, shard: donorShardNames[1]}
+ ],
+ primaryShardName: donorShardNames[0]
+});
+
+const mongos = sourceCollection.getMongo();
+const reshardingDb = mongos.getDB(kDbName);
+
+const cst = new ChangeStreamTest(reshardingDb);
+
+// Open a change streams cursor on the collection that will be resharded.
+let changeStreamsCursor =
+ cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collName});
+assert.eq([], changeStreamsCursor.firstBatch, "Expected cursor not to have changes, but it did");
+
+const expectedChanges = [
+ {
+ documentKey: {_id: 0, oldKey: 0},
+ fullDocument: {_id: 0, oldKey: 0},
+ ns: {db: kDbName, coll: collName},
+ operationType: "insert",
+ },
+ {
+ documentKey: {oldKey: 1, _id: 1},
+ fullDocument: {_id: 1, oldKey: 1},
+ ns: {db: kDbName, coll: collName},
+ operationType: "insert",
+ },
+ {
+ documentKey: {oldKey: 2, _id: 2},
+ fullDocument: {_id: 2, oldKey: 2},
+ ns: {db: kDbName, coll: collName},
+ operationType: "insert",
+ },
+ {
+ documentKey: {newKey: 3, _id: 3},
+ fullDocument: {_id: 3, newKey: 3, oldKey: 3},
+ ns: {db: kDbName, coll: collName},
+ operationType: "insert",
+ },
+ {
+ documentKey: {newKey: 4, _id: 4},
+ fullDocument: {_id: 4, newKey: 4, oldKey: 4},
+ ns: {db: kDbName, coll: collName},
+ operationType: "insert",
+ }
+];
+const preReshardCollectionChange = expectedChanges[0];
+const midReshardCollectionChanges = expectedChanges.slice(1, 3);
+const postReshardCollectionChanges = expectedChanges.slice(3);
+
+// Verify that the cursor sees changes before the collection is resharded.
+assert.commandWorked(sourceCollection.insert({_id: 0, oldKey: 0}));
+const preReshardCollectionResumeToken =
+ cst.assertNextChangesEqual(
+ {cursor: changeStreamsCursor, expectedChanges: [preReshardCollectionChange]})[0]
+ ._id;
+
+const recipientShardNames = reshardingTest.recipientShardNames;
+let midReshardCollectionResumeToken;
+let changeStreamsCursor2;
+reshardingTest.withReshardingInBackground( //
+ {
+ // If a donor is also a recipient, the donor state machine will run renameCollection with
+ // {dropTarget : true} rather than running drop and letting the recipient state machine run
+ // rename at the end of the resharding operation. So, we ensure that only one of the donor
+ // shards will also be a recipient shard in order to verify that neither the rename with
+ // {dropTarget : true} nor the drop command is picked up by the change streams cursor.
+ newShardKeyPattern: {newKey: 1},
+ newChunks: [
+ {min: {newKey: MinKey}, max: {newKey: MaxKey}, shard: recipientShardNames[0]},
+ ],
+ },
+ () => {
+ // Wait until participants are aware of the resharding operation.
+ reshardingTest.awaitCloneTimestampChosen();
+
+ // Open another change streams cursor while the collection is being resharded.
+ changeStreamsCursor2 =
+ cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: collName});
+
+ assert.commandWorked(sourceCollection.insert({_id: 1, oldKey: 1}));
+ assert.commandWorked(sourceCollection.insert({_id: 2, oldKey: 2}));
+
+ // Assert that both cursors see the two new inserts.
+ cst.assertNextChangesEqual(
+ {cursor: changeStreamsCursor, expectedChanges: midReshardCollectionChanges});
+ cst.assertNextChangesEqual(
+ {cursor: changeStreamsCursor2, expectedChanges: midReshardCollectionChanges});
+
+ // Check that we can resume from the token returned before resharding began.
+ let resumedCursor = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: preReshardCollectionResumeToken}}],
+ collection: collName
+ });
+ midReshardCollectionResumeToken =
+ cst.assertNextChangesEqual(
+ {cursor: resumedCursor, expectedChanges: midReshardCollectionChanges})[1]
+ ._id;
+ });
+
+assert.commandWorked(sourceCollection.insert({_id: 3, newKey: 3, oldKey: 3}));
+
+// Assert that both the cursor opened before resharding started and the one opened during
+// resharding see the insert after resharding has finished.
+cst.assertNextChangesEqual(
+ {cursor: changeStreamsCursor, expectedChanges: [postReshardCollectionChanges[0]]});
+cst.assertNextChangesEqual(
+ {cursor: changeStreamsCursor2, expectedChanges: [postReshardCollectionChanges[0]]});
+
+// Check that we can resume from both the token returned before resharding began and the token
+// returned during resharding.
+let resumedCursorFromPreOperation = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: preReshardCollectionResumeToken}}],
+ collection: collName
+});
+let midAndPostReshardCollectionChanges =
+ midReshardCollectionChanges.concat(postReshardCollectionChanges);
+
+let resumedCursorFromMidOperation = cst.startWatchingChanges({
+ pipeline: [{$changeStream: {resumeAfter: midReshardCollectionResumeToken}}],
+ collection: collName
+});
+
+assert.commandWorked(sourceCollection.insert({_id: 4, newKey: 4, oldKey: 4}));
+
+cst.assertNextChangesEqual(
+ {cursor: resumedCursorFromPreOperation, expectedChanges: midAndPostReshardCollectionChanges});
+cst.assertNextChangesEqual(
+ {cursor: resumedCursorFromMidOperation, expectedChanges: postReshardCollectionChanges});
+
+reshardingTest.teardown();
+})();