Diffstat (limited to 'jstests/replsets/interrupted_batch_insert.js')
-rw-r--r-- | jstests/replsets/interrupted_batch_insert.js | 126 |
1 files changed, 126 insertions, 0 deletions
diff --git a/jstests/replsets/interrupted_batch_insert.js b/jstests/replsets/interrupted_batch_insert.js
new file mode 100644
index 00000000000..b55214af05f
--- /dev/null
+++ b/jstests/replsets/interrupted_batch_insert.js
@@ -0,0 +1,126 @@
+// Tests the scenario described in SERVER-2753.
+// 1. Send a single insert command with a large number of documents and the {ordered: true} option.
+// 2. Force the thread processing the insert command to hang in between insert batches. (Inserts are
+//    typically split into batches of 64, and the server yields locks between batches.)
+// 3. Disconnect the original primary from the network, forcing another node to step up.
+// 4. Insert a single document on the new primary.
+// 5. Return the original primary to the network and force it to step up by disconnecting the
+//    primary that replaced it. The original primary has to roll back any batches from step 1
+//    that were inserted locally but did not get majority committed before the insert in step 4.
+// 6. Unpause the thread performing the insert from step 1. If it continues to
+//    insert batches even though there was a rollback, those inserts will
+//    violate the {ordered: true} option.
+
+load('jstests/libs/parallelTester.js');
+load("jstests/replsets/rslib.js");
+
+(function() {
+    "use strict";
+
+    var name = "interrupted_batch_insert";
+    var replTest = new ReplSetTest({name: name, nodes: 3, useBridge: true});
+    var nodes = replTest.nodeList();
+
+    var conns = replTest.startSet();
+    replTest.initiate({
+        _id: name,
+        members: [
+            {_id: 0, host: nodes[0]},
+            {_id: 1, host: nodes[1]},
+            {_id: 2, host: nodes[2], priority: 0}
+        ]
+    });
+
+    // The test starts with node 0 as the primary.
+    replTest.waitForState(replTest.nodes[0], ReplSetTest.State.PRIMARY);
+    var primary = replTest.nodes[0];
+    var collName = primary.getDB("db")[name].getFullName();
+
+    var getParameterResult =
+        primary.getDB("admin").runCommand({getParameter: 1, internalInsertMaxBatchSize: 1});
+    assert.commandWorked(getParameterResult);
+    const batchSize = getParameterResult.internalInsertMaxBatchSize;
+
+    // Prevent node 1 from getting any data from the node 0 oplog.
+    conns[0].disconnect(conns[1]);
+
+    // Allow the primary to insert the first 5 batches of documents. After that, the fail point
+    // activates, and the client thread hangs until the fail point gets turned off.
+    assert.commandWorked(primary.getDB("db").adminCommand(
+        {configureFailPoint: "hangDuringBatchInsert", mode: {skip: 5}}));
+
+    // In a background thread, issue an insert command to the primary that will insert 10 batches of
+    // documents.
+    var worker = new ScopedThread((host, collName, numToInsert) => {
+        // Insert elements [{idx: 0}, {idx: 1}, ..., {idx: numToInsert - 1}].
+        const docsToInsert = Array.from({length: numToInsert}, (_, i) => {
+            return {idx: i};
+        });
+        var coll = new Mongo(host).getCollection(collName);
+        assert.throws(
+            () => coll.insert(docsToInsert,
+                              {writeConcern: {w: "majority", wtimeout: 5000}, ordered: true}),
+            [],
+            "network error");
+    }, primary.host, collName, 10 * batchSize);
+    worker.start();
+
+    // Wait long enough to guarantee that all 5 batches of inserts have executed and the primary is
+    // hung on the "hangDuringBatchInsert" fail point.
+    sleep(1000);
+
+    // Make sure the insert command is, in fact, running in the background.
+    assert.eq(primary.getDB("db").currentOp({"command.insert": name, active: true}).inprog.length,
+              1);
+
+    // Completely isolate the current primary (node 0), forcing it to step down.
+    conns[0].disconnect(conns[2]);
+
+    // Wait for node 1, the only other eligible node, to become the new primary.
+    replTest.waitForState(replTest.nodes[1], ReplSetTest.State.PRIMARY);
+
+    // Wait for node 2 to acknowledge node 1 as the new primary.
+    replTest.awaitSyncSource(replTest.nodes[2], replTest.nodes[1]);
+
+    // Issue a write to the new primary.
+    var collOnNewPrimary = replTest.nodes[1].getCollection(collName);
+    assert.writeOK(collOnNewPrimary.insert({singleDoc: 1}, {writeConcern: {w: "majority"}}));
+
+    // Isolate node 1, forcing it to step down as primary, and reconnect node 0, allowing it to step
+    // up again.
+    conns[0].reconnect(conns[2]);
+    conns[1].disconnect(conns[2]);
+
+    // Wait for node 0 to become primary again.
+    replTest.waitForState(primary, ReplSetTest.State.PRIMARY);
+
+    // Wait until node 2 recognizes node 0 as primary.
+    replTest.awaitSyncSource(replTest.nodes[2], primary);
+
+    // Allow the batch insert to continue.
+    assert.commandWorked(primary.getDB("db").adminCommand(
+        {configureFailPoint: "hangDuringBatchInsert", mode: "off"}));
+
+    // Wait until the insert command is done.
+    assert.soon(
+        () =>
+            primary.getDB("db").currentOp({"command.insert": name, active: true}).inprog.length ===
+            0);
+
+    worker.join();
+
+    var docs = primary.getDB("db")[name].find({idx: {$exists: 1}}).sort({idx: 1}).toArray();
+
+    // Any discontinuity in the "idx" values is an error. If an "idx" document failed to insert, all
+    // of the "idx" documents after it should also have failed to insert, because the insert
+    // specified {ordered: true}. Note, if none of the inserts were successful, that's fine.
+    docs.forEach((element, index) => {
+        assert.eq(element.idx, index);
+    });
+
+    // Reconnect the remaining disconnected nodes, so we can exit.
+    conns[0].reconnect(conns[1]);
+    conns[1].reconnect(conns[2]);
+
+    replTest.stopSet(15);
+}());
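
Note: the flow above is driven entirely by the "hangDuringBatchInsert" fail point, armed with mode: {skip: 5} (let the first batches through, then hang) and later disarmed with mode: "off". The snippet below is a minimal sketch of that fail point lifecycle on its own, outside the test harness and not part of the patch; it assumes a mongod started with test commands enabled (e.g. --setParameter enableTestCommands=1) and a hypothetical host/port.

// Minimal sketch (not part of the patch): arming and disarming a fail point
// from the mongo shell. Assumes enableTestCommands=1 on the target mongod.
var conn = new Mongo("localhost:27017");  // hypothetical host/port
var admin = conn.getDB("admin");

// Let the first 5 evaluations of the fail point pass, then hang every batch insert after that.
assert.commandWorked(admin.runCommand(
    {configureFailPoint: "hangDuringBatchInsert", mode: {skip: 5}}));

// ... inserts issued from another connection now hang once they reach their sixth batch ...

// Turn the fail point off so any hung insert threads can proceed.
assert.commandWorked(admin.runCommand(
    {configureFailPoint: "hangDuringBatchInsert", mode: "off"}));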