diff options
author | Drew Paroski <drew.paroski@mongodb.com> | 2021-06-25 19:06:33 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-07-08 05:53:42 +0000 |
commit | 50315039df4c36d9cde809eaaf56c439ee767340 (patch) | |
tree | 5d0b6dec8597ad9d841ce3d46ca0289e6f2edf26 /jstests/sharding | |
parent | fd4bca02d3ec8ceb03e8ff62d22469e14b093889 (diff) | |
download | mongo-50315039df4c36d9cde809eaaf56c439ee767340.tar.gz |
SERVER-55309 Re-order post-image lookup stages when expanding $changeStream
Diffstat (limited to 'jstests/sharding')
3 files changed, 223 insertions, 48 deletions
diff --git a/jstests/sharding/change_stream_lookup_post_image_chunk_migration.js b/jstests/sharding/change_stream_lookup_post_image_chunk_migration.js new file mode 100644 index 00000000000..9e20d81c806 --- /dev/null +++ b/jstests/sharding/change_stream_lookup_post_image_chunk_migration.js @@ -0,0 +1,164 @@ +// Tests that change stream with 'fullDocument: updateLookup' works correctly when chunks are +// migrated after updates are performed. +// @tags: [ +// requires_majority_read_concern, +// uses_change_streams, +// ] +(function() { +'use strict'; + +load('jstests/libs/change_stream_util.js'); // For isChangeStreamOptimizationEnabled(). +load('jstests/libs/profiler.js'); // For various profiler helpers. + +const st = new ShardingTest({ + shards: 3, + rs: {nodes: 1, setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}} +}); + +const dbName = jsTestName(); +const collName = jsTestName(); +const ns = dbName + "." + collName; + +const mongos = st.s0; +const mongosColl = mongos.getDB(dbName).getCollection(collName); + +const shard0 = st.shard0; +const shard1 = st.shard1; +const shard2 = st.shard2; + +const isChangeStreamOptimized = isChangeStreamOptimizationEnabled(mongos.getDB(dbName)); + +// Enable sharding to inform mongos of the database, and make sure all chunks start on shard 0. +assert.commandWorked(mongos.adminCommand({enableSharding: dbName})); +st.ensurePrimaryShard(dbName, shard0.shardName); + +// Create a change stream before any insert or update operations are performed. +const csBeforeSharding = mongosColl.watch([], {fullDocument: "updateLookup"}); + +// Insert four documents, and then read these 'insert' events from the change stream. +let next; +for (const id of [0, 1, 2, 3]) { + assert.commandWorked(mongosColl.insert({_id: id})); + assert.soon(() => csBeforeSharding.hasNext()); + next = csBeforeSharding.next(); + assert.eq(next.operationType, "insert"); + assert.eq(next.fullDocument, {_id: id}); +} + +// Save a resume token after the 'insert' operations and then close the change stream. +const resumeTokenBeforeUpdates = next._id; +csBeforeSharding.close(); + +function checkUpdateEvents(changeStream, csComment, idShardPairs) { + for (const [id, shard] of idShardPairs) { + assert.soon(() => changeStream.hasNext()); + const next = changeStream.next(); + assert.eq(next.operationType, "update"); + assert.eq(next.fullDocument, {_id: id, updated: true}); + + // Test that each update lookup goes to the appropriate shard. + const filter = { + op: isChangeStreamOptimized ? "command" : "query", + ns: ns, + "command.comment": csComment, + errCode: {$ne: ErrorCodes.StaleConfig} + }; + filter[isChangeStreamOptimized ? "command.aggregate" : "command.find"] = collName; + filter[isChangeStreamOptimized ? "command.pipeline.0.$match._id" : "command.filter._id"] = + id; + + profilerHasSingleMatchingEntryOrThrow({profileDB: shard.getDB(dbName), filter: filter}); + } +} + +// Update two of the documents. +assert.commandWorked(mongosColl.update({_id: 0}, {$set: {updated: true}})); +assert.commandWorked(mongosColl.update({_id: 2}, {$set: {updated: true}})); + +// Drop the 'profile' tables and then enable profiling on all shards. +for (const shard of [shard0, shard1, shard2]) { + const db = shard.getDB(dbName); + assert.commandWorked(db.setProfilingLevel(0)); + db.system.profile.drop(); + assert.commandWorked(db.setProfilingLevel(2)); +} + +// Now, actually shard the collection. +jsTestLog("Sharding collection"); +assert.commandWorked(mongos.adminCommand({shardCollection: ns, key: {_id: 1}})); + +// Split the collection into two chunks: [MinKey, 2) and [2, MaxKey]. +assert.commandWorked(mongos.adminCommand({split: ns, middle: {_id: 2}})); + +jsTestLog("Migrating [MinKey, 2) to shard1 and [2, MaxKey] to shard2."); + +for (const [id, shard] of [[0, shard1], [2, shard2]]) { + const dest = shard.shardName; + assert.commandWorked( + mongos.adminCommand({moveChunk: ns, find: {_id: id}, to: dest, _waitForDelete: true})); +} + +// After sharding the collection and moving the documents to different shards, create a change +// stream with a resume token from before the collection was sharded. +const commentAfterSharding = "change stream after sharding"; +const csAfterSharding = mongosColl.watch([], { + resumeAfter: resumeTokenBeforeUpdates, + fullDocument: "updateLookup", + comment: commentAfterSharding +}); + +// Next two events in csAfterSharding should be the two 'update' operations. +checkUpdateEvents(csAfterSharding, commentAfterSharding, [[0, shard1], [2, shard2]]); + +csAfterSharding.close(); + +// Update the other two documents. +assert.commandWorked(mongosColl.update({_id: 1}, {$set: {updated: true}})); +assert.commandWorked(mongosColl.update({_id: 3}, {$set: {updated: true}})); + +// Add a new shard to the cluster +const rs3 = new ReplSetTest({ + name: "shard3", + nodes: 1, + setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true} +}); +rs3.startSet({shardsvr: ''}); +rs3.initiate(); +const shard3 = rs3.getPrimary(); +shard3.shardName = "shard3"; +assert.commandWorked(mongos.adminCommand({addShard: rs3.getURL(), name: shard3.shardName})); + +// Drop the 'profile' tables and then enable profiling on all shards. +for (const shard of [shard0, shard1, shard2, shard3]) { + const db = shard.getDB(dbName); + assert.commandWorked(db.setProfilingLevel(0)); + db.system.profile.drop(); + assert.commandWorked(db.setProfilingLevel(2)); +} + +jsTestLog("Migrating [MinKey, 2) to shard2 and [2, MaxKey] to shard3."); + +for (const [id, shard] of [[0, shard2], [2, shard3]]) { + const dest = shard.shardName; + assert.commandWorked( + mongos.adminCommand({moveChunk: ns, find: {_id: id}, to: dest, _waitForDelete: true})); +} + +// After adding the new shard and migrating the documents to the new shard, create a change stream +// with a resume token from before the collection was sharded. +const commentAfterAddShard = "change stream after addShard"; +const csAfterAddShard = mongosColl.watch([], { + resumeAfter: resumeTokenBeforeUpdates, + fullDocument: "updateLookup", + comment: commentAfterAddShard +}); + +// Next four events in csAfterAddShard should be the four 'update' operations. +checkUpdateEvents( + csAfterAddShard, commentAfterAddShard, [[0, shard2], [2, shard3], [1, shard2], [3, shard3]]); + +csAfterAddShard.close(); + +st.stop(); +rs3.stopSet(); +})(); diff --git a/jstests/sharding/change_stream_read_preference.js b/jstests/sharding/change_stream_read_preference.js index 752f1726c2e..2db8fb8323b 100644 --- a/jstests/sharding/change_stream_read_preference.js +++ b/jstests/sharding/change_stream_read_preference.js @@ -7,7 +7,8 @@ (function() { "use strict"; -load('jstests/libs/profiler.js'); // For various profiler helpers. +load('jstests/libs/change_stream_util.js'); // For isChangeStreamOptimizationEnabled(). +load('jstests/libs/profiler.js'); // For various profiler helpers. const st = new ShardingTest({ name: "change_stream_read_pref", @@ -61,6 +62,8 @@ assert.eq(primaryStream.next().fullDocument, {_id: -1, updated: true}); assert.soon(() => primaryStream.hasNext()); assert.eq(primaryStream.next().fullDocument, {_id: 1, updated: true}); +const isChangeStreamOptimized = isChangeStreamOptimizationEnabled(mongosDB); + for (let rs of [st.rs0, st.rs1]) { const primaryDB = rs.getPrimary().getDB(dbName); // Test that the change stream itself goes to the primary. There might be more than one if @@ -71,10 +74,14 @@ for (let rs of [st.rs0, st.rs1]) { {profileDB: primaryDB, filter: {'originatingCommand.comment': changeStreamComment}}); // Test that the update lookup goes to the primary as well. - profilerHasSingleMatchingEntryOrThrow({ - profileDB: primaryDB, - filter: {op: "query", ns: mongosColl.getFullName(), "command.comment": changeStreamComment} - }); + let filter = { + op: isChangeStreamOptimized ? "command" : "query", + ns: mongosColl.getFullName(), + "command.comment": changeStreamComment + }; + filter[isChangeStreamOptimized ? "command.aggregate" : "command.find"] = mongosColl.getName(); + + profilerHasSingleMatchingEntryOrThrow({profileDB: primaryDB, filter: filter}); } primaryStream.close(); @@ -103,18 +110,18 @@ for (let rs of [st.rs0, st.rs1]) { {profileDB: secondaryDB, filter: {'originatingCommand.comment': changeStreamComment}}); // Test that the update lookup goes to the secondary as well. - profilerHasSingleMatchingEntryOrThrow({ - profileDB: secondaryDB, - filter: { - op: "query", - ns: mongosColl.getFullName(), - "command.comment": changeStreamComment, - // We need to filter out any profiler entries with a stale config - this is the - // first read on this secondary with a readConcern specified, so it is the first - // read on this secondary that will enforce shard version. - errCode: {$ne: ErrorCodes.StaleConfig} - } - }); + let filter = { + op: isChangeStreamOptimized ? "command" : "query", + ns: mongosColl.getFullName(), + "command.comment": changeStreamComment, + // We need to filter out any profiler entries with a stale config - this is the + // first read on this secondary with a readConcern specified, so it is the first + // read on this secondary that will enforce shard version. + errCode: {$ne: ErrorCodes.StaleConfig} + }; + filter[isChangeStreamOptimized ? "command.aggregate" : "command.find"] = mongosColl.getName(); + + profilerHasSingleMatchingEntryOrThrow({profileDB: secondaryDB, filter: filter}); } secondaryStream.close(); diff --git a/jstests/sharding/change_stream_update_lookup_read_concern.js b/jstests/sharding/change_stream_update_lookup_read_concern.js index 2a588f5b1d7..c4442510979 100644 --- a/jstests/sharding/change_stream_update_lookup_read_concern.js +++ b/jstests/sharding/change_stream_update_lookup_read_concern.js @@ -8,8 +8,9 @@ (function() { "use strict"; -load("jstests/libs/profiler.js"); // For profilerHas*OrThrow() helpers. -load("jstests/replsets/rslib.js"); // For reconfig(). +load('jstests/libs/change_stream_util.js'); // For isChangeStreamOptimizationEnabled(). +load("jstests/libs/profiler.js"); // For profilerHas*OrThrow() helpers. +load("jstests/replsets/rslib.js"); // For reconfig(). // For stopServerReplication() and restartServerReplication(). load("jstests/libs/write_concern_util.js"); @@ -60,7 +61,8 @@ assert.commandWorked(st.s.adminCommand( const mongosDB = st.s0.getDB(jsTestName()); const mongosColl = mongosDB[jsTestName()]; -// Shard the collection to ensure the change stream will perform update lookup from mongos. +const isChangeStreamOptimized = isChangeStreamOptimizationEnabled(mongosDB); + assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()})); assert.commandWorked( mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}})); @@ -107,22 +109,24 @@ profilerHasAtLeastOneMatchingEntryOrThrow( {profileDB: closestSecondaryDB, filter: {"originatingCommand.comment": changeStreamComment}}); // Test that the update lookup goes to the secondary as well. +let filter = { + op: isChangeStreamOptimized ? "command" : "query", + ns: mongosColl.getFullName(), + "command.comment": changeStreamComment, + // We need to filter out any profiler entries with a stale config - this is the first read on + // this secondary with a readConcern specified, so it is the first read on this secondary that + // will enforce shard version. + errCode: {$ne: ErrorCodes.StaleConfig} +}; +filter[isChangeStreamOptimized ? "command.aggregate" : "command.find"] = mongosColl.getName(); +filter[isChangeStreamOptimized ? "command.pipeline.0.$match._id" : "command.filter._id"] = 1; + profilerHasSingleMatchingEntryOrThrow({ profileDB: closestSecondaryDB, - filter: { - op: "query", - ns: mongosColl.getFullName(), - "command.filter._id": 1, - "command.comment": changeStreamComment, - // We need to filter out any profiler entries with a stale config - this is the first - // read on this secondary with a readConcern specified, so it is the first read on this - // secondary that will enforce shard version. - errCode: {$ne: ErrorCodes.StaleConfig} - }, + filter: filter, errorMsgFilter: {ns: mongosColl.getFullName()}, errorMsgProj: {ns: 1, op: 1, command: 1}, }); - // Now add a new secondary which is "closer" (add the "closestSecondary" tag to that secondary, // and remove it from the old node with that tag) to force update lookups target a different // node than the change stream itself. @@ -182,11 +186,11 @@ const joinResumeReplicationShell = function() { return changeStreamDB .currentOp({ - op: "query", - // Note the namespace here happens to be the database.$cmd, - // because we're blocked waiting for the read concern, which - // happens before we get to the command processing level and - // adjust the currentOp namespace to include the collection name. + op: ${isChangeStreamOptimized} ? "command" : "query", + // Note the namespace here happens to be database.$cmd, because + // we're blocked waiting for the read concern, which happens + // before we get to the command processing level and adjust the + // currentOp namespace to include the collection name. ns: "${mongosDB.getName()}.$cmd", "command.comment": "${changeStreamComment}", }) @@ -206,18 +210,18 @@ assert.docEq(latestChange.fullDocument, {_id: 1, updatedCount: 2}); joinResumeReplicationShell(); // Test that the update lookup goes to the new closest secondary. -profilerHasSingleMatchingEntryOrThrow({ - profileDB: newClosestSecondaryDB, - filter: { - op: "query", - ns: mongosColl.getFullName(), - "command.comment": changeStreamComment, - // We need to filter out any profiler entries with a stale config - this is the first - // read on this secondary with a readConcern specified, so it is the first read on this - // secondary that will enforce shard version. - errCode: {$ne: ErrorCodes.StaleConfig} - } -}); +filter = { + op: isChangeStreamOptimized ? "command" : "query", + ns: mongosColl.getFullName(), + "command.comment": changeStreamComment, + // We need to filter out any profiler entries with a stale config - this is the first read on + // this secondary with a readConcern specified, so it is the first read on this secondary that + // will enforce shard version. + errCode: {$ne: ErrorCodes.StaleConfig} +}; +filter[isChangeStreamOptimized ? "command.aggregate" : "command.find"] = mongosColl.getName(); + +profilerHasSingleMatchingEntryOrThrow({profileDB: newClosestSecondaryDB, filter: filter}); changeStream.close(); st.stop(); |