summaryrefslogtreecommitdiff
path: root/jstests/sharding/coordinate_txn_commit_with_tickets_exhausted.js
blob: a5681b54e1e099be97303a866ede9a57f1d939ea (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
/**
 * Validate SERVER-60682: TransactionCoordinator won't starve for a storage ticket to
 * persist its decision.
 *
 * @tags: [
 *   requires_fcv_70,
 *   uses_transactions,
 *   uses_multi_shard_transaction,
 *   uses_prepare_transaction,
 * ]
 */

(function() {
"use strict";

load("jstests/libs/fail_point_util.js");
load('jstests/libs/parallelTester.js');
load("jstests/sharding/libs/create_sharded_collection_util.js");

// Fixed size of the WiredTiger write ticket pool on each shard. The prepare-conflict
// threads spawned below are sized to exhaust exactly this many tickets.
const kNumWriteTickets = 10;
const st = new ShardingTest({
    mongos: 1,
    config: 1,
    shards: 2,
    // Each shard is a single-node replica set.
    rs: {nodes: 1},
    rsOptions: {
        setParameter: {
            // This test requires a fixed ticket pool size.
            storageEngineConcurrencyAdjustmentAlgorithm: "fixedConcurrentTransactions",
            wiredTigerConcurrentWriteTransactions: kNumWriteTickets,
            // Raise maxTransactionLockRequestTimeoutMillis to prevent the transactions in prepare
            // conflict state from aborting early due to being unable to acquire a write ticket.
            // This is needed because we want to reproduce a scenario where the number of
            // transactions in prepare conflict state is greater or equal to the available storage
            // tickets.
            maxTransactionLockRequestTimeoutMillis: 24 * 60 * 60 * 1000,
            // Similarly, we need to keep transactions alive longer than the Evergreen test
            // execution timeout so as to be able to detect failure.
            // While the test environment may already set a large enough default
            // transactionLifetimeLimitSeconds, we nevertheless specify the lifetime to avoid
            // relying on a potentially changing default.
            transactionLifetimeLimitSeconds: 24 * 60 * 60,
        }
    }
});

// Shard the collection with one chunk on each shard so that a transaction touching both
// {key: 200} and {key: -200} involves two participant shards and therefore goes through
// the two-phase commit protocol.
const sourceCollection = st.s.getCollection("test.mycoll");
CreateShardedCollectionUtil.shardCollectionWithChunks(sourceCollection, {key: 1}, [
    {min: {key: MinKey}, max: {key: 0}, shard: st.shard0.shardName},
    {min: {key: 0}, max: {key: MaxKey}, shard: st.shard1.shardName},
]);

// Insert a document into each shard.
assert.commandWorked(sourceCollection.insert([{key: 200}, {key: -200}]));

// Create a thread which leaves the TransactionCoordinator in a state where prepareTransaction has
// been run on both participant shards and it is about to write the commit decision locally to the
// config.transaction_coordinators collection.
const preparedTxnThread = new Thread(function runTwoPhaseCommitTxn(host, dbName, collName) {
    const mongo = new Mongo(host);
    const session = mongo.startSession({causalConsistency: false});
    const coll = session.getDatabase(dbName).getCollection(collName);

    session.startTransaction();
    // Update one document on each shard ({key: 200} first, then {key: -200}) so both
    // participants must be prepared at commit time.
    for (const targetKey of [200, -200]) {
        assert.commandWorked(coll.update({key: targetKey}, {$inc: {counter: 1}}));
    }
    assert.commandWorked(session.commitTransaction_forTesting());
}, st.s.host, sourceCollection.getDB().getName(), sourceCollection.getName());

// NOTE(review): the {key: 200} update is issued first, so shard1 (st.rs1) is expected to
// act as the transaction coordinator — confirm this still holds if the update order in
// preparedTxnThread is ever changed.
const txnCoordinator = st.rs1.getPrimary();
// Pause the coordinator right before it persists the commit decision, i.e. after both
// participants have been prepared.
const hangBeforeWritingDecisionFp = configureFailPoint(txnCoordinator, "hangBeforeWritingDecision");

preparedTxnThread.start();
// Wait until the coordinator has actually reached the fail point before spawning the
// ticket-exhausting threads below.
hangBeforeWritingDecisionFp.wait();

// Create other threads which will block on a prepare conflict while still holding a write ticket to
// test that the TransactionCoordinator from preparedTxnThread can still complete.
const prepareConflictThreads = [];
while (prepareConflictThreads.length < kNumWriteTickets) {
    const worker = new Thread(function hitPrepareConflictOnCoordinator(host, dbName, collName) {
        const mongo = new Mongo(host);
        const session = mongo.startSession({causalConsistency: false});
        const coll = session.getDatabase(dbName).getCollection(collName);

        session.startTransaction();
        // Do a write to ensure the transaction takes a write ticket.
        assert.commandWorked(coll.insert({key: 300}));
        // Then do a read which will block until the prepare conflict resolves.
        assert.eq({key: 200, counter: 1}, coll.findOne({key: 200}, {_id: 0}));
        assert.commandWorked(session.commitTransaction_forTesting());
    }, st.s.host, sourceCollection.getDB().getName(), sourceCollection.getName());
    prepareConflictThreads.push(worker);
    worker.start();
}

// Returns the $currentOp report from mongos, optionally narrowed by extra pipeline stages.
function currentOp(pipeline = []) {
    return st.admin.aggregate([{$currentOp: {}}].concat(pipeline)).toArray();
}

// Wait until enough operations report a prepare conflict, i.e. the reader threads above
// have saturated the write ticket pool while blocked on the prepared transaction.
const expectedConflictCount = Math.min(prepareConflictThreads.length, kNumWriteTickets);
assert.soon(() => {
    return currentOp([{$match: {prepareReadConflicts: {$gt: 0}}}]).length >= expectedConflictCount;
}, () => `Failed to find prepare conflicts in $currentOp output: ${tojson(currentOp())}`);

// Release the coordinator. If SERVER-60682 regressed, the coordinator would be starved of
// a write ticket to persist its commit decision and the joins below would hang until the
// test harness times out.
hangBeforeWritingDecisionFp.off();

preparedTxnThread.join();
for (let thread of prepareConflictThreads) {
    thread.join();
}

st.stop();
})();