summaryrefslogtreecommitdiff
path: root/jstests/replsets/rollback_with_coalesced_txn_table_updates_during_oplog_application.js
blob: c480313fa9a21bef1cb49ddd1d5480fe5bbe703c (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
/**
 * Tests that the rollback procedure will update the 'config.transactions' table to be consistent
 * with the node data at the 'stableTimestamp', specifically in the case where multiple derived ops
 * to the 'config.transactions' table were coalesced into a single operation during secondary oplog
 * application.
 * We also test that if a node crashes after oplog truncation during rollback, the update made to
 * the 'config.transactions' table is persisted on startup.
 *
 * @tags: [requires_persistence, disabled_due_to_server_58295]
 */

(function() {
load("jstests/libs/fail_point_util.js");
load("jstests/libs/write_concern_util.js");

const oplogApplierBatchSize = 100;

function runTest(crashAfterRollbackTruncation) {
    jsTestLog(`Running test with crashAfterRollbackTruncation = ${crashAfterRollbackTruncation}`);
    const rst = new ReplSetTest({
        nodes: {
            n0: {},
            // Set the 'syncdelay' to 1s to speed up checkpointing. Also explicitly set the batch
            // size for oplog application to ensure the number of retryable write statements being
            // made majority committed isn't a multiple of it.
            n1: {syncdelay: 1, setParameter: {replBatchLimitOperations: oplogApplierBatchSize}},
            // Set the bgSyncOplogFetcherBatchSize to 1 oplog entry to guarantee replication
            // progress with the stopReplProducerOnDocument failpoint.
            n2: {setParameter: {bgSyncOplogFetcherBatchSize: 1}},
            n3: {setParameter: {bgSyncOplogFetcherBatchSize: 1}},
            n4: {setParameter: {bgSyncOplogFetcherBatchSize: 1}},
        },
        // Force secondaries to sync from the primary to guarantee replication progress with the
        // stopReplProducerOnDocument failpoint. Also disable primary catchup because some
        // replicated retryable write statements are intentionally not being made majority
        // committed.
        settings: {chainingAllowed: false, catchUpTimeoutMillis: 0},
    });
    rst.startSet();
    rst.initiate();

    const primary = rst.getPrimary();
    const ns = "test.retryable_write_partial_rollback";
    assert.commandWorked(
        primary.getCollection(ns).insert({_id: 0, counter: 0}, {writeConcern: {w: 5}}));
    // The default WC is majority and this test can't satisfy majority writes.
    assert.commandWorked(primary.adminCommand(
        {setDefaultRWConcern: 1, defaultWriteConcern: {w: 1}, writeConcern: {w: "majority"}}));
    rst.awaitReplication();

    let [secondary1, secondary2, secondary3, secondary4] = rst.getSecondaries();

    // Disable replication on all of the secondaries to manually control the replication progress.
    const stopReplProducerFailpoints = [secondary1, secondary2, secondary3, secondary4].map(
        conn => configureFailPoint(conn, 'stopReplProducer'));

    // While replication is still entirely disabled, additionally disable replication partway into
    // the retryable write on all but the first secondary. The idea is that while secondary1 will
    // apply all of the oplog entries in a single batch, the other secondaries will only apply up to
    // counterMajorityCommitted oplog entries.
    const counterTotal = oplogApplierBatchSize;
    const counterMajorityCommitted = counterTotal - 2;
    const stopReplProducerOnDocumentFailpoints = [secondary2, secondary3, secondary4].map(
        conn => configureFailPoint(conn,
                                   'stopReplProducerOnDocument',
                                   {document: {"diff.u.counter": counterMajorityCommitted + 1}}));

    const lsid = ({id: UUID()});

    assert.commandWorked(primary.getCollection(ns).runCommand("update", {
        updates: Array.from({length: counterTotal}, () => ({q: {_id: 0}, u: {$inc: {counter: 1}}})),
        lsid,
        txnNumber: NumberLong(1),
    }));

    const stmtMajorityCommitted = primary.getCollection("local.oplog.rs")
                                      .findOne({ns, "o.diff.u.counter": counterMajorityCommitted});
    assert.neq(null, stmtMajorityCommitted);

    for (const fp of stopReplProducerFailpoints) {
        fp.off();

        // Wait for the secondary to have applied through the counterMajorityCommitted retryable
        // write statement. We do this for each secondary individually, starting with secondary1, to
        // guarantee that secondary1 will advance its stable_timestamp when learning of the other
        // secondaries also having applied through counterMajorityCommitted.
        assert.soon(() => {
            const {optimes: {appliedOpTime, durableOpTime}} =
                assert.commandWorked(fp.conn.adminCommand({replSetGetStatus: 1}));

            print(`${fp.conn.host}: ${tojsononeline({
                appliedOpTime,
                durableOpTime,
                stmtMajorityCommittedTimestamp: stmtMajorityCommitted.ts
            })}`);

            return bsonWoCompare(appliedOpTime.ts, stmtMajorityCommitted.ts) >= 0 &&
                bsonWoCompare(durableOpTime.ts, stmtMajorityCommitted.ts) >= 0;
        });
    }

    // Wait for secondary1 to have advanced its stable_timestamp.
    assert.soon(() => {
        const {lastStableRecoveryTimestamp} =
            assert.commandWorked(secondary1.adminCommand({replSetGetStatus: 1}));

        print(`${secondary1.host}: ${tojsononeline({
            lastStableRecoveryTimestamp,
            stmtMajorityCommittedTimestamp: stmtMajorityCommitted.ts
        })}`);

        return bsonWoCompare(lastStableRecoveryTimestamp, stmtMajorityCommitted.ts) >= 0;
    });

    // Step up one of the other secondaries and do a write which becomes majority committed to force
    // secondary1 to go into rollback.
    rst.freeze(secondary1);
    let hangAfterTruncate = configureFailPoint(secondary1, 'hangAfterOplogTruncationInRollback');
    assert.commandWorked(secondary2.adminCommand({replSetStepUp: 1}));
    rst.freeze(primary);
    rst.awaitNodesAgreeOnPrimary(undefined, undefined, secondary2);

    for (const fp of stopReplProducerOnDocumentFailpoints) {
        fp.off();
    }

    // Wait for secondary2 to be a writable primary.
    rst.getPrimary();

    // Do a write which becomes majority committed and wait for secondary1 to complete its rollback.
    assert.commandWorked(
        secondary2.getCollection("test.dummy").insert({}, {writeConcern: {w: 'majority'}}));

    // Wait for rollback to finish truncating oplog.
    // Entering rollback will close connections so we expect some network errors while waiting.
    assert.soonNoExcept(() => {
        hangAfterTruncate.wait();
        return true;
    }, `failed to wait for fail point ${hangAfterTruncate.failPointName}`);

    if (crashAfterRollbackTruncation) {
        // Crash the node after it performs oplog truncation.
        rst.stop(secondary1, 9, {allowedExitCode: MongoRunner.EXIT_SIGKILL});
        secondary1 = rst.restart(secondary1, {
            "noReplSet": false,
            setParameter: 'failpoint.stopReplProducer=' + tojson({mode: 'alwaysOn'})
        });
        rst.waitForState(secondary1, ReplSetTest.State.SECONDARY);
        secondary1.setSecondaryOk();
        // On startup, we expect to see the update persisted in the 'config.transactions' table.
        let restoredDoc =
            secondary1.getCollection('config.transactions').findOne({"_id.id": lsid.id});
        assert.neq(null, restoredDoc);
        secondary1.adminCommand({configureFailPoint: "stopReplProducer", mode: "off"});
    } else {
        // Lift the failpoint to let rollback complete and wait for state to change to SECONDARY.
        hangAfterTruncate.off();
        rst.waitForState(secondary1, ReplSetTest.State.SECONDARY);
    }

    // Reconnect to secondary1 after it completes its rollback and step it up to be the new primary.
    rst.awaitNodesAgreeOnPrimary(undefined, undefined, secondary2);
    assert.commandWorked(secondary1.adminCommand({replSetFreeze: 0}));
    rst.stepUp(secondary1);

    const docBeforeRetry = secondary1.getCollection(ns).findOne({_id: 0});
    assert.eq(docBeforeRetry, {_id: 0, counter: counterMajorityCommitted});

    assert.commandWorked(secondary1.getCollection(ns).runCommand("update", {
        updates: Array.from({length: counterTotal}, () => ({q: {_id: 0}, u: {$inc: {counter: 1}}})),
        lsid,
        txnNumber: NumberLong(1),
        writeConcern: {w: 5},
    }));

    const docAfterRetry = secondary1.getCollection(ns).findOne({_id: 0});
    assert.eq(docAfterRetry, {_id: 0, counter: counterTotal});

    rst.stopSet();
}

// Test the general scenario where we perform the appropriate update to the 'config.transactions'
// table during rollback.
runTest(false);
// Extends the test to crash the secondary in the middle of rollback right after oplog truncation.
// We assert that the update made to the 'config.transactions' table persisted on startup.
runTest(true);
})();