1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
|
load("jstests/libs/fail_point_util.js");
load('jstests/libs/parallel_shell_helpers.js');
/**
 * Produces a fresh collection name and the matching full namespace in the given database.
 *
 * A running counter is stored as a property on the function object itself, so successive calls
 * yield "ns1", "ns2", ... regardless of which database name is passed.
 *
 * @param dbName  database name used as the namespace prefix
 * @returns a two-element array: [collName, dbName + "." + collName]
 */
function getNewNs(dbName) {
    // The counter property does not exist before the first call; start counting from zero.
    if (getNewNs.counter === undefined) {
        getNewNs.counter = 0;
    }

    const sequenceNumber = ++getNewNs.counter;
    const collName = "ns" + sequenceNumber;
    const fullNs = dbName + "." + collName;
    return [collName, fullNs];
}
/**
 * Runs a moveChunk on a freshly created sharded collection and forces the donor primary to step
 * down while the migration is paused on a failpoint, then checks the cluster converges.
 *
 * Sequence:
 *   1. Create a new namespace via getNewNs(), insert 1000 docs through st.s and shard the
 *      collection on {_id: 1}.
 *   2. Optionally enable the "migrationCommitVersionError" failpoint on the config primary.
 *   3. Configure 'failpointName' on the rs0 primary, start the moveChunk (to st.shard1) in a
 *      parallel shell and wait until the failpoint is hit.
 *   4. Force the rs0 primary to step down, turn the failpoint off and join the parallel shell.
 *   5. Wait for the collection count to reach 0 on the rs1 primary (abort expected) or on the rs0
 *      primary (commit expected), verify all docs are still visible through st.s, and wait for
 *      config.migrationCoordinators on the rs0 primary to become empty.
 *   6. Disable the config failpoint again if it was enabled in step 2.
 *
 * @param st  cluster fixture; this function uses st.s, st.rs0, st.rs1, st.configRS and st.shard1
 *            (presumably a ShardingTest -- confirm against callers)
 * @param dbName  database in which the new collection is created
 * @param failpointName  failpoint configured on the rs0 primary to pause the migration
 * @param shouldMakeMigrationFailToCommitOnConfig  when truthy, the config-server failpoint
 *            "migrationCommitVersionError" is enabled for the duration of the run
 * @param expectAbortDecisionWithCode  when truthy, the moveChunk must fail with this code and the
 *            rs1 primary is expected to end up with no docs; otherwise the moveChunk must succeed
 *            and the rs0 primary is expected to end up with no docs
 */
function runMoveChunkMakeDonorStepDownAfterFailpoint(st,
                                                     dbName,
                                                     failpointName,
                                                     shouldMakeMigrationFailToCommitOnConfig,
                                                     expectAbortDecisionWithCode) {
    const [collName, ns] = getNewNs(dbName);

    // Looks up the test collection through the given connection.
    const collOn = (conn) => conn.getDB(dbName).getCollection(collName);

    // Switches the commit-failure failpoint on the current config server primary.
    const setConfigCommitFailpoint = (mode) => assert.commandWorked(
        st.configRS.getPrimary().adminCommand(
            {configureFailPoint: "migrationCommitVersionError", mode}));

    jsTest.log(
        `Running migration, making donor step down after failpoint ${failpointName}` +
        `; shouldMakeMigrationFailToCommitOnConfig is ${shouldMakeMigrationFailToCommitOnConfig}` +
        `; expectAbortDecisionWithCode is ${expectAbortDecisionWithCode}; ns is ${ns}`);

    // The docs below are inserted through mongos, and mongos does not retry writes on NotPrimary
    // errors, so first make sure mongos sees a primary on the primary shard.
    awaitRSClientHosts(st.s, st.rs0.getPrimary(), {ok: true, ismaster: true});

    // Populate the collection so that the migration leaves orphans behind on either the donor or
    // the recipient, depending on the decision.
    const numDocs = 1000;
    const bulkOp = collOn(st.s).initializeUnorderedBulkOp();
    for (let docId = 0; docId < numDocs; docId++) {
        bulkOp.insert({_id: docId});
    }
    assert.commandWorked(bulkOp.execute());

    // Shard the collection.
    assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));

    if (shouldMakeMigrationFailToCommitOnConfig) {
        // Make the migration commit fail on the config server.
        setConfigCommitFailpoint("alwaysOn");
    }

    jsTest.log(`Run the moveChunk asynchronously and wait for ${failpointName} to be hit.`);
    const donorFailpoint = configureFailPoint(st.rs0.getPrimary(), failpointName);
    const joinMoveChunkShell = startParallelShell(
        // This function runs in the parallel shell, so it may only use its own arguments and
        // the globals available there.
        funWithArgs(function(ns, toShardName, expectAbortDecisionWithCode) {
            const moveChunkRes = db.adminCommand({moveChunk: ns, find: {_id: 0}, to: toShardName});
            if (expectAbortDecisionWithCode) {
                assert.commandFailedWithCode(moveChunkRes, expectAbortDecisionWithCode);
            } else {
                assert.commandWorked(moveChunkRes);
            }
        }, ns, st.shard1.shardName, expectAbortDecisionWithCode), st.s.port);
    donorFailpoint.wait();

    jsTest.log("Make the donor primary step down.");
    assert.commandWorked(
        st.rs0.getPrimary().adminCommand({replSetStepDown: 10 /* stepDownSecs */, force: true}));
    donorFailpoint.off();

    jsTest.log("Allow the moveChunk to finish.");
    joinMoveChunkShell();

    // On abort the recipient holds the orphans; on commit the donor does.
    const [cleanupMessage, shardWithOrphans] = expectAbortDecisionWithCode
        ? ["Expect abort decision, so wait for recipient to clean up the orphans.", st.rs1]
        : ["Expect commit decision, so wait for donor to clean up the orphans.", st.rs0];
    jsTest.log(cleanupMessage);
    assert.soon(() => collOn(shardWithOrphans.getPrimary()).count() === 0);

    // Before counting through mongos, wait until mongos sees the new rs0 primary: inside the count
    // command mongos only waits 20 seconds for a primary, and an election may take longer if both
    // replica set nodes run for election at the same time (and therefore both lose the first one).
    awaitRSClientHosts(st.s, st.rs0.getPrimary(), {ok: true, ismaster: true});

    // The data should still be present on the shard that owns the chunk.
    assert.eq(numDocs, collOn(st.s).count());

    jsTest.log("Wait for the donor to delete the migration coordinator doc");
    assert.soon(() => {
        const coordinatorDocs =
            st.rs0.getPrimary().getDB("config").getCollection("migrationCoordinators");
        return coordinatorDocs.count() === 0;
    });

    if (shouldMakeMigrationFailToCommitOnConfig) {
        // Restore the config server failpoint before returning.
        setConfigCommitFailpoint("off");
    }
}
|