summaryrefslogtreecommitdiff
path: root/jstests/sharding/merge_with_chunk_migrations.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/sharding/merge_with_chunk_migrations.js')
-rw-r--r--jstests/sharding/merge_with_chunk_migrations.js222
1 files changed, 110 insertions, 112 deletions
diff --git a/jstests/sharding/merge_with_chunk_migrations.js b/jstests/sharding/merge_with_chunk_migrations.js
index 2b9ba4256fa..461fe57cf8d 100644
--- a/jstests/sharding/merge_with_chunk_migrations.js
+++ b/jstests/sharding/merge_with_chunk_migrations.js
@@ -1,47 +1,47 @@
// Tests that the $merge aggregation stage is resilient to chunk migrations in both the source and
// output collection during execution.
(function() {
- 'use strict';
+'use strict';
- load("jstests/aggregation/extras/merge_helpers.js"); // For withEachMergeMode.
+load("jstests/aggregation/extras/merge_helpers.js"); // For withEachMergeMode.
- const st = new ShardingTest({shards: 2, rs: {nodes: 1}});
+const st = new ShardingTest({shards: 2, rs: {nodes: 1}});
- const mongosDB = st.s.getDB(jsTestName());
- const sourceColl = mongosDB["source"];
- const targetColl = mongosDB["target"];
+const mongosDB = st.s.getDB(jsTestName());
+const sourceColl = mongosDB["source"];
+const targetColl = mongosDB["target"];
- function setAggHang(mode) {
- assert.commandWorked(st.shard0.adminCommand(
- {configureFailPoint: "hangBeforeDocumentSourceCursorLoadBatch", mode: mode}));
- assert.commandWorked(st.shard1.adminCommand(
- {configureFailPoint: "hangBeforeDocumentSourceCursorLoadBatch", mode: mode}));
+function setAggHang(mode) {
+ assert.commandWorked(st.shard0.adminCommand(
+ {configureFailPoint: "hangBeforeDocumentSourceCursorLoadBatch", mode: mode}));
+ assert.commandWorked(st.shard1.adminCommand(
+ {configureFailPoint: "hangBeforeDocumentSourceCursorLoadBatch", mode: mode}));
+}
+
+function runMergeWithMode(whenMatchedMode, whenNotMatchedMode, shardedColl) {
+ assert.commandWorked(targetColl.remove({}));
+
+ // For modes 'whenNotMatchedMode:fail/discard', the $merge will not insert the expected
+ // documents, causing the assertion below to fail. To avoid that, we match the documents in
+ // target collection with the documents in source.
+ if (whenNotMatchedMode == "fail" || whenNotMatchedMode == "discard") {
+ assert.commandWorked(targetColl.insert({_id: 0, shardKey: -1}));
+ assert.commandWorked(targetColl.insert({_id: 1, shardKey: 1}));
}
- function runMergeWithMode(whenMatchedMode, whenNotMatchedMode, shardedColl) {
- assert.commandWorked(targetColl.remove({}));
-
- // For modes 'whenNotMatchedMode:fail/discard', the $merge will not insert the expected
- // documents, causing the assertion below to fail. To avoid that, we match the documents in
- // target collection with the documents in source.
- if (whenNotMatchedMode == "fail" || whenNotMatchedMode == "discard") {
- assert.commandWorked(targetColl.insert({_id: 0, shardKey: -1}));
- assert.commandWorked(targetColl.insert({_id: 1, shardKey: 1}));
- }
-
- // Set the failpoint to hang in the first call to DocumentSourceCursor's getNext().
- setAggHang("alwaysOn");
-
- let comment = whenMatchedMode + "_" + whenNotMatchedMode + "_" + shardedColl.getName();
-
- const mergeSpec = {
- into: targetColl.getName(),
- whenMatched: whenMatchedMode,
- whenNotMatched: whenNotMatchedMode
- };
- // The $_internalInhibitOptimization stage is added to the pipeline to prevent the pipeline
- // from being optimized away after it's been split. Otherwise, we won't hit the failpoint.
- let outFn = `
+ // Set the failpoint to hang in the first call to DocumentSourceCursor's getNext().
+ setAggHang("alwaysOn");
+
+ let comment = whenMatchedMode + "_" + whenNotMatchedMode + "_" + shardedColl.getName();
+
+ const mergeSpec = {
+ into: targetColl.getName(),
+ whenMatched: whenMatchedMode,
+ whenNotMatched: whenNotMatchedMode
+ };
+ // The $_internalInhibitOptimization stage is added to the pipeline to prevent the pipeline
+ // from being optimized away after it's been split. Otherwise, we won't hit the failpoint.
+ let outFn = `
const sourceDB = db.getSiblingDB(jsTestName());
const sourceColl = sourceDB["${sourceColl.getName()}"];
sourceColl.aggregate([
@@ -51,43 +51,42 @@
{comment: "${comment}"});
`;
- // Start the $merge aggregation in a parallel shell.
- let mergeShell = startParallelShell(outFn, st.s.port);
-
- // Wait for the parallel shell to hit the failpoint.
- assert.soon(
- () =>
- mongosDB.currentOp({op: "command", "command.comment": comment}).inprog.length == 1,
- () => tojson(mongosDB.currentOp().inprog));
-
- // Migrate the chunk on shard1 to shard0.
- assert.commandWorked(st.s.adminCommand(
- {moveChunk: shardedColl.getFullName(), find: {shardKey: 1}, to: st.shard0.shardName}));
-
- // Unset the failpoint to unblock the $merge and join with the parallel shell.
- setAggHang("off");
- mergeShell();
-
- // Verify that the $merge succeeded.
- assert.eq(2, targetColl.find().itcount());
-
- // Now both chunks are on shard0. Run a similar test except migrate the chunks back to
- // shard1 in the middle of execution.
- assert.commandWorked(targetColl.remove({}));
-
- // For modes 'whenNotMatchedMode:fail/discard', the $merge will not insert the expected
- // documents, causing the assertion below to fail. To avoid that, we match the documents in
- // target collection with the documents in source.
- if (whenNotMatchedMode == "fail" || whenNotMatchedMode == "discard") {
- assert.commandWorked(targetColl.insert({_id: 0, shardKey: -1}));
- assert.commandWorked(targetColl.insert({_id: 1, shardKey: 1}));
- }
-
- setAggHang("alwaysOn");
- comment = comment + "_2";
- // The $_internalInhibitOptimization stage is added to the pipeline to prevent the pipeline
- // from being optimized away after it's been split. Otherwise, we won't hit the failpoint.
- outFn = `
+ // Start the $merge aggregation in a parallel shell.
+ let mergeShell = startParallelShell(outFn, st.s.port);
+
+ // Wait for the parallel shell to hit the failpoint.
+ assert.soon(
+ () => mongosDB.currentOp({op: "command", "command.comment": comment}).inprog.length == 1,
+ () => tojson(mongosDB.currentOp().inprog));
+
+ // Migrate the chunk on shard1 to shard0.
+ assert.commandWorked(st.s.adminCommand(
+ {moveChunk: shardedColl.getFullName(), find: {shardKey: 1}, to: st.shard0.shardName}));
+
+ // Unset the failpoint to unblock the $merge and join with the parallel shell.
+ setAggHang("off");
+ mergeShell();
+
+ // Verify that the $merge succeeded.
+ assert.eq(2, targetColl.find().itcount());
+
+ // Now both chunks are on shard0. Run a similar test except migrate the chunks back to
+ // shard1 in the middle of execution.
+ assert.commandWorked(targetColl.remove({}));
+
+ // For modes 'whenNotMatchedMode:fail/discard', the $merge will not insert the expected
+ // documents, causing the assertion below to fail. To avoid that, we match the documents in
+ // target collection with the documents in source.
+ if (whenNotMatchedMode == "fail" || whenNotMatchedMode == "discard") {
+ assert.commandWorked(targetColl.insert({_id: 0, shardKey: -1}));
+ assert.commandWorked(targetColl.insert({_id: 1, shardKey: 1}));
+ }
+
+ setAggHang("alwaysOn");
+ comment = comment + "_2";
+ // The $_internalInhibitOptimization stage is added to the pipeline to prevent the pipeline
+ // from being optimized away after it's been split. Otherwise, we won't hit the failpoint.
+ outFn = `
const sourceDB = db.getSiblingDB(jsTestName());
const sourceColl = sourceDB["${sourceColl.getName()}"];
sourceColl.aggregate([
@@ -96,56 +95,55 @@
],
{comment: "${comment}"});
`;
- mergeShell = startParallelShell(outFn, st.s.port);
+ mergeShell = startParallelShell(outFn, st.s.port);
- // Wait for the parallel shell to hit the failpoint.
- assert.soon(
- () =>
- mongosDB.currentOp({op: "command", "command.comment": comment}).inprog.length == 1,
- () => tojson(mongosDB.currentOp().inprog));
+ // Wait for the parallel shell to hit the failpoint.
+ assert.soon(
+ () => mongosDB.currentOp({op: "command", "command.comment": comment}).inprog.length == 1,
+ () => tojson(mongosDB.currentOp().inprog));
- assert.commandWorked(st.s.adminCommand(
- {moveChunk: shardedColl.getFullName(), find: {shardKey: -1}, to: st.shard1.shardName}));
- assert.commandWorked(st.s.adminCommand(
- {moveChunk: shardedColl.getFullName(), find: {shardKey: 1}, to: st.shard1.shardName}));
+ assert.commandWorked(st.s.adminCommand(
+ {moveChunk: shardedColl.getFullName(), find: {shardKey: -1}, to: st.shard1.shardName}));
+ assert.commandWorked(st.s.adminCommand(
+ {moveChunk: shardedColl.getFullName(), find: {shardKey: 1}, to: st.shard1.shardName}));
- // Unset the failpoint to unblock the $merge and join with the parallel shell.
- setAggHang("off");
- mergeShell();
+ // Unset the failpoint to unblock the $merge and join with the parallel shell.
+ setAggHang("off");
+ mergeShell();
- // Verify that the $merge succeeded.
- assert.eq(2, targetColl.find().itcount());
+ // Verify that the $merge succeeded.
+ assert.eq(2, targetColl.find().itcount());
- // Reset the chunk distribution.
- assert.commandWorked(st.s.adminCommand(
- {moveChunk: shardedColl.getFullName(), find: {shardKey: -1}, to: st.shard0.shardName}));
- }
+ // Reset the chunk distribution.
+ assert.commandWorked(st.s.adminCommand(
+ {moveChunk: shardedColl.getFullName(), find: {shardKey: -1}, to: st.shard0.shardName}));
+}
- // Shard the source collection with shard key {shardKey: 1} and split into 2 chunks.
- st.shardColl(sourceColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());
+// Shard the source collection with shard key {shardKey: 1} and split into 2 chunks.
+st.shardColl(sourceColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());
- // Write a document to each chunk of the source collection.
- assert.commandWorked(sourceColl.insert({_id: 0, shardKey: -1}));
- assert.commandWorked(sourceColl.insert({_id: 1, shardKey: 1}));
+// Write a document to each chunk of the source collection.
+assert.commandWorked(sourceColl.insert({_id: 0, shardKey: -1}));
+assert.commandWorked(sourceColl.insert({_id: 1, shardKey: 1}));
- withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
- runMergeWithMode(whenMatchedMode, whenNotMatchedMode, sourceColl);
- });
+withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
+ runMergeWithMode(whenMatchedMode, whenNotMatchedMode, sourceColl);
+});
- // Run a similar test with chunk migrations on the output collection instead.
- sourceColl.drop();
- assert.commandWorked(targetColl.remove({}));
- // Shard the output collection with shard key {shardKey: 1} and split into 2 chunks.
- st.shardColl(targetColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());
+// Run a similar test with chunk migrations on the output collection instead.
+sourceColl.drop();
+assert.commandWorked(targetColl.remove({}));
+// Shard the output collection with shard key {shardKey: 1} and split into 2 chunks.
+st.shardColl(targetColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());
- // Write two documents in the source collection that should target the two chunks in the target
- // collection.
- assert.commandWorked(sourceColl.insert({_id: 0, shardKey: -1}));
- assert.commandWorked(sourceColl.insert({_id: 1, shardKey: 1}));
+// Write two documents in the source collection that should target the two chunks in the target
+// collection.
+assert.commandWorked(sourceColl.insert({_id: 0, shardKey: -1}));
+assert.commandWorked(sourceColl.insert({_id: 1, shardKey: 1}));
- withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
- runMergeWithMode(whenMatchedMode, whenNotMatchedMode, targetColl);
- });
+withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
+ runMergeWithMode(whenMatchedMode, whenNotMatchedMode, targetColl);
+});
- st.stop();
+st.stop();
})();