path: root/jstests/sharding/merge_with_move_primary.js
// Tests that the $merge aggregation stage is resilient to the database's primary shard being
// moved while the aggregation is running, for both the source and the output collections.
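//
// The test hangs the aggregation on a shard failpoint while it is building its write batch, moves
// the database's primary shard from shard0 to shard1, then releases the failpoint and verifies
// that the $merge either wrote the expected number of documents or failed with the expected error
// code.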
(function() {
'use strict';

load("jstests/aggregation/extras/merge_helpers.js");  // For withEachMergeMode.

const st = new ShardingTest({shards: 2, rs: {nodes: 1}});

const mongosDB = st.s.getDB(jsTestName());
const sourceColl = mongosDB["source"];
const targetColl = mongosDB["target"];

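// Enables or disables the failpoints that make $merge and $out hang on both shards while building
// a write batch, which leaves a window in which a movePrimary can run mid-aggregation.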
function setAggHang(mode) {
    assert.commandWorked(st.shard0.adminCommand(
        {configureFailPoint: "hangWhileBuildingDocumentSourceMergeBatch", mode: mode}));
    assert.commandWorked(st.shard1.adminCommand(
        {configureFailPoint: "hangWhileBuildingDocumentSourceMergeBatch", mode: mode}));

    assert.commandWorked(st.shard0.adminCommand(
        {configureFailPoint: "hangWhileBuildingDocumentSourceOutBatch", mode: mode}));
    assert.commandWorked(st.shard1.adminCommand(
        {configureFailPoint: "hangWhileBuildingDocumentSourceOutBatch", mode: mode}));
}

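// Runs a single-stage aggregation pipeline containing 'stage' against the source collection in a
// parallel shell, moves the primary shard to shard1 while the aggregation is hung on the
// failpoint, and then verifies the outcome. 'expectedfailCode' is the error code the aggregate is
// expected to fail with, or undefined if it should succeed; 'expectedNumDocs' is the expected
// document count in the target collection on success. 'shardedColl' records which collection is
// sharded in the calling scenario and is not referenced inside the function.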
function runPipelineWithStage({stage, shardedColl, expectedfailCode, expectedNumDocs}) {
    // Set the failpoints so the aggregation hangs while building the $merge/$out write batch.
    setAggHang("alwaysOn");

    // Make sure the database's primary shard starts out on shard0.
    st.ensurePrimaryShard(mongosDB.getName(), st.shard0.shardName);

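    // Tag the aggregation with a unique comment so its operation can be located in currentOp
    // below.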
    let comment = jsTestName() + "_comment";
    let outFn = `
            const sourceDB = db.getSiblingDB(jsTestName());
            const sourceColl = sourceDB["${sourceColl.getName()}"];
            let cmdRes = sourceDB.runCommand({
                aggregate: "${sourceColl.getName()}",
                pipeline: [${tojson(stage)}],
                cursor: {},
                comment: "${comment}"
            });
            if (${expectedfailCode} !== undefined) {
                assert.commandFailedWithCode(cmdRes, ${expectedfailCode});
            } else {
                assert.commandWorked(cmdRes);
            }
        `;

    // Start the $merge aggregation in a parallel shell.
    let outShell = startParallelShell(outFn, st.s.port);

    // Wait for the parallel shell to hit the failpoint.
    assert.soon(() => mongosDB
                          .currentOp({
                              $or: [
                                  {op: "command", "command.comment": comment},
                                  {op: "getmore", "cursor.originatingCommand.comment": comment}
                              ]
                          })
                          .inprog.length == 1,
                () => tojson(mongosDB.currentOp().inprog));

    // Migrate the primary shard from shard0 to shard1.
    st.ensurePrimaryShard(mongosDB.getName(), st.shard1.shardName);

    // Unset the failpoint to unblock the $merge and join with the parallel shell.
    setAggHang("off");
    outShell();

    // Verify that the $merge succeeded.
    if (expectedfailCode === undefined) {
        assert.eq(expectedNumDocs, targetColl.find().itcount());
    }

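    // Clear the target collection so the next $merge mode starts against an empty target.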
    assert.commandWorked(targetColl.remove({}));
}

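// Each scenario below runs the full matrix of $merge modes via withEachMergeMode. The target
// collection starts out empty, so whenNotMatched: "discard" is expected to write nothing and
// whenNotMatched: "fail" is expected to fail with code 13113.
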
// The source collection is unsharded.
assert.commandWorked(sourceColl.insert({shardKey: -1}));
assert.commandWorked(sourceColl.insert({shardKey: 1}));

withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
    runPipelineWithStage({
        stage: {
            $merge: {
                into: targetColl.getName(),
                whenMatched: whenMatchedMode,
                whenNotMatched: whenNotMatchedMode
            }
        },
        shardedColl: sourceColl,
        expectedNumDocs: whenNotMatchedMode == "discard" ? 0 : 2,
        expectedfailCode: whenNotMatchedMode == "fail" ? 13113 : undefined
    });
});
sourceColl.drop();

// Shard the source collection with shard key {shardKey: 1} and split into 2 chunks.
st.shardColl(sourceColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());

// Write a document to each chunk of the source collection.
assert.commandWorked(sourceColl.insert({shardKey: -1}));
assert.commandWorked(sourceColl.insert({shardKey: 1}));

withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
    runPipelineWithStage({
        stage: {
            $merge: {
                into: targetColl.getName(),
                whenMatched: whenMatchedMode,
                whenNotMatched: whenNotMatchedMode
            }
        },
        shardedColl: sourceColl,
        expectedNumDocs: whenNotMatchedMode == "discard" ? 0 : 2,
        expectedfailCode: whenNotMatchedMode == "fail" ? 13113 : undefined
    });
});

sourceColl.drop();

// Shard the target collection with shard key {shardKey: 1} and split into 2 chunks; the source
// collection remains unsharded for this case.
st.shardColl(targetColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());

// Write two documents in the source collection that should target the two chunks in the target
// collection.
assert.commandWorked(sourceColl.insert({shardKey: -1}));
assert.commandWorked(sourceColl.insert({shardKey: 1}));

// Note that the legacy $out is not supported with an existing sharded output collection.
withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
    runPipelineWithStage({
        stage: {
            $merge: {
                into: targetColl.getName(),
                whenMatched: whenMatchedMode,
                whenNotMatched: whenNotMatchedMode
            }
        },
        shardedColl: targetColl,
        expectedNumDocs: whenNotMatchedMode == "discard" ? 0 : 2,
        expectedfailCode: whenNotMatchedMode == "fail" ? 13113 : undefined
    });
});

sourceColl.drop();
targetColl.drop();

// Shard the collections with shard key {shardKey: 1} and split into 2 chunks.
st.shardColl(sourceColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());
st.shardColl(targetColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());

// Write two documents in the source collection that should target the two chunks in the target
// collection.
assert.commandWorked(sourceColl.insert({shardKey: -1}));
assert.commandWorked(sourceColl.insert({shardKey: 1}));

withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
    runPipelineWithStage({
        stage: {
            $merge: {
                into: targetColl.getName(),
                whenMatched: whenMatchedMode,
                whenNotMatched: whenNotMatchedMode
            }
        },
        shardedColl: targetColl,
        expectedNumDocs: whenNotMatchedMode == "discard" ? 0 : 2,
        expectedfailCode: whenNotMatchedMode == "fail" ? 13113 : undefined
    });
});

st.stop();
})();