summaryrefslogtreecommitdiff
path: root/jstests/sharding/merge_with_drop_shard.js
blob: 442f4c89a4c7d41a129ce0339e7fee36d3729705 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
// Tests that the $merge aggregation stage is resilient to drop shard in both the source and
// output collection during execution.
(function() {
    'use strict';

    load("jstests/aggregation/extras/merge_helpers.js");  // For withEachMergeMode.

    const st = new ShardingTest({shards: 2, rs: {nodes: 1}});

    const mongosDB = st.s.getDB(jsTestName());
    const sourceColl = mongosDB["source"];
    const targetColl = mongosDB["target"];

    assert.commandWorked(st.s.getDB("admin").runCommand({enableSharding: mongosDB.getName()}));
    st.ensurePrimaryShard(mongosDB.getName(), st.shard1.name);

    function setAggHang(mode) {
        assert.commandWorked(st.shard0.adminCommand(
            {configureFailPoint: "hangWhileBuildingDocumentSourceMergeBatch", mode: mode}));
        assert.commandWorked(st.shard1.adminCommand(
            {configureFailPoint: "hangWhileBuildingDocumentSourceMergeBatch", mode: mode}));
    }

    function removeShard(shard) {
        // We need the balancer to drain all the chunks out of the shard that is being removed.
        assert.commandWorked(st.startBalancer());
        st.waitForBalancer(true, 60000);
        var res = st.s.adminCommand({removeShard: shard.shardName});
        assert.commandWorked(res);
        assert.eq('started', res.state);
        assert.soon(function() {
            res = st.s.adminCommand({removeShard: shard.shardName});
            assert.commandWorked(res);
            return ('completed' === res.state);
        }, "removeShard never completed for shard " + shard.shardName);

        // Drop the test database on the removed shard so it does not interfere with addShard later.
        assert.commandWorked(shard.getDB(mongosDB.getName()).dropDatabase());

        st.configRS.awaitLastOpCommitted();
        assert.commandWorked(st.s.adminCommand({flushRouterConfig: 1}));
        assert.commandWorked(st.stopBalancer());
        st.waitForBalancer(false, 60000);
    }

    function addShard(shard) {
        assert.commandWorked(st.s.adminCommand({addShard: shard}));
        assert.commandWorked(st.s.adminCommand(
            {moveChunk: sourceColl.getFullName(), find: {shardKey: 0}, to: shard}));
    }
    function runMergeWithMode(
        whenMatchedMode, whenNotMatchedMode, shardedColl, dropShard, expectFailCode) {
        // Set the failpoint to hang in the first call to DocumentSourceCursor's getNext().
        setAggHang("alwaysOn");

        let comment =
            whenMatchedMode + "_" + whenNotMatchedMode + "_" + shardedColl.getName() + "_1";
        let outFn = `
            const sourceDB = db.getSiblingDB(jsTestName());
            const sourceColl = sourceDB["${sourceColl.getName()}"];
            let cmdRes = sourceDB.runCommand({
                aggregate: "${sourceColl.getName()}",
                pipeline: [{$merge: {
                    into: "${targetColl.getName()}",
                    whenMatched: ${tojson(whenMatchedMode)},
                    whenNotMatched: "${whenNotMatchedMode}"
                }}],
                cursor: {},
                comment: "${comment}"
            });
            
            if (${expectFailCode} !== undefined) {
                assert.commandFailedWithCode(cmdRes, ${expectFailCode});
            } else {
                assert.commandWorked(cmdRes);
            }
        `;

        // Start the $merge aggregation in a parallel shell.
        let mergeShell = startParallelShell(outFn, st.s.port);

        // Wait for the parallel shell to hit the failpoint.
        assert.soon(
            () => mongosDB
                      .currentOp({
                          $or: [
                              {op: "command", "command.comment": comment},
                              {op: "getmore", "cursor.originatingCommand.comment": comment}
                          ]
                      })
                      .inprog.length >= 1,
            () => tojson(mongosDB.currentOp().inprog));

        if (dropShard) {
            removeShard(st.shard0);
        } else {
            addShard(st.rs0.getURL());
        }
        // Unset the failpoint to unblock the $merge and join with the parallel shell.
        setAggHang("off");
        mergeShell();

        assert.eq(2, targetColl.find().itcount());
    }

    // Shard the source collection with shard key {shardKey: 1} and split into 2 chunks.
    st.shardColl(sourceColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());

    // Shard the output collection with shard key {shardKey: 1} and split into 2 chunks.
    st.shardColl(targetColl.getName(), {shardKey: 1}, {shardKey: 0}, false, mongosDB.getName());

    // Write two documents in the source collection that should target the two chunks in the target
    // collection.
    assert.commandWorked(sourceColl.insert({shardKey: -1, _id: 0}));
    assert.commandWorked(sourceColl.insert({shardKey: 1, _id: 1}));

    withEachMergeMode(({whenMatchedMode, whenNotMatchedMode}) => {
        assert.commandWorked(targetColl.remove({}));

        // Match the data from source into target so that we don't fail the assertion for
        // 'whenNotMatchedMode:fail/discard'.
        if (whenNotMatchedMode == "fail" || whenNotMatchedMode == "discard") {
            assert.commandWorked(targetColl.insert({shardKey: -1, _id: 0}));
            assert.commandWorked(targetColl.insert({shardKey: 1, _id: 1}));
        }

        runMergeWithMode(whenMatchedMode, whenNotMatchedMode, targetColl, true, undefined);
        runMergeWithMode(whenMatchedMode,
                         whenNotMatchedMode,
                         targetColl,
                         false,
                         whenMatchedMode == "fail" ? ErrorCodes.DuplicateKey : undefined);
    });

    st.stop();
})();