/**
 * Verifies that the TransactionCoordinator does not crash when a transaction is aborted while the
 * coordinator is attempting to commit it.
 *
 * Step 1. Run and commit a transaction in order to initialize the TransactionCoordinator.
 *
 * Step 2. Run `kNumWriteTickets` remove operations in parallel so that they take up all of the
 * WiredTiger write tickets.
 *
 * Step 3. Run a transaction in parallel, but do not attempt to commit it until all of the remove
 * operations have taken WiredTiger write tickets.
 *
 * Step 4. Wait for the transaction coordinator to reach a state past `kWaitingForVotes`
 * (`writingDecision`, `waitingForDecisionAcks`, or `deletingCoordinatorDoc`).
 *
 * Step 5. Turn off the `hangWithLockDuringBatchRemove` failpoint and join the parallel remove
 * operation threads and the transaction thread.
 *
 * @tags: [
 *   requires_fcv_70,
 *   uses_multi_shard_transaction,
 *   uses_transactions,
 * ]
 */

(function() {
"use strict";

load("jstests/libs/fail_point_util.js");
load("jstests/libs/parallelTester.js");
load("jstests/sharding/libs/create_sharded_collection_util.js");

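// Number of WiredTiger write tickets to configure on each shard. Spawning exactly this many
// blocked remove operations is what exhausts the pool.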
const kNumWriteTickets = 10;
const st = new ShardingTest({
    mongos: 1,
    config: 1,
    shards: 2,
    rs: {nodes: 1},
    rsOptions: {
        setParameter: {
            // This test requires a fixed ticket pool size.
            storageEngineConcurrencyAdjustmentAlgorithm: "fixedConcurrentTransactions",
            wiredTigerConcurrentWriteTransactions: kNumWriteTickets,
            // Lower transactionLifetimeLimitSeconds to cause TransactionCoordinators which haven't
            // yet made their commit or abort decision to time out and abort the transaction.
            transactionLifetimeLimitSeconds: 20,
        }
    }
});

const dbName = "test";
const collName = "mycoll";
const failpointName = "hangWithLockDuringBatchRemove";
const sourceCollection = st.s.getDB(dbName).getCollection(collName);
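// The transaction's first statement (the {key: 400} insert below) targets the [0, MaxKey) chunk
// on shard1, so shard1's primary is expected to act as the transaction coordinator. The latch
// holds the remove threads back until the transactionThread has performed its inserts.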
const txnCoordinator = st.rs1.getPrimary();
const insertLatch = new CountDownLatch(1);

CreateShardedCollectionUtil.shardCollectionWithChunks(sourceCollection, {key: 1}, [
    {min: {key: MinKey}, max: {key: 0}, shard: st.shard0.shardName},
    {min: {key: 0}, max: {key: MaxKey}, shard: st.shard1.shardName},
]);

// The ShardServerCatalogCacheLoader thread may perform deletes in the background on the
// config.cache.chunks.test.mycoll collection. Those deletes are also eligible to be blocked by the
// hangWithLockDuringBatchRemove failpoint and, while blocked, would hold a WiredTiger write ticket
// and prevent one of the removeOperation threads from acquiring one. We therefore wait for the
// loader to finish any delete operations by explicitly running the _flushRoutingTableCacheUpdates
// command.
assert.commandWorked(
    txnCoordinator.adminCommand({_flushRoutingTableCacheUpdates: sourceCollection.getFullName()}));

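// Spawn one remove operation per write ticket. Once released by insertLatch, each remove hangs on
// the hangWithLockDuringBatchRemove failpoint while holding a WiredTiger write ticket, so
// together they exhaust the ticket pool.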
const removeOperationThreads = Array.from({length: kNumWriteTickets}).map(() => {
    return new Thread(function removeOperation(host, dbName, collName, insertLatch) {
        const conn = new Mongo(host);
        const testDB = conn.getDB(dbName);
        const coll = testDB.getCollection(collName);
        insertLatch.await();
        assert.commandWorked(coll.remove({key: 200}, {justOne: true}));
    }, st.s.host, dbName, collName, insertLatch);
});

const session = st.s.startSession({causalConsistency: false});
const sessionCollection = session.getDatabase(dbName).getCollection(collName);

// A two-phase commit transaction is first run to ensure the TransactionCoordinator has recovered
// and persisted a topology time. The transactionThread will run a second two-phase commit
// transaction using the same shard for coordinating the transaction. This ensures the
// transactionThread won't need to persist a topology time. The scenario reported in SERVER-60685
// depended on the TransactionCoordinator being interrupted while persisting the participant list
// which happens after waiting for the topology time to become durable.
session.startTransaction();
assert.commandWorked(sessionCollection.insert({key: 400}));
assert.commandWorked(sessionCollection.insert({key: -400}));
assert.commandWorked(session.commitTransaction_forTesting());

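// Enable the failpoint before starting any removes so that each remove blocks while still holding
// its write ticket.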
const hangWithLockDuringBatchRemoveFp = configureFailPoint(txnCoordinator, failpointName);

const transactionThread = new Thread(
    function runTwoPhaseCommitTxnAndTimeOutBeforeWritingParticipantList(
        host, dbName, collName, failpointName, totalWriteTickets, insertLatch) {
        const conn = new Mongo(host);
        const session = conn.startSession({causalConsistency: false});
        const sessionCollection = session.getDatabase(dbName).getCollection(collName);
        session.startTransaction();
        assert.commandWorked(sessionCollection.insert({key: 400}));
        assert.commandWorked(sessionCollection.insert({key: -400}));
        insertLatch.countDown();

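        // Helper to run a $currentOp aggregation. Operations parked on a failpoint report its
        // name in their 'failpointMsg' field, which is used below to count the blocked removes.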
        const currentOp = (pipeline = []) =>
            conn.getDB("admin").aggregate([{$currentOp: {}}, ...pipeline]).toArray();
        assert.soon(() => {
            const removeOperations = currentOp([
                {$match: {op: "remove", failpointMsg: failpointName, ns: `${dbName}.${collName}`}}
            ]);
            return removeOperations.length === totalWriteTickets;
        }, () => `Timed out waiting for the remove operations: ${tojson(currentOp())}`);

        // At this point every WiredTiger write ticket is held by a blocked remove operation, so
        // the commit cannot make progress. transactionLifetimeLimitSeconds will cause the
        // transaction to be aborted, making the commit fail with NoSuchTransaction.
        assert.commandFailedWithCode(session.commitTransaction_forTesting(),
                                     ErrorCodes.NoSuchTransaction);
    },
    st.s.host,
    dbName,
    collName,
    failpointName,
    kNumWriteTickets,
    insertLatch);

transactionThread.start();

removeOperationThreads.forEach(thread => thread.start());

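// Wait until serverStatus on the coordinator reports the two-phase commit in a step past
// kWaitingForVotes (writingDecision, waitingForDecisionAcks, or deletingCoordinatorDoc) before
// releasing the failpoint, so the coordinator is interrupted only after it has advanced past
// voting.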
let twoPhaseCommitCoordinatorServerStatus;
assert.soon(
    () => {
        twoPhaseCommitCoordinatorServerStatus =
            txnCoordinator.getDB(dbName).serverStatus().twoPhaseCommitCoordinator;
        const {deletingCoordinatorDoc, waitingForDecisionAcks, writingDecision} =
            twoPhaseCommitCoordinatorServerStatus.currentInSteps;
        return deletingCoordinatorDoc.toNumber() === 1 || waitingForDecisionAcks.toNumber() === 1 ||
            writingDecision.toNumber() === 1;
    },
    () => `Failed to find 1 total transactions in a state past kWaitingForVotes: ${
        tojson(twoPhaseCommitCoordinatorServerStatus)}`);

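// Release the blocked removes; the freed write tickets allow the remove operations and the
// coordinator's abort path to complete, letting all threads be joined (Step 5).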
hangWithLockDuringBatchRemoveFp.off();

transactionThread.join();
removeOperationThreads.forEach((thread) => {
    thread.join();
});

st.stop();
})();