summaryrefslogtreecommitdiff
path: root/jstests/sharding/txn_addingParticipantParameter.js
blob: 5b6f6dfc350a6f78e057072bdca77162afd64a05 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
/**
 * Tests that additional participants can be added to an existing transaction when the
 * 'featureFlagAdditionalParticipants' is enabled.
 */

(function() {
'use strict';

load("jstests/libs/feature_flag_util.js");  // for FeatureFlagUtil.isEnabled
load("jstests/libs/fail_point_util.js");
load('jstests/sharding/libs/sharded_transactions_helpers.js');

const dbName = "test";
const collName = "foo";
const ns = dbName + "." + collName;

const shard0Name = TestData.configShard ? "config" : "txn_addingParticipantParameter-rs0";
const shard1Name = "txn_addingParticipantParameter-rs1";
const shard2Name = "txn_addingParticipantParameter-rs2";
const shard3Name = "txn_addingParticipantParameter-rs3";

const checkParticipantListMatches = function(
    coordinatorConn, lsid, txnNumber, expectedParticipantList) {
    let coordDoc = coordinatorConn.getDB("config")
                       .getCollection("transaction_coordinators")
                       .findOne({"_id.lsid.id": lsid.id, "_id.txnNumber": txnNumber});
    assert.neq(null, coordDoc);
    assert.sameMembers(coordDoc.participants, expectedParticipantList);
};

const runCommitThroughMongosInParallelShellExpectAbort = function(st, lsid) {
    const runCommitExpectAbortCode = "assert.commandFailedWithCode(db.adminCommand({" +
        "commitTransaction: 1," +
        "lsid: " + tojson(lsid) + "," +
        "txnNumber: NumberLong(1)," +
        "stmtId: NumberInt(0)," +
        "autocommit: false," +
        "})," +
        "ErrorCodes.NoSuchTransaction);";
    return startParallelShell(runCommitExpectAbortCode, st.s.port);
};

const runCommitThroughMongosInParallelShellExpectSuccess = function(st, lsid) {
    const runCommitExpectSuccessCode = "assert.commandWorked(db.adminCommand({" +
        "commitTransaction: 1," +
        "lsid: " + tojson(lsid) + "," +
        "txnNumber: NumberLong(1)," +
        "stmtId: NumberInt(0)," +
        "autocommit: false," +
        "}));";
    return startParallelShell(runCommitExpectSuccessCode, st.s.port);
};

const testAddingParticipant = function(turnFailPointOn, expectedParticipantList, fpData = {}) {
    let st = new ShardingTest({shards: 4, causallyConsistent: true});

    // SERVER-67748
    const featureFlagAdditionalParticipants = FeatureFlagUtil.isEnabled(
        st.configRS.getPrimary().getDB('admin'), "AdditionalParticipants");
    if (!featureFlagAdditionalParticipants) {
        jsTestLog("Skipping as featureFlagAdditionalParticipants is  not enabled");
        st.stop();
        return;
    }

    let coordinator = st.shard0;
    let participant1 = st.shard1;
    let shard2 = st.shard2;

    let lsid = {id: UUID()};

    const session = st.s.startSession();
    const sessionDB = session.getDatabase(dbName);

    if (turnFailPointOn) {
        // Turn on failpoint so that the shard1 writes the participant paramenter in its
        // response body to transaction router.
        configureFailPoint(
            participant1, "includeAdditionalParticipantInResponse", fpData, "alwaysOn");
    }

    // Create a sharded collection with a chunk on each shard:
    // shard0: [-inf, 0)
    // shard1: [0, 10)
    // shard2: [10, +inf)
    assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
    assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: coordinator.shardName}));
    assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
    assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
    assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 10}}));
    assert.commandWorked(
        st.s.adminCommand({moveChunk: ns, find: {_id: 0}, to: participant1.shardName}));
    assert.commandWorked(st.s.adminCommand({moveChunk: ns, find: {_id: 10}, to: shard2.shardName}));

    // These forced refreshes are not strictly necessary; they just prevent extra TXN log lines
    // from the shards starting, aborting, and restarting the transaction due to needing to
    // refresh after the transaction has started.
    assert.commandWorked(coordinator.adminCommand({_flushRoutingTableCacheUpdates: ns}));
    assert.commandWorked(participant1.adminCommand({_flushRoutingTableCacheUpdates: ns}));
    assert.commandWorked(shard2.adminCommand({_flushRoutingTableCacheUpdates: ns}));
    st.refreshCatalogCacheForNs(st.s, ns);

    // Start a new transaction by inserting a document onto shard0 and shard1.
    assert.commandWorked(sessionDB.runCommand({
        insert: collName,
        documents: [{_id: -5}, {_id: 5}],
        lsid: lsid,
        txnNumber: NumberLong(1),
        stmtId: NumberInt(0),
        startTransaction: true,
        autocommit: false,
    }));

    let awaitResult;
    if (turnFailPointOn) {
        // Since feature flag was enabled, we expect shard2 was added to the list of participants
        // stored on the coordinator and the commit to fail because of this new participant.
        awaitResult = runCommitThroughMongosInParallelShellExpectAbort(st, lsid);
    } else {
        awaitResult = runCommitThroughMongosInParallelShellExpectSuccess(st, lsid);
    }

    // Turn on failpoints so that the coordinator hangs after each write it does, so that the
    // test can check that the write happened correctly.
    const hangBeforeWaitingForParticipantListWriteConcernFp = configureFailPoint(
        coordinator, "hangBeforeWaitingForParticipantListWriteConcern", {}, "alwaysOn");

    // Check that the coordinator wrote the participant list.
    hangBeforeWaitingForParticipantListWriteConcernFp.wait();
    checkParticipantListMatches(coordinator, lsid, 1, expectedParticipantList);
    hangBeforeWaitingForParticipantListWriteConcernFp.off();

    awaitResult();

    if (turnFailPointOn) {
        // Check that the transaction aborted as expected.
        jsTest.log("Verify that the transaction was aborted on all shards.");
        assert.eq(0, st.s.getDB(dbName).getCollection(collName).find().itcount());
    }

    st.s.getDB(dbName).getCollection(collName).drop();
    st.stop();
};

jsTestLog("===Additional Participants Fail Point is OFF===");

let expectedParticipantListNormal = [shard0Name, shard1Name];
testAddingParticipant(false, expectedParticipantListNormal);

jsTestLog("===Additional Participants Fail Point is ON===");

print("Adding one additional participant:");
const fpDataOne = {
    "cmdName": "insert",
    "ns": ns,
    "shardId": ["txn_addingParticipantParameter-rs2"]
};
let expectedParticipantListOne = [shard0Name, shard1Name, shard2Name];
testAddingParticipant(true, expectedParticipantListOne, fpDataOne);

print("Adding multiple additional participants:");
const fpDataMultiple = {
    "cmdName": "insert",
    "ns": ns,
    "shardId": ["txn_addingParticipantParameter-rs2", "txn_addingParticipantParameter-rs3"]
};
let expectedParticipantListMultiple = [shard0Name, shard1Name, shard2Name, shard3Name];
testAddingParticipant(true, expectedParticipantListMultiple, fpDataMultiple);
})();