/*
 * Tests that a delayed _recvChunkStart does not cause the recipient to write an extra range
 * deletion task after the donor has recovered or completed the migration once the decision
 * was recorded.
 */

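// This test deliberately interrupts and replays migration messages, which can leave collection
// metadata transiently inconsistent across shards, so skip the UUID consistency check at
// shutdown.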
TestData.skipCheckingUUIDsConsistentAcrossCluster = true;

(function() {
"use strict";

load("jstests/libs/fail_point_util.js");
load('jstests/libs/parallel_shell_helpers.js');

const dbName = "test";

// Create 2 shards with 3 replicas each.
let st = new ShardingTest({shards: {rs0: {nodes: 3}, rs1: {nodes: 3}}});

assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: st.shard0.shardName}));

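// Returns a unique collection name and full namespace so that each test case below operates on
// its own collection.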
function getNewNs(dbName) {
    if (typeof getNewNs.counter == 'undefined') {
        getNewNs.counter = 0;
    }
    getNewNs.counter++;
    const collName = "ns" + getNewNs.counter;
    return [collName, dbName + "." + collName];
}

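// Starts a moveChunk of the [50, MaxKey) chunk to 'toShard' in a parallel shell. The command's
// result is deliberately not asserted on, since the donor may step down mid-migration.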
function moveChunkParallel(ns, toShard) {
    return startParallelShell(funWithArgs(function(ns, toShard) {
                                  db.adminCommand({moveChunk: ns, find: {x: 50}, to: toShard});
                              }, ns, toShard), st.s.port);
}

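// Sends a raw _recvChunkStart directly to the recipient to simulate a delayed duplicate of the
// original migration's start message. txnNumber 0 matches the original migration, so once the
// donor's coordination has bumped the transaction number on the recipient, this command should
// fail with TransactionTooOld.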
function sendRecvChunkStart(conn, ns, id, sessionId, lsid, from, to) {
    let cmd = {
        _recvChunkStart: ns,
        uuid: id,
        lsid: lsid,
        txnNumber: NumberLong(0),
        sessionId: sessionId,
        from: from.host,
        fromShardName: from.shardName,
        toShardName: to.shardName,
        min: {x: 50.0},
        max: {x: MaxKey},
        shardKeyPattern: {x: 1.0},
        resumableRangeDeleterDisabled: false
    };

    return conn.getDB("admin").runCommand(cmd);
}

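// Polls the recipient for the migration's status. The business logic of _recvChunkStart runs
// asynchronously, so _recvChunkStatus is needed to observe its outcome.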
function sendRecvChunkStatus(conn, ns, sessionId) {
    let cmd = {
        _recvChunkStatus: ns,
        waitForSteadyOrDone: true,
        sessionId: sessionId,
    };

    return conn.getDB("admin").runCommand(cmd);
}

(() => {
    jsTestLog(
        "Test that recovering a migration coordination ensures a delayed _recvChunkStart does " +
        "not cause the recipient to re-insert a range deletion task");

    const [collName, ns] = getNewNs(dbName);

    assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}}));
    assert.commandWorked(st.s.adminCommand({split: ns, middle: {x: 50}}));

    // Insert documents into both chunks on shard0.
    let testColl = st.s.getDB(dbName).getCollection(collName);
    for (let i = 0; i < 100; i++) {
        assert.commandWorked(testColl.insert({x: i}));
    }

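    // Pause the donor mid-migration, after the migration coordinator document has been
    // persisted, so that the document can be captured below.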
    let donorPrimary = st.rs0.getPrimary();
    let failpoint = configureFailPoint(donorPrimary, 'moveChunkHangAtStep3');

    // Move chunk [50, inf) to shard1.
    const awaitShell = moveChunkParallel(ns, st.shard1.shardName);

    failpoint.wait();

    // Get the migration doc from the donor.
    let migrationDocColl = donorPrimary.getDB("config").migrationCoordinators;
    let migrationDoc = migrationDocColl.findOne();
    jsTestLog("migration doc: " + tojson(migrationDoc));

    // Step down the current donor primary so that the new primary has to recover the migration
    // coordination on step-up.
    assert.commandWorked(donorPrimary.adminCommand({replSetStepDown: 60, force: true}));

    failpoint.off();
    awaitShell();

    // Recovery should have occurred. Get new primary and verify migration doc is deleted.
    donorPrimary = st.rs0.getPrimary();
    migrationDocColl = donorPrimary.getDB("config").migrationCoordinators;

    assert.soon(() => {
        return migrationDocColl.find().itcount() == 0;
    });

    let recipientPrimary = st.rs1.getPrimary();

    // Wait until the range deletion task has been processed on the recipient before sending the
    // second _recvChunkStart.
    assert.soon(() => {
        return recipientPrimary.getDB("config").rangeDeletions.find().itcount() == 0;
    });

    // Simulate the recipient receiving a delayed _recvChunkStart message by sending one directly
    // to the recipient, and ensure that the _recvChunkStart fails with TransactionTooOld without
    // inserting a new range deletion task. Since the business logic of _recvChunkStart executes
    // asynchronously, use _recvChunkStatus to check the result of the _recvChunkStart.
    assert.commandWorked(sendRecvChunkStart(recipientPrimary,
                                            ns,
                                            migrationDoc._id,
                                            migrationDoc.migrationSessionId,
                                            migrationDoc.lsid,
                                            st.shard0,
                                            st.shard1));

    assert.soon(() => {
        let result = sendRecvChunkStatus(recipientPrimary, ns, migrationDoc.migrationSessionId);
        jsTestLog("recvChunkStatus: " + tojson(result));

        return result.state === "fail" &&
            result.errmsg.startsWith("migrate failed: TransactionTooOld:");
    });

    // Verify deletion task doesn't exist on recipient.
    assert.eq(recipientPrimary.getDB("config").rangeDeletions.find().itcount(), 0);
})();

(() => {
    jsTestLog(
        "Test that completing a migration coordination at the end of moveChunk ensures a " +
        "delayed _recvChunkStart does not cause the recipient to re-insert a range deletion " +
        "task");

    const [collName, ns] = getNewNs(dbName);

    assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {x: 1}}));
    assert.commandWorked(st.s.adminCommand({split: ns, middle: {x: 50}}));

    // Insert documents into both chunks on shard0.
    let testColl = st.s.getDB(dbName).getCollection(collName);
    for (let i = 0; i < 100; i++) {
        assert.commandWorked(testColl.insert({x: i}));
    }

    let donorPrimary = st.rs0.getPrimary();
    let failpoint = configureFailPoint(donorPrimary, 'moveChunkHangAtStep3');

    // Move chunk [50, inf) to shard1.
    const awaitShell = moveChunkParallel(ns, st.shard1.shardName);

    failpoint.wait();

    // Get the migration doc from the donor.
    let migrationDocColl = donorPrimary.getDB("config").migrationCoordinators;
    let migrationDoc = migrationDocColl.findOne();
    jsTestLog("migration doc: " + tojson(migrationDoc));

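    // Release the failpoint and let the migration run to completion, so the coordination is
    // completed as part of moveChunk itself rather than through stepdown recovery.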
    failpoint.off();
    awaitShell();

    // Simulate the recipient receiving a delayed _recvChunkStart message by sending one directly
    // to the recipient, and ensure that the _recvChunkStart fails with TransactionTooOld without
    // inserting a new range deletion task. Since the business logic of _recvChunkStart executes
    // asynchronously, use _recvChunkStatus to check the result of the _recvChunkStart.
    let recipientPrimary = st.rs1.getPrimary();
    assert.commandWorked(sendRecvChunkStart(recipientPrimary,
                                            ns,
                                            migrationDoc._id,
                                            migrationDoc.migrationSessionId,
                                            migrationDoc.lsid,
                                            st.shard0,
                                            st.shard1));

    assert.soon(() => {
        let result = sendRecvChunkStatus(recipientPrimary, ns, migrationDoc.migrationSessionId);
        jsTestLog("recvChunkStatus: " + tojson(result));

        return result.state === "fail" &&
            result.errmsg.startsWith("migrate failed: TransactionTooOld:");
    });

    // Verify deletion task doesn't exist on recipient.
    assert.eq(recipientPrimary.getDB("config").rangeDeletions.find().itcount(), 0);
})();

st.stop();
})();