summaryrefslogtreecommitdiff
path: root/jstests/noPassthrough/timeseries_retry_writes.js
blob: b4b43f3721c9dc460f123f126e25aff0dfc943a5 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
/**
 * Tests retrying of time-series insert operations.
 * @tags: [
 *   requires_replication,
 * ]
 */
(function() {
'use strict';

// Two-node replica set used by every test case in this file.
const rst = new ReplSetTest({
    nodes: [
        {},
        {
            // Disallow elections on secondary.
            rsConfig: {
                priority: 0,
                votes: 0,
            },
        },
    ]
});
// The node list returned by startSet() is not needed here; the primary is obtained through
// 'rst' below.
rst.startSet();
rst.initiate();

const primary = rst.getPrimary();

// Name of the time field used when creating each time-series collection.
const timeFieldName = 'time';
// Incremented on every runTest() call so that each run works on its own collection ('t_<n>').
let collCount = 0;

// Running totals of the retry statistics that runTest() expects to find in
// serverStatus().transactions. They accumulate across runTest() calls and are compared against
// the values reported by the server at the end of each run.
let retriedCommandsCount = 0;
let retriedStatementsCount = 0;

/**
 * Exercises retried inserts against a freshly created time-series collection.
 *
 * Takes three arrays of measurements. Each array is inserted twice using the same session id and
 * the same 'txnNumber': once as the original write and once as a retry of it. 'docsInsert' is
 * written first and is expected to create the bucket; 'docsUpdateA' and 'docsUpdateB' are then
 * appended. Afterwards the function asserts that:
 *   - the collection returns every measurement exactly once, in '_id' order;
 *   - the buckets collection holds exactly one bucket whose time data matches the measurements;
 *   - the retry counters in serverStatus().transactions grew by the expected amounts.
 */
const runTest = function(docsInsert, docsUpdateA, docsUpdateB) {
    const session = primary.startSession({retryWrites: true});
    const testDB = session.getDatabase('test');

    const coll = testDB.getCollection('t_' + collCount++);
    const bucketsColl = testDB.getCollection('system.buckets.' + coll.getName());
    coll.drop();

    const logMessage = 'Running test: collection: ' + coll.getFullName() +
        '; bucket collection: ' + bucketsColl.getFullName() +
        '; initial measurements: ' + tojson(docsInsert) +
        '; measurements to append A: ' + tojson(docsUpdateA) +
        '; measurements to append B: ' + tojson(docsUpdateB);
    jsTestLog(logMessage);

    const createRes =
        testDB.createCollection(coll.getName(), {timeseries: {timeField: timeFieldName}});
    assert.commandWorked(createRes);
    assert.contains(bucketsColl.getName(), testDB.getCollectionNames());

    // Sends one insert command tagged with this session's id and the given transaction number.
    // For retryable writes, the server uses 'txnNumber' as the key to look up previously executed
    // operations in the session, so sending the same number twice makes the second command a
    // retry of the first.
    const insertWithTxnNumber = (documents, txnNumber) => testDB.runCommand({
        insert: coll.getName(),
        documents: documents,
        lsid: session.getSessionId(),
        txnNumber: NumberLong(txnNumber),
    });

    // Initial measurements: original write followed by its retry.
    assert.commandWorked(
        insertWithTxnNumber(docsInsert, 0),
        'failed to create bucket with initial docs (first write): ' + tojson(docsInsert));
    assert.commandWorked(
        insertWithTxnNumber(docsInsert, 0),
        'failed to create bucket with initial docs (retry write): ' + tojson(docsInsert));

    // First append: original write followed by its retry.
    assert.commandWorked(
        insertWithTxnNumber(docsUpdateA, 1),
        'failed to append docs A to bucket (first write): ' + tojson(docsUpdateA));
    assert.commandWorked(
        insertWithTxnNumber(docsUpdateA, 1),
        'failed to append docs A to bucket (retry write): ' + tojson(docsUpdateA));

    // Second append: original write followed by its retry.
    assert.commandWorked(
        insertWithTxnNumber(docsUpdateB, 2),
        'failed to append docs B to bucket (first write): ' + tojson(docsUpdateB));
    assert.commandWorked(
        insertWithTxnNumber(docsUpdateB, 2),
        'failed to append docs B to bucket (retry write): ' + tojson(docsUpdateB));

    // An insert with no documents must be rejected. This test case ensures that the batch size
    // error handling is consistent with non-time-series collections.
    assert.commandFailedWithCode(insertWithTxnNumber([], 4), ErrorCodes.InvalidLength);

    const allDocs = [...docsInsert, ...docsUpdateA, ...docsUpdateB];

    // Check view: every measurement is returned once, in '_id' order.
    const viewDocs = coll.find({}).sort({_id: 1}).toArray();
    assert.eq(allDocs.length, viewDocs.length, viewDocs);
    allDocs.forEach((expectedDoc, i) => {
        assert.docEq(expectedDoc, viewDocs[i], 'unexpected doc from view: ' + i);
    });

    // Check bucket collection: all measurements must have ended up in a single bucket.
    const bucketDocs = bucketsColl.find().sort({_id: 1}).toArray();
    assert.eq(1, bucketDocs.length, bucketDocs);

    const bucket = bucketDocs[0];
    jsTestLog('Bucket for test collection: ' + coll.getFullName() +
              ': bucket collection: ' + bucketsColl.getFullName() + ': ' + tojson(bucket));

    // Check bucket: one entry in the time column per measurement.
    const timeColumn = bucket.data[timeFieldName];
    assert.eq(allDocs.length,
              Object.keys(timeColumn).length,
              'invalid number of measurements in first bucket: ' + tojson(bucket));

    // Keys in data field should match element indexes in 'allDocs' array.
    allDocs.forEach((expectedDoc, i) => {
        const key = i.toString();
        assert(timeColumn.hasOwnProperty(key),
               'missing element for index ' + i + ' in data field: ' + tojson(bucket));
        assert.eq(expectedDoc[timeFieldName],
                  timeColumn[key],
                  'invalid time for measurement ' + i + ' in data field: ' + tojson(bucket));
    });

    const txnStats = testDB.serverStatus().transactions;

    // Three commands were retried in this run (initial insert, append A, append B).
    retriedCommandsCount += 3;
    assert.eq(retriedCommandsCount,
              txnStats.retriedCommandsCount,
              'Incorrect statistic in db.serverStatus(): ' + tojson(txnStats));

    // Every measurement was re-sent exactly once as part of those retries.
    retriedStatementsCount += allDocs.length;
    assert.eq(retriedStatementsCount,
              txnStats.retriedStatementsCount,
              'Incorrect statistic in db.serverStatus(): ' + tojson(txnStats));

    session.endSession();
};

// Measurement timestamps, ten minutes apart. runTest() asserts that all measurements of a run
// end up in a single bucket.
const t = [
    ISODate("2021-01-20T00:00:00.000Z"),
    ISODate("2021-01-20T00:10:00.000Z"),
    ISODate("2021-01-20T00:20:00.000Z"),
    ISODate("2021-01-20T00:30:00.000Z"),
    ISODate("2021-01-20T00:40:00.000Z"),
    ISODate("2021-01-20T00:50:00.000Z"),
];

// Builds the measurement with index 'i': '_id' and 'x' are both 'i', the time is t[i].
const makeMeasurement = (i) => ({_id: i, time: t[i], x: i});

// One measurement per write operation.
runTest([makeMeasurement(0)], [makeMeasurement(1)], [makeMeasurement(2)]);

// Two measurements per write operation.
runTest([makeMeasurement(0), makeMeasurement(1)],
        [makeMeasurement(2), makeMeasurement(3)],
        [makeMeasurement(4), makeMeasurement(5)]);

rst.stopSet();
})();