summaryrefslogtreecommitdiff
path: root/jstests/concurrency/fsm_libs/worker_thread.js
blob: 0d70add109d3ad7fb2b2129daaefe49de5336562 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
'use strict';

load('jstests/concurrency/fsm_libs/assert.js');
load('jstests/concurrency/fsm_libs/cluster.js');       // for Cluster.isStandalone
load('jstests/concurrency/fsm_libs/parse_config.js');  // for parseConfig
load('jstests/libs/specific_secondary_reader_mongo.js');

var workerThread = (function() {

    // Entry point for a single FSM worker thread: connects to the cluster,
    // prepares the per-workload $config objects, synchronizes startup with the
    // other threads via the latch, and then hands control to 'run'.
    //
    // workloads = list of workload filenames
    // args.tid = the thread identifier
    // args.data = map of workload -> 'this' parameter passed to the FSM state functions
    // args.host = the address to make a new connection to
    // args.latch = CountDownLatch instance for starting all threads
    // args.dbName = the database name
    // args.collName = the collection name
    // args.cluster = connection strings for all cluster nodes (see cluster.js for format)
    // args.clusterOptions = the configuration of the cluster
    // args.seed = seed for the random number generator
    // args.globalAssertLevel = the global assertion level to use
    // args.errorLatch = CountDownLatch instance that threads count down when they error
    // args.sessionOptions = the options to start a session with
    // args.testData = TestData object
    // args.replSetName = (optional) replica set name appended to the connection string
    // args.secondaryHost = secondary to pin reads to when TestData.pinningSecondary is set
    // run = callback that takes a map of workloads to their associated $config
    //
    // Returns {ok: 1} on success. If an error occurs after all threads have
    // started, returns {ok: 0, err, stack, tid, workloads} and counts down
    // args.errorLatch. Errors thrown before args.latch.countDown() are
    // deliberately NOT caught here (see comment below the countDown() call).
    function main(workloads, args, run) {
        var myDB;
        var configs = {};
        // Tag the connection with the thread id (via appName) so this thread's
        // operations can be identified in server logs and currentOp output.
        var connectionString = 'mongodb://' + args.host + '/?appName=tid:' + args.tid;
        if (typeof args.replSetName !== 'undefined') {
            connectionString += '&replicaSet=' + args.replSetName;
        }

        // Propagate the caller's assertion level to the global consumed by
        // fsm_libs/assert.js (loaded at the top of this file).
        globalAssertLevel = args.globalAssertLevel;

        // The global 'TestData' object may still be undefined if the concurrency suite isn't being
        // run by resmoke.py (e.g. if it is being run via a parallel shell in the backup/restore
        // tests).
        TestData = (args.testData !== undefined) ? args.testData : {};

        try {
            if (typeof db !== 'undefined') {
                // The implicit database connection created within the thread's scope
                // is unneeded, so forcibly clean it up.
                db = null;
                gc();
            }

            let mongo;
            if (TestData.pinningSecondary) {
                // Pin all reads from this thread to a specific secondary node.
                mongo = new SpecificSecondaryReaderMongo(connectionString, args.secondaryHost);
            } else {
                mongo = new Mongo(connectionString);
            }

            if (typeof args.sessionOptions !== 'undefined') {
                let initialClusterTime;
                let initialOperationTime;

                // JavaScript objects backed by C++ objects (e.g. BSON values from a command
                // response) do not serialize correctly when passed through the ScopedThread
                // constructor. To work around this behavior, we instead pass a stringified form
                // of the JavaScript object through the ScopedThread constructor and use eval()
                // to rehydrate it.
                if (typeof args.sessionOptions.initialClusterTime === 'string') {
                    initialClusterTime = eval('(' + args.sessionOptions.initialClusterTime + ')');

                    // The initialClusterTime property was removed from SessionOptions in a
                    // later revision of the Driver's specification, so we remove the property
                    // and call advanceClusterTime() ourselves.
                    delete args.sessionOptions.initialClusterTime;
                }

                if (typeof args.sessionOptions.initialOperationTime === 'string') {
                    initialOperationTime =
                        eval('(' + args.sessionOptions.initialOperationTime + ')');

                    // The initialOperationTime property was removed from SessionOptions in a
                    // later revision of the Driver's specification, so we remove the property
                    // and call advanceOperationTime() ourselves.
                    delete args.sessionOptions.initialOperationTime;
                }

                const session = mongo.startSession(args.sessionOptions);
                const readPreference = session.getOptions().getReadPreference();
                if (readPreference && readPreference.mode === 'secondary') {
                    // Unset the explicit read preference so set_read_preference_secondary.js can do
                    // the right thing based on the DB.
                    session.getOptions().setReadPreference(undefined);

                    // We load() set_read_preference_secondary.js in order to avoid running
                    // commands against the "admin" and "config" databases via mongos with
                    // readPreference={mode: "secondary"} when there's only a single node in
                    // the CSRS.
                    load('jstests/libs/override_methods/set_read_preference_secondary.js');
                }

                // Re-apply the cluster/operation times that were stripped from
                // the session options and rehydrated via eval() above.
                if (typeof initialClusterTime !== 'undefined') {
                    session.advanceClusterTime(initialClusterTime);
                }

                if (typeof initialOperationTime !== 'undefined') {
                    session.advanceOperationTime(initialOperationTime);
                }

                myDB = session.getDatabase(args.dbName);
            } else {
                myDB = mongo.getDB(args.dbName);
            }

            {
                let connectionDesc = '';
                // In sharded environments, mongos is acting as a proxy for the mongo shell and
                // therefore has a different outbound port than the 'whatsmyuri' command returns.
                if (!Cluster.isSharded(args.clusterOptions)) {
                    let res = assert.commandWorked(myDB.runCommand({whatsmyuri: 1}));
                    const myUri = res.you;

                    res = assert.commandWorked(myDB.adminCommand({currentOp: 1, client: myUri}));
                    connectionDesc = ', conn:' + res.inprog[0].desc;
                }

                // Replace the global print() so every line of output from this
                // thread is prefixed with its tid (and connection description),
                // making interleaved multi-thread logs attributable.
                const printOriginal = print;
                print = function() {
                    const printArgs = Array.from(arguments);
                    const prefix = '[tid:' + args.tid + connectionDesc + ']';
                    printArgs.unshift(prefix);
                    return printOriginal.apply(this, printArgs);
                };
            }

            if (Cluster.isReplication(args.clusterOptions)) {
                if (args.clusterOptions.hasOwnProperty('sharded') &&
                    args.clusterOptions.sharded.hasOwnProperty('stepdownOptions') &&
                    args.clusterOptions.sharded.stepdownOptions.shardStepdown) {
                    // Shard stepdowns are expected during the test, so configure
                    // TestData for automatic retry of operations that fail due
                    // to transient network/stepdown errors.
                    const newOptions = {
                        alwaysInjectTransactionNumber: true,
                        defaultReadConcernLevel: "majority",
                        logRetryAttempts: true,
                        overrideRetryAttempts: 3
                    };
                    Object.assign(TestData, newOptions);

                    load('jstests/libs/override_methods/auto_retry_on_network_error.js');

                    // After a step-up to primary, background index builds started on a secondary
                    // may not be complete. Use this override to ensure causality.
                    load('jstests/libs/override_methods/causally_consistent_index_builds.js');
                }

                // Operations that run after a "dropDatabase" command has been issued may fail with
                // a "DatabaseDropPending" error response if they would create a new collection on
                // that database while we're waiting for a majority of nodes in the replica set to
                // confirm it has been dropped. We load the
                // implicitly_retry_on_database_drop_pending.js file to make it so that the clients
                // started by the concurrency framework automatically retry their operation in the
                // face of this particular error response.
                load('jstests/libs/override_methods/implicitly_retry_on_database_drop_pending.js');
            }

            if (TestData.defaultReadConcernLevel || TestData.defaultWriteConcern) {
                load('jstests/libs/override_methods/set_read_and_write_concerns.js');
            }

            workloads.forEach(function(workload) {
                load(workload);                     // for $config
                var config = parseConfig($config);  // to normalize

                // Copy any modifications that were made to $config.data
                // during the setup function of the workload (see caveat
                // below).

                // XXX: Changing the order of extend calls causes problems
                // for workloads that reference $super.
                // Suppose you have workloads A and B, where workload B extends
                // workload A. The $config.data of workload B can define a
                // function that closes over the $config object of workload A
                // (known as $super to workload B). This reference is lost when
                // the config object is serialized to BSON, which results in
                // undefined variables in the derived workload.
                var data = Object.extend({}, args.data[workload], true);
                data = Object.extend(data, config.data, true);

                // Object.extend() defines all properties added to the destination object as
                // configurable, enumerable, and writable. To prevent workloads from changing
                // the iterations and threadCount properties in their state functions, we redefine
                // them here as non-configurable, non-enumerable, and non-writable.
                Object.defineProperties(data, {
                    'iterations': {
                        configurable: false,
                        enumerable: false,
                        writable: false,
                        value: data.iterations
                    },
                    'threadCount': {
                        configurable: false,
                        enumerable: false,
                        writable: false,
                        value: data.threadCount
                    }
                });

                data.tid = args.tid;
                configs[workload] = {
                    data: data,
                    db: myDB,
                    collName: args.collName,
                    cluster: args.cluster,
                    iterations: data.iterations,
                    passConnectionCache: config.passConnectionCache,
                    startState: config.startState,
                    states: config.states,
                    transitions: config.transitions
                };
            });

            // Signal that this thread has finished initializing.
            args.latch.countDown();

            // Converts any exceptions to a return status. In order for the
            // parent thread to call countDown() on our behalf, we must throw
            // an exception. Nothing prior to (and including) args.latch.countDown()
            // should be wrapped in a try/catch statement.
            try {
                args.latch.await();  // wait for all threads to start

                Random.setRandomSeed(args.seed);
                run(configs);
                return {ok: 1};
            } catch (e) {
                // Let the other threads know this one failed, then report the
                // failure to the parent thread as a structured return value.
                args.errorLatch.countDown();
                return {
                    ok: 0,
                    err: e.toString(),
                    stack: e.stack,
                    tid: args.tid,
                    workloads: workloads,
                };
            }
        } finally {
            // Avoid retention of connection object
            configs = null;
            myDB = null;
            gc();
        }
    }

    return {main: main};
})();