Diffstat (limited to 'jstests/sharding/merge_with_drop_shard.js')
 -rw-r--r--  jstests/sharding/merge_with_drop_shard.js  204
 1 file changed, 101 insertions(+), 103 deletions(-)
diff --git a/jstests/sharding/merge_with_drop_shard.js b/jstests/sharding/merge_with_drop_shard.js
index 442f4c89a4c..cc03ea31c42 100644
--- a/jstests/sharding/merge_with_drop_shard.js
+++ b/jstests/sharding/merge_with_drop_shard.js
@@ -1,61 +1,60 @@
// Tests that the $merge aggregation stage is resilient to drop shard in both the source and
// output collection during execution.
(function() {
- 'use strict';
-
- load("jstests/aggregation/extras/merge_helpers.js"); // For withEachMergeMode.
-
- const st = new ShardingTest({shards: 2, rs: {nodes: 1}});
-
- const mongosDB = st.s.getDB(jsTestName());
- const sourceColl = mongosDB["source"];
- const targetColl = mongosDB["target"];
-
- assert.commandWorked(st.s.getDB("admin").runCommand({enableSharding: mongosDB.getName()}));
- st.ensurePrimaryShard(mongosDB.getName(), st.shard1.name);
-
- function setAggHang(mode) {
- assert.commandWorked(st.shard0.adminCommand(
- {configureFailPoint: "hangWhileBuildingDocumentSourceMergeBatch", mode: mode}));
- assert.commandWorked(st.shard1.adminCommand(
- {configureFailPoint: "hangWhileBuildingDocumentSourceMergeBatch", mode: mode}));
- }
-
- function removeShard(shard) {
- // We need the balancer to drain all the chunks out of the shard that is being removed.
- assert.commandWorked(st.startBalancer());
- st.waitForBalancer(true, 60000);
- var res = st.s.adminCommand({removeShard: shard.shardName});
+'use strict';
+
+load("jstests/aggregation/extras/merge_helpers.js"); // For withEachMergeMode.
+
+const st = new ShardingTest({shards: 2, rs: {nodes: 1}});
+
+const mongosDB = st.s.getDB(jsTestName());
+const sourceColl = mongosDB["source"];
+const targetColl = mongosDB["target"];
+
+assert.commandWorked(st.s.getDB("admin").runCommand({enableSharding: mongosDB.getName()}));
+st.ensurePrimaryShard(mongosDB.getName(), st.shard1.name);
+
+function setAggHang(mode) {
+ assert.commandWorked(st.shard0.adminCommand(
+ {configureFailPoint: "hangWhileBuildingDocumentSourceMergeBatch", mode: mode}));
+ assert.commandWorked(st.shard1.adminCommand(
+ {configureFailPoint: "hangWhileBuildingDocumentSourceMergeBatch", mode: mode}));
+}
+
+function removeShard(shard) {
+ // We need the balancer to drain all the chunks out of the shard that is being removed.
+ assert.commandWorked(st.startBalancer());
+ st.waitForBalancer(true, 60000);
+ var res = st.s.adminCommand({removeShard: shard.shardName});
+ assert.commandWorked(res);
+ assert.eq('started', res.state);
+ assert.soon(function() {
+ res = st.s.adminCommand({removeShard: shard.shardName});
assert.commandWorked(res);
- assert.eq('started', res.state);
- assert.soon(function() {
- res = st.s.adminCommand({removeShard: shard.shardName});
- assert.commandWorked(res);
- return ('completed' === res.state);
- }, "removeShard never completed for shard " + shard.shardName);
-
- // Drop the test database on the removed shard so it does not interfere with addShard later.
- assert.commandWorked(shard.getDB(mongosDB.getName()).dropDatabase());
-
- st.configRS.awaitLastOpCommitted();
- assert.commandWorked(st.s.adminCommand({flushRouterConfig: 1}));
- assert.commandWorked(st.stopBalancer());
- st.waitForBalancer(false, 60000);
- }
-
- function addShard(shard) {
- assert.commandWorked(st.s.adminCommand({addShard: shard}));
- assert.commandWorked(st.s.adminCommand(
- {moveChunk: sourceColl.getFullName(), find: {shardKey: 0}, to: shard}));
- }
- function runMergeWithMode(
- whenMatchedMode, whenNotMatchedMode, shardedColl, dropShard, expectFailCode) {
- // Set the failpoint to hang in the first call to DocumentSourceCursor's getNext().
- setAggHang("alwaysOn");
-
- let comment =
- whenMatchedMode + "_" + whenNotMatchedMode + "_" + shardedColl.getName() + "_1";
- let outFn = `
+ return ('completed' === res.state);
+ }, "removeShard never completed for shard " + shard.shardName);
+
+ // Drop the test database on the removed shard so it does not interfere with addShard later.
+ assert.commandWorked(shard.getDB(mongosDB.getName()).dropDatabase());
+
+ st.configRS.awaitLastOpCommitted();
+ assert.commandWorked(st.s.adminCommand({flushRouterConfig: 1}));
+ assert.commandWorked(st.stopBalancer());
+ st.waitForBalancer(false, 60000);
+}
+
+function addShard(shard) {
+ assert.commandWorked(st.s.adminCommand({addShard: shard}));
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: sourceColl.getFullName(), find: {shardKey: 0}, to: shard}));
+}
+function runMergeWithMode(
+ whenMatchedMode, whenNotMatchedMode, shardedColl, dropShard, expectFailCode) {
+ // Set the failpoint to hang in the first call to DocumentSourceCursor's getNext().
+ setAggHang("alwaysOn");
+
+ let comment = whenMatchedMode + "_" + whenNotMatchedMode + "_" + shardedColl.getName() + "_1";
+ let outFn = `
const sourceDB = db.getSiblingDB(jsTestName());
const sourceColl = sourceDB["${sourceColl.getName()}"];
let cmdRes = sourceDB.runCommand({
@@ -76,61 +75,60 @@
}
`;
- // Start the $merge aggregation in a parallel shell.
- let mergeShell = startParallelShell(outFn, st.s.port);
-
- // Wait for the parallel shell to hit the failpoint.
- assert.soon(
- () => mongosDB
- .currentOp({
- $or: [
- {op: "command", "command.comment": comment},
- {op: "getmore", "cursor.originatingCommand.comment": comment}
- ]
- })
- .inprog.length >= 1,
- () => tojson(mongosDB.currentOp().inprog));
-
- if (dropShard) {
- removeShard(st.shard0);
- } else {
- addShard(st.rs0.getURL());
- }
- // Unset the failpoint to unblock the $merge and join with the parallel shell.
- setAggHang("off");
- mergeShell();
-
- assert.eq(2, targetColl.find().itcount());
+ // Start the $merge aggregation in a parallel shell.
+ let mergeShell = startParallelShell(outFn, st.s.port);
+
+ // Wait for the parallel shell to hit the failpoint.
+ assert.soon(() => mongosDB
+ .currentOp({
+ $or: [
+ {op: "command", "command.comment": comment},
+ {op: "getmore", "cursor.originatingCommand.comment": comment}
+ ]
+ })
+ .inprog.length >= 1,
+ () => tojson(mongosDB.currentOp().inprog));
+
+ if (dropShard) {
+ removeShard(st.shard0);
+ } else {
+ addShard(st.rs0.getURL());
}
+ // Unset the failpoint to unblock the $merge and join with the parallel shell.
+ setAggHang("off");
+ mergeShell();
+
+ assert.eq(2, targetColl.find().itcount());
+}
- // Shard the source collection with shard key {shardKey: 1} and split into 2 chunks.
- st.shardColl(sourceColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());
+// Shard the source collection with shard key {shardKey: 1} and split into 2 chunks.
+st.shardColl(sourceColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());
- // Shard the output collection with shard key {shardKey: 1} and split into 2 chunks.
- st.shardColl(targetColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());
+// Shard the output collection with shard key {shardKey: 1} and split into 2 chunks.
+st.shardColl(targetColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());
- // Write two documents in the source collection that should target the two chunks in the target
- // collection.
- assert.commandWorked(sourceColl.insert({shardKey: -1, _id: 0}));
- assert.commandWorked(sourceColl.insert({shardKey: 1, _id: 1}));
+// Write two documents in the source collection that should target the two chunks in the target
+// collection.
+assert.commandWorked(sourceColl.insert({shardKey: -1, _id: 0}));
+assert.commandWorked(sourceColl.insert({shardKey: 1, _id: 1}));
- withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
- assert.commandWorked(targetColl.remove({}));
+withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
+ assert.commandWorked(targetColl.remove({}));
- // Match the data from source into target so that we don't fail the assertion for
- // 'whenNotMatchedMode:fail/discard'.
- if (whenNotMatchedMode == "fail" || whenNotMatchedMode == "discard") {
- assert.commandWorked(targetColl.insert({shardKey: -1, _id: 0}));
- assert.commandWorked(targetColl.insert({shardKey: 1, _id: 1}));
- }
+ // Match the data from source into target so that we don't fail the assertion for
+ // 'whenNotMatchedMode:fail/discard'.
+ if (whenNotMatchedMode == "fail" || whenNotMatchedMode == "discard") {
+ assert.commandWorked(targetColl.insert({shardKey: -1, _id: 0}));
+ assert.commandWorked(targetColl.insert({shardKey: 1, _id: 1}));
+ }
- runMergeWithMode(whenMatchedMode, whenNotMatchedMode, targetColl, true, undefined);
- runMergeWithMode(whenMatchedMode,
- whenNotMatchedMode,
- targetColl,
- false,
- whenMatchedMode == "fail" ? ErrorCodes.DuplicateKey : undefined);
- });
+ runMergeWithMode(whenMatchedMode, whenNotMatchedMode, targetColl, true, undefined);
+ runMergeWithMode(whenMatchedMode,
+ whenNotMatchedMode,
+ targetColl,
+ false,
+ whenMatchedMode == "fail" ? ErrorCodes.DuplicateKey : undefined);
+});
- st.stop();
+st.stop();
})();
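
Note: the patch above only re-indents the test; the coordination pattern it preserves (pause the $merge on a failpoint, change the shard topology, then release it) is easier to see in isolation. The following is a minimal, self-contained sketch of that pattern, not part of the patch. The database name "exampleDB" and the comment tag "example_merge" are illustrative assumptions; the real test derives its names from jsTestName() and the merge modes, and also shards both collections across the two shards.

(function() {
'use strict';

const st = new ShardingTest({shards: 2, rs: {nodes: 1}});
const db = st.s.getDB("exampleDB");
assert.commandWorked(db.source.insert({_id: 0, shardKey: 0}));

// Make both shards hang while DocumentSourceMerge builds its write batch.
for (let shard of [st.shard0, st.shard1]) {
    assert.commandWorked(shard.adminCommand(
        {configureFailPoint: "hangWhileBuildingDocumentSourceMergeBatch", mode: "alwaysOn"}));
}

// Run the $merge in a parallel shell, tagged with a comment so currentOp can find it.
const joinShell = startParallelShell(() => {
    db.getSiblingDB("exampleDB").source.aggregate([{$merge: {into: "target"}}],
                                                  {comment: "example_merge"});
}, st.s.port);

// Wait until the tagged operation shows up in currentOp, i.e. the aggregation is in flight
// and parked on the failpoint.
assert.soon(() => db.currentOp({
                        $or: [
                            {op: "command", "command.comment": "example_merge"},
                            {op: "getmore", "cursor.originatingCommand.comment": "example_merge"}
                        ]
                    }).inprog.length >= 1);

// ... an addShard()/removeShard() style topology change would go here ...

// Release the failpoints and join the parallel shell; the $merge should still complete.
for (let shard of [st.shard0, st.shard1]) {
    assert.commandWorked(shard.adminCommand(
        {configureFailPoint: "hangWhileBuildingDocumentSourceMergeBatch", mode: "off"}));
}
joinShell();
assert.eq(1, db.target.find().itcount());

st.stop();
})();

The test also relies on withEachMergeMode() from jstests/aggregation/extras/merge_helpers.js, which this diff does not show. As a rough orientation only, a helper of that shape could look like the hypothetical sketch below; the real helper may enumerate the combinations differently (for example, it may also cover the pipeline form of whenMatched).

// Hypothetical sketch of a withEachMergeMode-style iteration; not the real helper.
function forEachMergeModeSketch(callback) {
    ["replace", "merge", "keepExisting", "fail"].forEach(whenMatchedMode => {
        ["insert", "discard", "fail"].forEach(whenNotMatchedMode => {
            callback({whenMatchedMode, whenNotMatchedMode});
        });
    });
}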