summaryrefslogtreecommitdiff
path: root/jstests/replsets/libs/apply_ops_concurrent_non_atomic.js
blob: 7e36cfa667770c392714c1d53e32ab1e8a3ba688 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
/**
 * This test ensures that multiple non-atomic applyOps commands can run concurrently.
 * Prior to SERVER-29802, applyOps would acquire the global lock regardless of the
 * atomicity of the operations (as a whole) being applied.
 *
 * Every instance of ApplyOpsConcurrentNonAtomicTest is configured with an "options" document
 * with the following format:
 * {
 *     ns1: <string>,
 *     ns2: <string>,
 *     requiresDocumentLevelConcurrency: <bool>,
 * }
 *
 * ns1:
 *     Fully qualified namespace of first set of CRUD operations. For simplicity, only insert
 *     operations will be used. The set of documents generated for the inserts into ns1 will have
 *     _id values distinct from those generated for ns2.
 *
 * ns2:
 *     Fully qualified namespace of second set of CRUD operations. This may be the same namespace as
 *     ns1. As with ns1, only insert operations will be used.
 *
 * requiresDocumentLevelConcurrency:
 *     Set to true if this test case can only be run with a storage engine that supports document
 *     level concurrency.
 */
var ApplyOpsConcurrentNonAtomicTest = function(options) {
    'use strict';

    // NOTE(review): presumably provides supportsDocumentLevelConcurrency(), used by run() below;
    // confirm against the loaded file.
    load('jstests/concurrency/fsm_workload_helpers/server_types.js');

    // Allow the constructor to be called without 'new'.
    if (!(this instanceof ApplyOpsConcurrentNonAtomicTest)) {
        return new ApplyOpsConcurrentNonAtomicTest(options);
    }

    // Test configuration; read back by run().
    this.options = options;

    // The helper functions below (testLog, generateInsertOps, applyOpsInsertNonAtomic and
    // insertFunction) are converted to source text by createInsertFunction() and refer to each
    // other by name inside the generated script. Do not rename them and do not let them close
    // over any variable of this constructor.

    /**
     * Logs message using test name as prefix.
     */
    function testLog(message) {
        jsTestLog('ApplyOpsConcurrentNonAtomicTest: ' + message);
    }

    /**
     * Creates an array of 'numOps' insert operations for applyOps into collection 'coll'.
     *
     * Each generated document has the form {_id: id * numOps + i, id: id} for i in [0, numOps),
     * so batches generated with different 'id' values (and the same 'numOps') never share an _id.
     */
    function generateInsertOps(coll, numOps, id) {
        // Explicit 'use strict' to prevent mozjs from injecting its own "use strict" directive
        // (with incorrect indentation) when we convert this function into a string for
        // startParallelShell().
        'use strict';
        const ns = coll.getFullName();
        return Array.from({length: numOps}, (unused, i) => {
            return {op: 'i', ns: ns, o: {_id: (id * numOps + i), id: id}};
        });
    }

    /**
     * Runs applyOps in non-atomic mode to insert 'numOps' documents into collection 'coll'.
     * Throws (through assert.commandWorked) if the applyOps command fails.
     */
    function applyOpsInsertNonAtomic(coll, numOps, id) {
        'use strict';
        const ops = generateInsertOps(coll, numOps, id);
        const mydb = coll.getDB();
        assert.commandWorked(mydb.runCommand({applyOps: ops, allowAtomic: false}),
                             'failed to insert documents into ' + coll.getFullName());
    }

    /**
     * Parses 'numOps' and collection namespace from 'options' and runs applyOps to insert the
     * generated documents. Uses the global 'db' of the shell it runs in to obtain the connection.
     *
     * options format:
     * {
     *     ns: <string>,
     *     numOps: <int>,
     *     id: <int>,
     * }
     *
     * ns:
     *     Fully qualified namespace of collection to insert documents into.
     *
     * numOps:
     *     Number of insert operations to generate for applyOps command.
     *
     * id:
     *     Index of collection for applyOps. Used with 'numOps' to generate _id values that will not
     *     collide with collections with different indexes.
     */
    function insertFunction(options) {
        'use strict';

        const coll = db.getMongo().getCollection(options.ns);
        const numOps = options.numOps;
        const id = options.id;

        testLog('Starting to apply ' + numOps + ' operations in collection ' + coll.getFullName());
        applyOpsInsertNonAtomic(coll, numOps, id);
        testLog('Successfully applied ' + numOps + ' operations in collection ' +
                coll.getFullName());
    }

    /**
     * Creates the source text of a script for startParallelShell() to run that will insert
     * documents into collection 'coll' using applyOps.
     *
     * The script re-declares the helper functions above from their string form and then calls a
     * copy of insertFunction (named after the collection namespace, with dots replaced by
     * underscores) with the serialized options.
     */
    function createInsertFunction(coll, numOps, id) {
        const options = {
            ns: coll.getFullName(),
            numOps: numOps,
            id: id,
        };
        const functionName = 'insertFunction_' + coll.getFullName().replace(/\./g, '_');
        // Concatenating a function with a string yields the function's source text.
        const s =                                                                     //
            '\n\n' +                                                                  //
            'const testLog = ' + testLog + ';\n\n' +                                  //
            'const generateInsertOps = ' + generateInsertOps + ';\n\n' +              //
            'const applyOpsInsertNonAtomic = ' + applyOpsInsertNonAtomic + ';\n\n' +  //
            'const ' + functionName + ' = ' + insertFunction + ';\n\n' +              //
            functionName + '(' + tojson(options) + ');';                              //
        return s;
    }

    /**
     * Returns number of insert operations reported by serverStatus.
     * Depending on the server version, applyOps may increment either 'opcounters' or
     * 'opcountersRepl':
     *     version string starting with "3.4": 'opcountersRepl.insert'
     *     any other version (3.6 and newer): 'opcounters.insert'
     */
    function getInsertOpCount(serverStatus) {
        if (serverStatus.version.startsWith('3.4')) {
            return serverStatus.opcountersRepl.insert;
        }
        return serverStatus.opcounters.insert;
    }

    /**
     * Runs the test.
     *
     * Starts a single-node replica set, pauses applyOps between operations using a fail point,
     * launches two parallel shells that each run a non-atomic applyOps batch, and waits until the
     * insert opcounter shows that both batches applied their first operation. The fail point is
     * then disabled, both shells are joined and the final insert count is verified.
     */
    this.run = function() {
        const options = this.options;

        assert(options.ns1, 'collection 1 namespace not provided');
        assert(options.ns2, 'collection 2 namespace not provided');

        const replTest = new ReplSetTest({nodes: 1});
        replTest.startSet();
        replTest.initiate();

        const primary = replTest.getPrimary();
        const adminDb = primary.getDB('admin');

        if (options.requiresDocumentLevelConcurrency &&
            !supportsDocumentLevelConcurrency(adminDb)) {
            testLog('Skipping test because storage engine does not support document level ' +
                    'concurrency.');
            // Shut down the replica set started above so that a skipped test does not leave it
            // running.
            replTest.stopSet();
            return;
        }

        const coll1 = primary.getCollection(options.ns1);
        const db1 = coll1.getDB();
        const coll2 = primary.getCollection(options.ns2);
        const db2 = coll2.getDB();

        assert.commandWorked(db1.createCollection(coll1.getName()));
        // ns1 and ns2 may name the same collection; only create it once.
        if (coll1.getFullName() !== coll2.getFullName()) {
            assert.commandWorked(db2.createCollection(coll2.getName()));
        }

        // Enable fail point to pause applyOps between operations.
        assert.commandWorked(primary.adminCommand(
            {configureFailPoint: 'applyOpsPauseBetweenOperations', mode: 'alwaysOn'}));

        // This logs each operation being applied.
        const previousLogLevel =
            assert.commandWorked(primary.setLogLevel(3, 'replication')).was.replication.verbosity;

        testLog('Applying operations in collections ' + coll1.getFullName() + ' and ' +
                coll2.getFullName());

        const numOps = 100;
        const insertProcess1 =
            startParallelShell(createInsertFunction(coll1, numOps, 0), replTest.getPort(0));
        const insertProcess2 =
            startParallelShell(createInsertFunction(coll2, numOps, 1), replTest.getPort(0));

        // The fail point will prevent applyOps from advancing past the first operation in each
        // batch of operations. If applyOps is applying both sets of operations concurrently without
        // holding the global lock, the insert opcounter will eventually be incremented to 2.
        try {
            let insertOpCount = 0;
            const expectedFinalOpCount = 2;
            assert.soon(
                function() {
                    const serverStatus = adminDb.serverStatus();
                    insertOpCount = getInsertOpCount(serverStatus);
                    // This assertion may fail if the fail point is not implemented correctly within
                    // applyOps. This allows us to fail fast instead of waiting for the
                    // assert.soon() function to time out.
                    assert.lte(insertOpCount,
                               expectedFinalOpCount,
                               'Expected at most ' + expectedFinalOpCount +
                                   ' documents inserted with fail point enabled. ' +
                                   'Most recent insert operation count = ' + insertOpCount);
                    return insertOpCount === expectedFinalOpCount;
                },
                // The message is passed as a function so that it is built when the wait fails and
                // reports the last observed count rather than the initial value of 0.
                function() {
                    return 'Insert operation count did not reach ' + expectedFinalOpCount +
                        ' as expected with fail point enabled. ' +
                        'Most recent insert operation count = ' + insertOpCount;
                });
        } finally {
            // Always release the paused applyOps commands, even if the wait above failed.
            assert.commandWorked(primary.adminCommand(
                {configureFailPoint: 'applyOpsPauseBetweenOperations', mode: 'off'}));
        }

        // Wait for both parallel shells to finish.
        insertProcess1();
        insertProcess2();

        testLog('Successfully applied operations in collections ' + coll1.getFullName() + ' and ' +
                coll2.getFullName());

        // Reset log level.
        primary.setLogLevel(previousLogLevel, 'replication');

        // Both batches inserted 'numOps' documents each.
        const serverStatus = adminDb.serverStatus();
        assert.eq(2 * numOps,
                  getInsertOpCount(serverStatus),
                  'incorrect number of insert operations in server status after applyOps: ' +
                      tojson(serverStatus));

        replTest.stopSet();
    };
};