summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/router_transaction_current_op.js
blob: e1f74cad3e749f85e56fedce21d1358d13a4ae96 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
// Verifies currentOp returns the expected fields for idle and active transactions in basic cases.
// More cases are covered in unit tests.
// @tags: [uses_transactions, uses_atclustertime]
(function() {
"use strict";

load("jstests/libs/curop_helpers.js");   // For waitForCurOpByFailPoint().
load("jstests/libs/parallelTester.js");  // for Thread.

function verifyCurrentOpFields(res, isActive) {
    // Verify top level fields relevant to transactions. Note this does not include every field, so
    // the number of fields in the response shouldn't be asserted on.

    const expectedFields = [
        "type",
        "host",
        "desc",
        "connectionId",
        "client",
        "appName",
        "clientMetadata",
        "active",
        "lsid",
        "transaction",
    ];

    assert.hasFields(res, expectedFields, tojson(res));

    if (isActive) {
        assert.eq(res.type, "op", tojson(res));
    } else {
        assert.eq(res.type, "idleSession", tojson(res));
        assert.eq(res.desc, "inactive transaction", tojson(res));
    }

    // Verify the transactions sub object.

    const transaction = res.transaction;
    const expectedTransactionsFields = [
        "parameters",
        "startWallClockTime",
        "timeOpenMicros",
        "timeActiveMicros",
        "timeInactiveMicros",
        "globalReadTimestamp",
        "numParticipants",
        "participants",
        "numNonReadOnlyParticipants",
        "numReadOnlyParticipants",
        // Commit hasn't started so don't expect 'commitStartWallClockTime' or 'commitType'.
    ];

    assert.hasFields(transaction, expectedTransactionsFields, tojson(transaction));
    assert.eq(
        expectedTransactionsFields.length, Object.keys(transaction).length, tojson(transaction));

    // Verify transaction parameters sub object.

    const parameters = transaction.parameters;
    const expectedParametersFields = [
        "txnNumber",
        "txnRetryCounter",
        "autocommit",
        "readConcern",
    ];

    assert.hasFields(parameters, expectedParametersFields, tojson(parameters));
    assert.eq(expectedParametersFields.length, Object.keys(parameters).length, tojson(parameters));

    // Verify participants sub array.

    const participants = transaction.participants;
    const expectedParticipantFields = [
        "name",
        "coordinator",
        // 'readOnly' will not be set until a response has been received from that participant, so
        // it will not be present for the active transaction because of the failpoint and is handled
        // specially.
    ];

    participants.forEach((participant) => {
        assert.hasFields(participant, expectedParticipantFields, tojson(participant));
        if (isActive) {
            // 'readOnly' should not be set.
            assert.eq(expectedParticipantFields.length,
                      Object.keys(participant).length,
                      tojson(participant));
        } else {
            // 'readOnly' should always be set for the inactive transaction.
            assert.hasFields(participant, ["readOnly"], tojson(participant));
            assert.eq(expectedParticipantFields.length + 1,  // +1 for readOnly.
                      Object.keys(participant).length,
                      tojson(participant));
        }
    });
}

function getCurrentOpForFilter(st, matchFilter) {
    const res = st.s.getDB("admin")
                    .aggregate([{$currentOp: {localOps: true}}, {$match: matchFilter}])
                    .toArray();
    assert.eq(1, res.length, res);
    return res[0];
}

const dbName = "test";
const collName = "foo";
const st = new ShardingTest({shards: 1, config: 1});

const session = st.s.startSession();
const sessionDB = session.getDatabase(dbName);

// Insert a document to set up a collection.
assert.commandWorked(sessionDB[collName].insert({x: 1}));

jsTest.log("Inactive transaction.");
(() => {
    session.startTransaction({readConcern: {level: "snapshot"}});
    assert.eq(1, sessionDB[collName].find({x: 1}).itcount());

    const res = getCurrentOpForFilter(st, {"lsid.id": session.getSessionId().id});
    verifyCurrentOpFields(res, false /* isActive */);

    assert.commandWorked(session.abortTransaction_forTesting());
})();

jsTest.log("Active transaction.");
(() => {
    assert.commandWorked(st.rs0.getPrimary().adminCommand(
        {configureFailPoint: "waitInFindBeforeMakingBatch", mode: "alwaysOn"}));

    const txnThread = new Thread(function(host, dbName, collName) {
        const mongosConn = new Mongo(host);
        const threadSession = mongosConn.startSession();

        threadSession.startTransaction({readConcern: {level: "snapshot"}});
        assert.commandWorked(threadSession.getDatabase(dbName).runCommand(
            {find: collName, filter: {}, comment: "active_txn_find"}));

        assert.commandWorked(threadSession.abortTransaction_forTesting());
        threadSession.endSession();
    }, st.s.host, dbName, collName);
    txnThread.start();

    // Wait until we know the failpoint has been reached.
    waitForCurOpByFailPointNoNS(st.rs0.getPrimary().getDB("admin"), "waitInFindBeforeMakingBatch");

    // We don't know the id of the session started by the parallel thread, so use the find's comment
    // to get its currentOp output.
    const res = getCurrentOpForFilter(st, {"command.comment": "active_txn_find"});
    verifyCurrentOpFields(res, true /* isActive */);

    assert.commandWorked(st.rs0.getPrimary().adminCommand(
        {configureFailPoint: "waitInFindBeforeMakingBatch", mode: "off"}));
    txnThread.join();
})();

session.endSession();
st.stop();
}());