summaryrefslogtreecommitdiff
path: root/jstests/sharding
diff options
context:
space:
mode:
authorDrew Paroski <drew.paroski@mongodb.com>2021-06-25 19:06:33 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-07-08 05:53:42 +0000
commit50315039df4c36d9cde809eaaf56c439ee767340 (patch)
tree5d0b6dec8597ad9d841ce3d46ca0289e6f2edf26 /jstests/sharding
parentfd4bca02d3ec8ceb03e8ff62d22469e14b093889 (diff)
downloadmongo-50315039df4c36d9cde809eaaf56c439ee767340.tar.gz
SERVER-55309 Re-order post-image lookup stages when expanding $changeStream
Diffstat (limited to 'jstests/sharding')
-rw-r--r--jstests/sharding/change_stream_lookup_post_image_chunk_migration.js164
-rw-r--r--jstests/sharding/change_stream_read_preference.js41
-rw-r--r--jstests/sharding/change_stream_update_lookup_read_concern.js66
3 files changed, 223 insertions, 48 deletions
diff --git a/jstests/sharding/change_stream_lookup_post_image_chunk_migration.js b/jstests/sharding/change_stream_lookup_post_image_chunk_migration.js
new file mode 100644
index 00000000000..9e20d81c806
--- /dev/null
+++ b/jstests/sharding/change_stream_lookup_post_image_chunk_migration.js
@@ -0,0 +1,164 @@
+// Tests that change stream with 'fullDocument: updateLookup' works correctly when chunks are
+// migrated after updates are performed.
+// @tags: [
+// requires_majority_read_concern,
+// uses_change_streams,
+// ]
+(function() {
+'use strict';
+
+load('jstests/libs/change_stream_util.js'); // For isChangeStreamOptimizationEnabled().
+load('jstests/libs/profiler.js'); // For various profiler helpers.
+
+const st = new ShardingTest({
+ shards: 3,
+ rs: {nodes: 1, setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}}
+});
+
+const dbName = jsTestName();
+const collName = jsTestName();
+const ns = dbName + "." + collName;
+
+const mongos = st.s0;
+const mongosColl = mongos.getDB(dbName).getCollection(collName);
+
+const shard0 = st.shard0;
+const shard1 = st.shard1;
+const shard2 = st.shard2;
+
+const isChangeStreamOptimized = isChangeStreamOptimizationEnabled(mongos.getDB(dbName));
+
+// Enable sharding to inform mongos of the database, and make sure all chunks start on shard 0.
+assert.commandWorked(mongos.adminCommand({enableSharding: dbName}));
+st.ensurePrimaryShard(dbName, shard0.shardName);
+
+// Create a change stream before any insert or update operations are performed.
+const csBeforeSharding = mongosColl.watch([], {fullDocument: "updateLookup"});
+
+// Insert four documents, and then read these 'insert' events from the change stream.
+let next;
+for (const id of [0, 1, 2, 3]) {
+ assert.commandWorked(mongosColl.insert({_id: id}));
+ assert.soon(() => csBeforeSharding.hasNext());
+ next = csBeforeSharding.next();
+ assert.eq(next.operationType, "insert");
+ assert.eq(next.fullDocument, {_id: id});
+}
+
+// Save a resume token after the 'insert' operations and then close the change stream.
+const resumeTokenBeforeUpdates = next._id;
+csBeforeSharding.close();
+
+function checkUpdateEvents(changeStream, csComment, idShardPairs) {
+ for (const [id, shard] of idShardPairs) {
+ assert.soon(() => changeStream.hasNext());
+ const next = changeStream.next();
+ assert.eq(next.operationType, "update");
+ assert.eq(next.fullDocument, {_id: id, updated: true});
+
+ // Test that each update lookup goes to the appropriate shard.
+ const filter = {
+ op: isChangeStreamOptimized ? "command" : "query",
+ ns: ns,
+ "command.comment": csComment,
+ errCode: {$ne: ErrorCodes.StaleConfig}
+ };
+ filter[isChangeStreamOptimized ? "command.aggregate" : "command.find"] = collName;
+ filter[isChangeStreamOptimized ? "command.pipeline.0.$match._id" : "command.filter._id"] =
+ id;
+
+ profilerHasSingleMatchingEntryOrThrow({profileDB: shard.getDB(dbName), filter: filter});
+ }
+}
+
+// Update two of the documents.
+assert.commandWorked(mongosColl.update({_id: 0}, {$set: {updated: true}}));
+assert.commandWorked(mongosColl.update({_id: 2}, {$set: {updated: true}}));
+
+// Drop the 'system.profile' collections and then enable profiling on all shards.
+for (const shard of [shard0, shard1, shard2]) {
+ const db = shard.getDB(dbName);
+ assert.commandWorked(db.setProfilingLevel(0));
+ db.system.profile.drop();
+ assert.commandWorked(db.setProfilingLevel(2));
+}
+
+// Now, actually shard the collection.
+jsTestLog("Sharding collection");
+assert.commandWorked(mongos.adminCommand({shardCollection: ns, key: {_id: 1}}));
+
+// Split the collection into two chunks: [MinKey, 2) and [2, MaxKey].
+assert.commandWorked(mongos.adminCommand({split: ns, middle: {_id: 2}}));
+
+jsTestLog("Migrating [MinKey, 2) to shard1 and [2, MaxKey] to shard2.");
+
+for (const [id, shard] of [[0, shard1], [2, shard2]]) {
+ const dest = shard.shardName;
+ assert.commandWorked(
+ mongos.adminCommand({moveChunk: ns, find: {_id: id}, to: dest, _waitForDelete: true}));
+}
+
+// After sharding the collection and moving the documents to different shards, create a change
+// stream with a resume token from before the collection was sharded.
+const commentAfterSharding = "change stream after sharding";
+const csAfterSharding = mongosColl.watch([], {
+ resumeAfter: resumeTokenBeforeUpdates,
+ fullDocument: "updateLookup",
+ comment: commentAfterSharding
+});
+
+// Next two events in csAfterSharding should be the two 'update' operations.
+checkUpdateEvents(csAfterSharding, commentAfterSharding, [[0, shard1], [2, shard2]]);
+
+csAfterSharding.close();
+
+// Update the other two documents.
+assert.commandWorked(mongosColl.update({_id: 1}, {$set: {updated: true}}));
+assert.commandWorked(mongosColl.update({_id: 3}, {$set: {updated: true}}));
+
+// Add a new shard to the cluster.
+const rs3 = new ReplSetTest({
+ name: "shard3",
+ nodes: 1,
+ setParameter: {periodicNoopIntervalSecs: 1, writePeriodicNoops: true}
+});
+rs3.startSet({shardsvr: ''});
+rs3.initiate();
+const shard3 = rs3.getPrimary();
+shard3.shardName = "shard3";
+assert.commandWorked(mongos.adminCommand({addShard: rs3.getURL(), name: shard3.shardName}));
+
+// Drop the 'system.profile' collections and then enable profiling on all shards.
+for (const shard of [shard0, shard1, shard2, shard3]) {
+ const db = shard.getDB(dbName);
+ assert.commandWorked(db.setProfilingLevel(0));
+ db.system.profile.drop();
+ assert.commandWorked(db.setProfilingLevel(2));
+}
+
+jsTestLog("Migrating [MinKey, 2) to shard2 and [2, MaxKey] to shard3.");
+
+for (const [id, shard] of [[0, shard2], [2, shard3]]) {
+ const dest = shard.shardName;
+ assert.commandWorked(
+ mongos.adminCommand({moveChunk: ns, find: {_id: id}, to: dest, _waitForDelete: true}));
+}
+
+// After adding the new shard and migrating the documents to the new shard, create a change stream
+// with a resume token from before the collection was sharded.
+const commentAfterAddShard = "change stream after addShard";
+const csAfterAddShard = mongosColl.watch([], {
+ resumeAfter: resumeTokenBeforeUpdates,
+ fullDocument: "updateLookup",
+ comment: commentAfterAddShard
+});
+
+// Next four events in csAfterAddShard should be the four 'update' operations.
+checkUpdateEvents(
+ csAfterAddShard, commentAfterAddShard, [[0, shard2], [2, shard3], [1, shard2], [3, shard3]]);
+
+csAfterAddShard.close();
+
+st.stop();
+rs3.stopSet();
+})();
diff --git a/jstests/sharding/change_stream_read_preference.js b/jstests/sharding/change_stream_read_preference.js
index 752f1726c2e..2db8fb8323b 100644
--- a/jstests/sharding/change_stream_read_preference.js
+++ b/jstests/sharding/change_stream_read_preference.js
@@ -7,7 +7,8 @@
(function() {
"use strict";
-load('jstests/libs/profiler.js'); // For various profiler helpers.
+load('jstests/libs/change_stream_util.js'); // For isChangeStreamOptimizationEnabled().
+load('jstests/libs/profiler.js'); // For various profiler helpers.
const st = new ShardingTest({
name: "change_stream_read_pref",
@@ -61,6 +62,8 @@ assert.eq(primaryStream.next().fullDocument, {_id: -1, updated: true});
assert.soon(() => primaryStream.hasNext());
assert.eq(primaryStream.next().fullDocument, {_id: 1, updated: true});
+const isChangeStreamOptimized = isChangeStreamOptimizationEnabled(mongosDB);
+
for (let rs of [st.rs0, st.rs1]) {
const primaryDB = rs.getPrimary().getDB(dbName);
// Test that the change stream itself goes to the primary. There might be more than one if
@@ -71,10 +74,14 @@ for (let rs of [st.rs0, st.rs1]) {
{profileDB: primaryDB, filter: {'originatingCommand.comment': changeStreamComment}});
// Test that the update lookup goes to the primary as well.
- profilerHasSingleMatchingEntryOrThrow({
- profileDB: primaryDB,
- filter: {op: "query", ns: mongosColl.getFullName(), "command.comment": changeStreamComment}
- });
+ let filter = {
+ op: isChangeStreamOptimized ? "command" : "query",
+ ns: mongosColl.getFullName(),
+ "command.comment": changeStreamComment
+ };
+ filter[isChangeStreamOptimized ? "command.aggregate" : "command.find"] = mongosColl.getName();
+
+ profilerHasSingleMatchingEntryOrThrow({profileDB: primaryDB, filter: filter});
}
primaryStream.close();
@@ -103,18 +110,18 @@ for (let rs of [st.rs0, st.rs1]) {
{profileDB: secondaryDB, filter: {'originatingCommand.comment': changeStreamComment}});
// Test that the update lookup goes to the secondary as well.
- profilerHasSingleMatchingEntryOrThrow({
- profileDB: secondaryDB,
- filter: {
- op: "query",
- ns: mongosColl.getFullName(),
- "command.comment": changeStreamComment,
- // We need to filter out any profiler entries with a stale config - this is the
- // first read on this secondary with a readConcern specified, so it is the first
- // read on this secondary that will enforce shard version.
- errCode: {$ne: ErrorCodes.StaleConfig}
- }
- });
+ let filter = {
+ op: isChangeStreamOptimized ? "command" : "query",
+ ns: mongosColl.getFullName(),
+ "command.comment": changeStreamComment,
+ // We need to filter out any profiler entries with a stale config - this is the
+ // first read on this secondary with a readConcern specified, so it is the first
+ // read on this secondary that will enforce shard version.
+ errCode: {$ne: ErrorCodes.StaleConfig}
+ };
+ filter[isChangeStreamOptimized ? "command.aggregate" : "command.find"] = mongosColl.getName();
+
+ profilerHasSingleMatchingEntryOrThrow({profileDB: secondaryDB, filter: filter});
}
secondaryStream.close();
diff --git a/jstests/sharding/change_stream_update_lookup_read_concern.js b/jstests/sharding/change_stream_update_lookup_read_concern.js
index 2a588f5b1d7..c4442510979 100644
--- a/jstests/sharding/change_stream_update_lookup_read_concern.js
+++ b/jstests/sharding/change_stream_update_lookup_read_concern.js
@@ -8,8 +8,9 @@
(function() {
"use strict";
-load("jstests/libs/profiler.js"); // For profilerHas*OrThrow() helpers.
-load("jstests/replsets/rslib.js"); // For reconfig().
+load('jstests/libs/change_stream_util.js'); // For isChangeStreamOptimizationEnabled().
+load("jstests/libs/profiler.js"); // For profilerHas*OrThrow() helpers.
+load("jstests/replsets/rslib.js"); // For reconfig().
// For stopServerReplication() and restartServerReplication().
load("jstests/libs/write_concern_util.js");
@@ -60,7 +61,8 @@ assert.commandWorked(st.s.adminCommand(
const mongosDB = st.s0.getDB(jsTestName());
const mongosColl = mongosDB[jsTestName()];
-// Shard the collection to ensure the change stream will perform update lookup from mongos.
+const isChangeStreamOptimized = isChangeStreamOptimizationEnabled(mongosDB);
+
assert.commandWorked(mongosDB.adminCommand({enableSharding: mongosDB.getName()}));
assert.commandWorked(
mongosDB.adminCommand({shardCollection: mongosColl.getFullName(), key: {_id: 1}}));
@@ -107,22 +109,24 @@ profilerHasAtLeastOneMatchingEntryOrThrow(
{profileDB: closestSecondaryDB, filter: {"originatingCommand.comment": changeStreamComment}});
// Test that the update lookup goes to the secondary as well.
+let filter = {
+ op: isChangeStreamOptimized ? "command" : "query",
+ ns: mongosColl.getFullName(),
+ "command.comment": changeStreamComment,
+ // We need to filter out any profiler entries with a stale config - this is the first read on
+ // this secondary with a readConcern specified, so it is the first read on this secondary that
+ // will enforce shard version.
+ errCode: {$ne: ErrorCodes.StaleConfig}
+};
+filter[isChangeStreamOptimized ? "command.aggregate" : "command.find"] = mongosColl.getName();
+filter[isChangeStreamOptimized ? "command.pipeline.0.$match._id" : "command.filter._id"] = 1;
+
profilerHasSingleMatchingEntryOrThrow({
profileDB: closestSecondaryDB,
- filter: {
- op: "query",
- ns: mongosColl.getFullName(),
- "command.filter._id": 1,
- "command.comment": changeStreamComment,
- // We need to filter out any profiler entries with a stale config - this is the first
- // read on this secondary with a readConcern specified, so it is the first read on this
- // secondary that will enforce shard version.
- errCode: {$ne: ErrorCodes.StaleConfig}
- },
+ filter: filter,
errorMsgFilter: {ns: mongosColl.getFullName()},
errorMsgProj: {ns: 1, op: 1, command: 1},
});
-
// Now add a new secondary which is "closer" (add the "closestSecondary" tag to that secondary,
// and remove it from the old node with that tag) to force update lookups target a different
// node than the change stream itself.
@@ -182,11 +186,11 @@ const joinResumeReplicationShell =
function() {
return changeStreamDB
.currentOp({
- op: "query",
- // Note the namespace here happens to be the database.$cmd,
- // because we're blocked waiting for the read concern, which
- // happens before we get to the command processing level and
- // adjust the currentOp namespace to include the collection name.
+ op: ${isChangeStreamOptimized} ? "command" : "query",
+ // Note the namespace here happens to be database.$cmd, because
+ // we're blocked waiting for the read concern, which happens
+ // before we get to the command processing level and adjust the
+ // currentOp namespace to include the collection name.
ns: "${mongosDB.getName()}.$cmd",
"command.comment": "${changeStreamComment}",
})
@@ -206,18 +210,18 @@ assert.docEq(latestChange.fullDocument, {_id: 1, updatedCount: 2});
joinResumeReplicationShell();
// Test that the update lookup goes to the new closest secondary.
-profilerHasSingleMatchingEntryOrThrow({
- profileDB: newClosestSecondaryDB,
- filter: {
- op: "query",
- ns: mongosColl.getFullName(),
- "command.comment": changeStreamComment,
- // We need to filter out any profiler entries with a stale config - this is the first
- // read on this secondary with a readConcern specified, so it is the first read on this
- // secondary that will enforce shard version.
- errCode: {$ne: ErrorCodes.StaleConfig}
- }
-});
+filter = {
+ op: isChangeStreamOptimized ? "command" : "query",
+ ns: mongosColl.getFullName(),
+ "command.comment": changeStreamComment,
+ // We need to filter out any profiler entries with a stale config - this is the first read on
+ // this secondary with a readConcern specified, so it is the first read on this secondary that
+ // will enforce shard version.
+ errCode: {$ne: ErrorCodes.StaleConfig}
+};
+filter[isChangeStreamOptimized ? "command.aggregate" : "command.find"] = mongosColl.getName();
+
+profilerHasSingleMatchingEntryOrThrow({profileDB: newClosestSecondaryDB, filter: filter});
changeStream.close();
st.stop();