// Helper functions for testing index builds.
load("jstests/libs/fail_point_util.js");
load("jstests/libs/parallel_shell_helpers.js");
load("jstests/libs/uuid_util.js");
var IndexBuildTest = class {
/**
 * Starts an index build in a separate mongo shell process with the given options.
* Ensures the index build worked or failed with one of the expected failures.
*/
static startIndexBuild(conn, ns, keyPattern, options, expectedFailures) {
options = options || {};
expectedFailures = expectedFailures || [];
if (Array.isArray(keyPattern)) {
return startParallelShell(
'const coll = db.getMongo().getCollection("' + ns + '");' +
'assert.commandWorkedOrFailedWithCode(coll.createIndexes(' +
JSON.stringify(keyPattern) + ', ' + tojson(options) + '), ' +
JSON.stringify(expectedFailures) + ');',
conn.port);
} else {
return startParallelShell('const coll = db.getMongo().getCollection("' + ns + '");' +
'assert.commandWorkedOrFailedWithCode(coll.createIndex(' +
tojson(keyPattern) + ', ' + tojson(options) + '), ' +
JSON.stringify(expectedFailures) + ');',
conn.port);
}
}
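// Example use of startIndexBuild() (a minimal sketch; 'primary' is assumed to be a
// connection to a mongod that owns 'test.coll', and the key pattern and expected error
// codes are illustrative only):
//
//     const awaitIndexBuild = IndexBuildTest.startIndexBuild(
//         primary, 'test.coll', {a: 1}, {}, [ErrorCodes.IndexBuildAborted]);
//     // ... exercise the build (e.g. abort it, or let it run to completion) ...
//     awaitIndexBuild();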
/**
* Returns the op id for the running index build on the provided 'collectionName' and
* 'indexName', or any index build if either is undefined. Returns -1 if there is no current
* index build.
* Accepts optional filter that can be used to customize the db.currentOp() query.
*/
static getIndexBuildOpId(database, collectionName, indexName, filter) {
let pipeline = [{$currentOp: {allUsers: true, idleConnections: true}}];
if (filter) {
pipeline.push({$match: filter});
}
const result = database.getSiblingDB("admin")
.aggregate(pipeline, {readConcern: {level: "local"}})
.toArray();
let indexBuildOpId = -1;
let indexBuildObj = {};
let indexBuildNamespace = "";
result.forEach(function(op) {
if (op.op != 'command') {
return;
}
const cmdBody = op.command;
if (cmdBody.createIndexes === undefined) {
return;
}
// If no collection is provided, return any index build.
if (!collectionName || cmdBody.createIndexes === collectionName) {
cmdBody.indexes.forEach((index) => {
if (!indexName || index.name === indexName) {
indexBuildOpId = op.opid;
indexBuildObj = index;
indexBuildNamespace = op.ns;
}
});
}
});
if (indexBuildOpId != -1) {
jsTestLog("found in progress index build: " + tojson(indexBuildObj) + " on namespace " +
indexBuildNamespace + " opid: " + indexBuildOpId);
}
return indexBuildOpId;
}
/**
* Wait for index build to start and return its op id.
* Accepts optional filter that can be used to customize the db.currentOp() query.
* The filter may be necessary in situations when the index build is delegated to a thread pool
* managed by the IndexBuildsCoordinator and it is necessary to differentiate between the
* client connection thread and the IndexBuildsCoordinator thread actively building the index.
*/
static waitForIndexBuildToStart(database, collectionName, indexName, filter) {
let opId;
assert.soon(function() {
return (opId = IndexBuildTest.getIndexBuildOpId(
database, collectionName, indexName, filter)) !== -1;
}, "Index build operation not found after starting via parallelShell");
return opId;
}
/**
* Wait for index build to begin its collection scan phase and return its op id.
*/
static waitForIndexBuildToScanCollection(database, collectionName, indexName) {
// The collection scan is the only phase of an index build that uses a progress meter.
// Since the progress meter can be detected in the db.currentOp() output, we will use this
// information to determine when we are scanning the collection during the index build.
const filter = {
progress: {$exists: true},
};
return IndexBuildTest.waitForIndexBuildToStart(database, collectionName, indexName, filter);
}
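// Example use of waitForIndexBuildToStart()/waitForIndexBuildToScanCollection() (a sketch;
// assumes an index build on 'coll' has already been started, e.g. via startIndexBuild(),
// and is held up by a failpoint so it can be observed; 'testDB' is illustrative):
//
//     const opId = IndexBuildTest.waitForIndexBuildToScanCollection(testDB, 'coll', 'a_1');
//     // 'opId' can now be passed to db.killOp() or assertIndexBuildCurrentOpContents().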
/**
 * Wait for the index build to stop. Does not return an op id.
*/
static waitForIndexBuildToStop(database, collectionName, indexName) {
assert.soon(function() {
return IndexBuildTest.getIndexBuildOpId(database, collectionName, indexName) === -1;
}, "Index build operations still running after unblocking or killOp");
}
/**
* Checks the db.currentOp() output for the index build with opId.
*
 * An optional 'onOperationFn' callback is invoked with the matching operation so that
 * additional checks can be performed.
*/
static assertIndexBuildCurrentOpContents(database, opId, onOperationFn) {
const inprog = database.currentOp({opid: opId, "$all": true}).inprog;
assert.eq(1,
inprog.length,
'unable to find opid ' + opId +
' in currentOp() result: ' + tojson(database.currentOp()));
const op = inprog[0];
assert.eq(opId, op.opid, 'db.currentOp() returned wrong index build info: ' + tojson(op));
if (onOperationFn) {
onOperationFn(op);
}
}
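// Example use of assertIndexBuildCurrentOpContents() (a sketch; 'opId' is assumed to come
// from waitForIndexBuildToStart(), and the namespace checked here is illustrative):
//
//     IndexBuildTest.assertIndexBuildCurrentOpContents(testDB, opId, (op) => {
//         assert.eq('test.coll', op.ns, tojson(op));
//     });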
/**
* Runs listIndexes command on collection.
 * If 'options' is provided, it is sent along with the listIndexes command request.
* Asserts that all the indexes on this collection fit within the first batch of results.
* Returns a map of index specs keyed by name.
*/
static assertIndexes(coll, numIndexes, readyIndexes, notReadyIndexes, options) {
notReadyIndexes = notReadyIndexes || [];
options = options || {};
let res = assert.commandWorked(coll.runCommand("listIndexes", options));
assert.eq(numIndexes,
res.cursor.firstBatch.length,
'unexpected number of indexes in collection: ' + tojson(res));
// First batch contains all the indexes in the collection.
assert.eq(0, res.cursor.id);
// A map of index specs keyed by index name.
const indexMap = res.cursor.firstBatch.reduce((m, spec) => {
if (spec.hasOwnProperty('buildUUID')) {
m[spec.spec.name] = spec;
} else {
m[spec.name] = spec;
}
return m;
}, {});
// Check ready indexes.
for (let name of readyIndexes) {
assert(indexMap.hasOwnProperty(name),
'ready index ' + name + ' missing from listIndexes result: ' + tojson(res));
const spec = indexMap[name];
assert(!spec.hasOwnProperty('buildUUID'),
'unexpected buildUUID field in ' + name + ' index spec: ' + tojson(spec));
}
// Check indexes that are not ready.
for (let name of notReadyIndexes) {
assert(indexMap.hasOwnProperty(name),
'not-ready index ' + name + ' missing from listIndexes result: ' + tojson(res));
const spec = indexMap[name];
if (options.includeBuildUUIDs) {
assert(spec.hasOwnProperty('spec'),
'expected spec field in ' + name + ': ' + tojson(spec));
assert(spec.hasOwnProperty('buildUUID'),
'expected buildUUID field in ' + name + ': ' + tojson(spec));
} else {
assert(!spec.hasOwnProperty('buildUUID'),
'unexpected buildUUID field in ' + name + ' index spec: ' + tojson(spec));
}
}
return indexMap;
}
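// Example use of assertIndexes() (a sketch; assumes a single index build on {a: 1} is in
// progress on 'coll', so 'a_1' is listed but not yet ready):
//
//     IndexBuildTest.assertIndexes(coll, 2, ['_id_'], ['a_1'], {includeBuildUUIDs: true});
//     // After the build commits, the same index is reported as ready:
//     IndexBuildTest.assertIndexes(coll, 2, ['_id_', 'a_1']);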
/**
* Prevent subsequent index builds from running to completion.
*/
static pauseIndexBuilds(conn) {
assert.commandWorked(conn.adminCommand(
{configureFailPoint: 'hangAfterStartingIndexBuild', mode: 'alwaysOn'}));
}
/**
* Unblock current and subsequent index builds.
*/
static resumeIndexBuilds(conn) {
assert.commandWorked(
conn.adminCommand({configureFailPoint: 'hangAfterStartingIndexBuild', mode: 'off'}));
}
/**
* Returns true if majority commit quorum is supported by two phase index builds.
*/
static indexBuildCommitQuorumEnabled(conn) {
return assert
.commandWorked(conn.adminCommand({getParameter: 1, enableIndexBuildCommitQuorum: 1}))
.enableIndexBuildCommitQuorum;
}
};
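// Example end-to-end usage of IndexBuildTest (a minimal sketch, not part of the helper
// library itself; 'conn' is assumed to be a connection to a mongod whose 'test.coll'
// collection already contains documents, and all names are illustrative):
//
//     const testDB = conn.getDB('test');
//     IndexBuildTest.pauseIndexBuilds(conn);
//     const awaitIndexBuild = IndexBuildTest.startIndexBuild(conn, 'test.coll', {a: 1});
//     IndexBuildTest.waitForIndexBuildToStart(testDB, 'coll', 'a_1');
//     IndexBuildTest.assertIndexes(testDB.coll, 2, ['_id_'], ['a_1']);
//     IndexBuildTest.resumeIndexBuilds(conn);
//     IndexBuildTest.waitForIndexBuildToStop(testDB, 'coll', 'a_1');
//     awaitIndexBuild();
//     IndexBuildTest.assertIndexes(testDB.coll, 2, ['_id_', 'a_1']);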
const ResumableIndexBuildTest = class {
/**
* Returns whether resumable index builds are supported.
*/
static resumableIndexBuildsEnabled(conn) {
return assert
.commandWorked(conn.adminCommand({getParameter: 1, enableResumableIndexBuilds: 1}))
.enableResumableIndexBuilds;
}
/**
 * Restarts the given node, ensuring that the index build with name 'indexName' has its state
 * written to disk upon shutdown and is completed upon startup.
*/
static restart(rst, conn, coll, indexName, failPointName) {
clearRawMongoProgramOutput();
const buildUUID = extractUUIDFromObject(
IndexBuildTest
.assertIndexes(coll, 2, ["_id_"], [indexName], {includeBuildUUIDs: true})[indexName]
.buildUUID);
const disableFailPoint = function(failPointName, buildUUID) {
// Wait for the log message that the index build has failed due to the node being shut
// down.
checkLog.containsJson(db.getMongo(), 20449, {
buildUUID: function(uuid) {
return uuid["uuid"]["$uuid"] === buildUUID;
},
error: function(error) {
return error.code === ErrorCodes.InterruptedDueToReplStateChange;
}
});
// Once the index build has failed, disable the failpoint so that shutdown can proceed.
assert.commandWorked(db.adminCommand({configureFailPoint: failPointName, mode: "off"}));
};
const awaitDisableFailPoint =
startParallelShell(funWithArgs(disableFailPoint, failPointName, buildUUID), conn.port);
rst.stop(conn);
awaitDisableFailPoint();
// Ensure that the resumable index build state was written to disk upon clean shutdown.
assert(RegExp("4841502.*" + buildUUID).test(rawMongoProgramOutput()));
rst.start(conn, {noCleanData: true});
// Ensure that the index build was completed upon the node starting back up.
checkLog.containsJson(conn, 20663, {
buildUUID: function(uuid) {
return uuid["uuid"]["$uuid"] === buildUUID;
},
namespace: coll.getFullName()
});
IndexBuildTest.assertIndexes(coll, 2, ["_id_", indexName]);
}
/**
* Runs the resumable index build test specified by the provided failpoint information and
 * index spec on the provided replica set and namespace. Any document(s) specified by
 * 'insertIntoSideWritesTable' are inserted after the bulk load phase so that they are recorded
 * in the side writes table and processed during the drain writes phase.
*/
static run(
rst, dbName, collName, indexSpec, failPointName, failPointData, insertIntoSideWritesTable) {
const primary = rst.getPrimary();
const coll = primary.getDB(dbName).getCollection(collName);
const indexName = "resumable_index_build";
const fp = configureFailPoint(primary, failPointName, failPointData);
const createIndex = function(collName, indexSpec, indexName) {
assert.commandFailedWithCode(
db.getCollection(collName).createIndex(indexSpec, {name: indexName}),
ErrorCodes.InterruptedDueToReplStateChange);
};
const awaitCreateIndex = startParallelShell(
funWithArgs(createIndex, coll.getName(), indexSpec, indexName), primary.port);
if (insertIntoSideWritesTable) {
const sideWritesFp =
configureFailPoint(primary, "hangAfterIndexBuildDumpsInsertsFromBulk");
sideWritesFp.wait();
assert.commandWorked(coll.insert(insertIntoSideWritesTable));
sideWritesFp.off();
}
fp.wait();
ResumableIndexBuildTest.restart(rst, primary, coll, indexName, failPointName);
awaitCreateIndex();
assert.commandWorked(coll.dropIndex(indexName));
}
};
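// Example use of ResumableIndexBuildTest.run() (a sketch; the single-node replica set setup
// is typical for these tests, but the failpoint name and data shown here are illustrative
// and should correspond to the index build phase the test intends to resume from):
//
//     const rst = new ReplSetTest({nodes: 1});
//     rst.startSet();
//     rst.initiate();
//     const coll = rst.getPrimary().getDB('test').getCollection('coll');
//     assert.commandWorked(coll.insert({a: 1}));
//     if (ResumableIndexBuildTest.resumableIndexBuildsEnabled(rst.getPrimary())) {
//         ResumableIndexBuildTest.run(rst, 'test', 'coll', {a: 1},
//                                     'hangIndexBuildDuringCollectionScanPhaseBeforeInsertion',
//                                     {fieldsToMatch: {a: 1}}, [{a: 2}]);
//     }
//     rst.stopSet();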