diff options
Diffstat (limited to 'jstests/sharding/txn_two_phase_commit_basic.js')
-rw-r--r-- | jstests/sharding/txn_two_phase_commit_basic.js | 469 |
1 files changed, 235 insertions, 234 deletions
diff --git a/jstests/sharding/txn_two_phase_commit_basic.js b/jstests/sharding/txn_two_phase_commit_basic.js index 09f4f1bf0cf..535cbe294b7 100644 --- a/jstests/sharding/txn_two_phase_commit_basic.js +++ b/jstests/sharding/txn_two_phase_commit_basic.js @@ -6,249 +6,250 @@ */ (function() { - 'use strict'; - - load('jstests/sharding/libs/sharded_transactions_helpers.js'); - - const dbName = "test"; - const collName = "foo"; - const ns = dbName + "." + collName; - - let st = new ShardingTest({shards: 3, causallyConsistent: true}); - - let coordinator = st.shard0; - let participant1 = st.shard1; - let participant2 = st.shard2; - - let expectedParticipantList = - [participant1.shardName, participant2.shardName, coordinator.shardName]; - - let lsid = {id: UUID()}; - let txnNumber = 0; - - const checkParticipantListMatches = function( - coordinatorConn, lsid, txnNumber, expectedParticipantList) { - let coordDoc = coordinatorConn.getDB("config") - .getCollection("transaction_coordinators") - .findOne({"_id.lsid.id": lsid.id, "_id.txnNumber": txnNumber}); - assert.neq(null, coordDoc); - assert.sameMembers(coordDoc.participants, expectedParticipantList); - }; - - const checkDecisionIs = function(coordinatorConn, lsid, txnNumber, expectedDecision) { - let coordDoc = coordinatorConn.getDB("config") - .getCollection("transaction_coordinators") - .findOne({"_id.lsid.id": lsid.id, "_id.txnNumber": txnNumber}); - assert.neq(null, coordDoc); - assert.eq(expectedDecision, coordDoc.decision.decision); - if (expectedDecision === "commit") { - assert.neq(null, coordDoc.decision.commitTimestamp); - } else { - assert.eq(null, coordDoc.decision.commitTimestamp); - } - }; - - const checkDocumentDeleted = function(coordinatorConn, lsid, txnNumber) { - let coordDoc = coordinatorConn.getDB("config") - .getCollection("transaction_coordinators") - .findOne({"_id.lsid.id": lsid.id, "_id.txnNumber": txnNumber}); - return null === coordDoc; - }; - - const runCommitThroughMongosInParallelShellExpectSuccess = function() { - const runCommitExpectSuccessCode = "assert.commandWorked(db.adminCommand({" + - "commitTransaction: 1," + "lsid: " + tojson(lsid) + "," + "txnNumber: NumberLong(" + - txnNumber + ")," + "stmtId: NumberInt(0)," + "autocommit: false," + "}));"; - return startParallelShell(runCommitExpectSuccessCode, st.s.port); - }; - - const runCommitThroughMongosInParallelShellExpectAbort = function() { - const runCommitExpectSuccessCode = "assert.commandFailedWithCode(db.adminCommand({" + - "commitTransaction: 1," + "lsid: " + tojson(lsid) + "," + "txnNumber: NumberLong(" + - txnNumber + ")," + "stmtId: NumberInt(0)," + "autocommit: false," + "})," + - "ErrorCodes.NoSuchTransaction);"; - return startParallelShell(runCommitExpectSuccessCode, st.s.port); - }; - - const startSimulatingNetworkFailures = function(connArray) { - connArray.forEach(function(conn) { - assert.commandWorked(conn.adminCommand({ - configureFailPoint: "failCommand", - mode: {times: 10}, - data: { - errorCode: ErrorCodes.NotMaster, - failCommands: - ["prepareTransaction", "abortTransaction", "commitTransaction"] - } - })); - assert.commandWorked(conn.adminCommand({ - configureFailPoint: - "participantReturnNetworkErrorForPrepareAfterExecutingPrepareLogic", - mode: {times: 5} - })); - assert.commandWorked(conn.adminCommand({ - configureFailPoint: "participantReturnNetworkErrorForAbortAfterExecutingAbortLogic", - mode: {times: 5} - })); - assert.commandWorked(conn.adminCommand({ - configureFailPoint: - "participantReturnNetworkErrorForCommitAfterExecutingCommitLogic", - mode: {times: 5} - })); - }); - }; - - const stopSimulatingNetworkFailures = function(connArray) { - connArray.forEach(function(conn) { - assert.commandWorked(conn.adminCommand({ - configureFailPoint: "failCommand", - mode: "off", - })); - assert.commandWorked(conn.adminCommand({ - configureFailPoint: - "participantReturnNetworkErrorForPrepareAfterExecutingPrepareLogic", - mode: "off" - })); - assert.commandWorked(conn.adminCommand({ - configureFailPoint: "participantReturnNetworkErrorForAbortAfterExecutingAbortLogic", - mode: "off" - })); - assert.commandWorked(conn.adminCommand({ - configureFailPoint: - "participantReturnNetworkErrorForCommitAfterExecutingCommitLogic", - mode: "off" - })); - }); - }; - - const setUp = function() { - // Create a sharded collection with a chunk on each shard: - // shard0: [-inf, 0) - // shard1: [0, 10) - // shard2: [10, +inf) - assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); - assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: coordinator.shardName})); - assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}})); - assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}})); - assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 10}})); - assert.commandWorked( - st.s.adminCommand({moveChunk: ns, find: {_id: 0}, to: participant1.shardName})); - assert.commandWorked( - st.s.adminCommand({moveChunk: ns, find: {_id: 10}, to: participant2.shardName})); - - // These forced refreshes are not strictly necessary; they just prevent extra TXN log lines - // from the shards starting, aborting, and restarting the transaction due to needing to - // refresh after the transaction has started. - assert.commandWorked(coordinator.adminCommand({_flushRoutingTableCacheUpdates: ns})); - assert.commandWorked(participant1.adminCommand({_flushRoutingTableCacheUpdates: ns})); - assert.commandWorked(participant2.adminCommand({_flushRoutingTableCacheUpdates: ns})); - - // Start a new transaction by inserting a document onto each shard. - assert.commandWorked(st.s.getDB(dbName).runCommand({ - insert: collName, - documents: [{_id: -5}, {_id: 5}, {_id: 15}], - lsid: lsid, - txnNumber: NumberLong(txnNumber), - stmtId: NumberInt(0), - startTransaction: true, - autocommit: false, +'use strict'; + +load('jstests/sharding/libs/sharded_transactions_helpers.js'); + +const dbName = "test"; +const collName = "foo"; +const ns = dbName + "." + collName; + +let st = new ShardingTest({shards: 3, causallyConsistent: true}); + +let coordinator = st.shard0; +let participant1 = st.shard1; +let participant2 = st.shard2; + +let expectedParticipantList = + [participant1.shardName, participant2.shardName, coordinator.shardName]; + +let lsid = {id: UUID()}; +let txnNumber = 0; + +const checkParticipantListMatches = function( + coordinatorConn, lsid, txnNumber, expectedParticipantList) { + let coordDoc = coordinatorConn.getDB("config") + .getCollection("transaction_coordinators") + .findOne({"_id.lsid.id": lsid.id, "_id.txnNumber": txnNumber}); + assert.neq(null, coordDoc); + assert.sameMembers(coordDoc.participants, expectedParticipantList); +}; + +const checkDecisionIs = function(coordinatorConn, lsid, txnNumber, expectedDecision) { + let coordDoc = coordinatorConn.getDB("config") + .getCollection("transaction_coordinators") + .findOne({"_id.lsid.id": lsid.id, "_id.txnNumber": txnNumber}); + assert.neq(null, coordDoc); + assert.eq(expectedDecision, coordDoc.decision.decision); + if (expectedDecision === "commit") { + assert.neq(null, coordDoc.decision.commitTimestamp); + } else { + assert.eq(null, coordDoc.decision.commitTimestamp); + } +}; + +const checkDocumentDeleted = function(coordinatorConn, lsid, txnNumber) { + let coordDoc = coordinatorConn.getDB("config") + .getCollection("transaction_coordinators") + .findOne({"_id.lsid.id": lsid.id, "_id.txnNumber": txnNumber}); + return null === coordDoc; +}; + +const runCommitThroughMongosInParallelShellExpectSuccess = function() { + const runCommitExpectSuccessCode = "assert.commandWorked(db.adminCommand({" + + "commitTransaction: 1," + + "lsid: " + tojson(lsid) + "," + + "txnNumber: NumberLong(" + txnNumber + ")," + + "stmtId: NumberInt(0)," + + "autocommit: false," + + "}));"; + return startParallelShell(runCommitExpectSuccessCode, st.s.port); +}; + +const runCommitThroughMongosInParallelShellExpectAbort = function() { + const runCommitExpectSuccessCode = "assert.commandFailedWithCode(db.adminCommand({" + + "commitTransaction: 1," + + "lsid: " + tojson(lsid) + "," + + "txnNumber: NumberLong(" + txnNumber + ")," + + "stmtId: NumberInt(0)," + + "autocommit: false," + + "})," + + "ErrorCodes.NoSuchTransaction);"; + return startParallelShell(runCommitExpectSuccessCode, st.s.port); +}; + +const startSimulatingNetworkFailures = function(connArray) { + connArray.forEach(function(conn) { + assert.commandWorked(conn.adminCommand({ + configureFailPoint: "failCommand", + mode: {times: 10}, + data: { + errorCode: ErrorCodes.NotMaster, + failCommands: ["prepareTransaction", "abortTransaction", "commitTransaction"] + } })); - }; - - const testCommitProtocol = function(shouldCommit, simulateNetworkFailures) { - jsTest.log("Testing two-phase " + (shouldCommit ? "commit" : "abort") + - " protocol with simulateNetworkFailures: " + simulateNetworkFailures); - - txnNumber++; - setUp(); - - if (!shouldCommit) { - // Manually abort the transaction on one of the participants, so that the participant - // fails to prepare. - assert.commandWorked(participant2.adminCommand({ - abortTransaction: 1, - lsid: lsid, - txnNumber: NumberLong(txnNumber), - stmtId: NumberInt(0), - autocommit: false, - })); - } - - if (simulateNetworkFailures) { - startSimulatingNetworkFailures([participant1, participant2]); - } - - // Turn on failpoints so that the coordinator hangs after each write it does, so that the - // test can check that the write happened correctly. - assert.commandWorked(coordinator.adminCommand({ - configureFailPoint: "hangBeforeWaitingForParticipantListWriteConcern", - mode: "alwaysOn", + assert.commandWorked(conn.adminCommand({ + configureFailPoint: "participantReturnNetworkErrorForPrepareAfterExecutingPrepareLogic", + mode: {times: 5} })); - assert.commandWorked(coordinator.adminCommand({ - configureFailPoint: "hangBeforeWaitingForDecisionWriteConcern", - mode: "alwaysOn", + assert.commandWorked(conn.adminCommand({ + configureFailPoint: "participantReturnNetworkErrorForAbortAfterExecutingAbortLogic", + mode: {times: 5} })); - - // Run commitTransaction through a parallel shell. - let awaitResult; - if (shouldCommit) { - awaitResult = runCommitThroughMongosInParallelShellExpectSuccess(); - } else { - awaitResult = runCommitThroughMongosInParallelShellExpectAbort(); - } - - // Check that the coordinator wrote the participant list. - waitForFailpoint("Hit hangBeforeWaitingForParticipantListWriteConcern failpoint", - txnNumber); - checkParticipantListMatches(coordinator, lsid, txnNumber, expectedParticipantList); - assert.commandWorked(coordinator.adminCommand({ - configureFailPoint: "hangBeforeWaitingForParticipantListWriteConcern", - mode: "off", + assert.commandWorked(conn.adminCommand({ + configureFailPoint: "participantReturnNetworkErrorForCommitAfterExecutingCommitLogic", + mode: {times: 5} })); + }); +}; - // Check that the coordinator wrote the decision. - waitForFailpoint("Hit hangBeforeWaitingForDecisionWriteConcern failpoint", txnNumber); - checkParticipantListMatches(coordinator, lsid, txnNumber, expectedParticipantList); - checkDecisionIs(coordinator, lsid, txnNumber, (shouldCommit ? "commit" : "abort")); - assert.commandWorked(coordinator.adminCommand({ - configureFailPoint: "hangBeforeWaitingForDecisionWriteConcern", +const stopSimulatingNetworkFailures = function(connArray) { + connArray.forEach(function(conn) { + assert.commandWorked(conn.adminCommand({ + configureFailPoint: "failCommand", mode: "off", })); - - // Check that the coordinator deleted its persisted state. - awaitResult(); + assert.commandWorked(conn.adminCommand({ + configureFailPoint: "participantReturnNetworkErrorForPrepareAfterExecutingPrepareLogic", + mode: "off" + })); + assert.commandWorked(conn.adminCommand({ + configureFailPoint: "participantReturnNetworkErrorForAbortAfterExecutingAbortLogic", + mode: "off" + })); + assert.commandWorked(conn.adminCommand({ + configureFailPoint: "participantReturnNetworkErrorForCommitAfterExecutingCommitLogic", + mode: "off" + })); + }); +}; + +const setUp = function() { + // Create a sharded collection with a chunk on each shard: + // shard0: [-inf, 0) + // shard1: [0, 10) + // shard2: [10, +inf) + assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); + assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: coordinator.shardName})); + assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}})); + assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}})); + assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 10}})); + assert.commandWorked( + st.s.adminCommand({moveChunk: ns, find: {_id: 0}, to: participant1.shardName})); + assert.commandWorked( + st.s.adminCommand({moveChunk: ns, find: {_id: 10}, to: participant2.shardName})); + + // These forced refreshes are not strictly necessary; they just prevent extra TXN log lines + // from the shards starting, aborting, and restarting the transaction due to needing to + // refresh after the transaction has started. + assert.commandWorked(coordinator.adminCommand({_flushRoutingTableCacheUpdates: ns})); + assert.commandWorked(participant1.adminCommand({_flushRoutingTableCacheUpdates: ns})); + assert.commandWorked(participant2.adminCommand({_flushRoutingTableCacheUpdates: ns})); + + // Start a new transaction by inserting a document onto each shard. + assert.commandWorked(st.s.getDB(dbName).runCommand({ + insert: collName, + documents: [{_id: -5}, {_id: 5}, {_id: 15}], + lsid: lsid, + txnNumber: NumberLong(txnNumber), + stmtId: NumberInt(0), + startTransaction: true, + autocommit: false, + })); +}; + +const testCommitProtocol = function(shouldCommit, simulateNetworkFailures) { + jsTest.log("Testing two-phase " + (shouldCommit ? "commit" : "abort") + + " protocol with simulateNetworkFailures: " + simulateNetworkFailures); + + txnNumber++; + setUp(); + + if (!shouldCommit) { + // Manually abort the transaction on one of the participants, so that the participant + // fails to prepare. + assert.commandWorked(participant2.adminCommand({ + abortTransaction: 1, + lsid: lsid, + txnNumber: NumberLong(txnNumber), + stmtId: NumberInt(0), + autocommit: false, + })); + } + + if (simulateNetworkFailures) { + startSimulatingNetworkFailures([participant1, participant2]); + } + + // Turn on failpoints so that the coordinator hangs after each write it does, so that the + // test can check that the write happened correctly. + assert.commandWorked(coordinator.adminCommand({ + configureFailPoint: "hangBeforeWaitingForParticipantListWriteConcern", + mode: "alwaysOn", + })); + assert.commandWorked(coordinator.adminCommand({ + configureFailPoint: "hangBeforeWaitingForDecisionWriteConcern", + mode: "alwaysOn", + })); + + // Run commitTransaction through a parallel shell. + let awaitResult; + if (shouldCommit) { + awaitResult = runCommitThroughMongosInParallelShellExpectSuccess(); + } else { + awaitResult = runCommitThroughMongosInParallelShellExpectAbort(); + } + + // Check that the coordinator wrote the participant list. + waitForFailpoint("Hit hangBeforeWaitingForParticipantListWriteConcern failpoint", txnNumber); + checkParticipantListMatches(coordinator, lsid, txnNumber, expectedParticipantList); + assert.commandWorked(coordinator.adminCommand({ + configureFailPoint: "hangBeforeWaitingForParticipantListWriteConcern", + mode: "off", + })); + + // Check that the coordinator wrote the decision. + waitForFailpoint("Hit hangBeforeWaitingForDecisionWriteConcern failpoint", txnNumber); + checkParticipantListMatches(coordinator, lsid, txnNumber, expectedParticipantList); + checkDecisionIs(coordinator, lsid, txnNumber, (shouldCommit ? "commit" : "abort")); + assert.commandWorked(coordinator.adminCommand({ + configureFailPoint: "hangBeforeWaitingForDecisionWriteConcern", + mode: "off", + })); + + // Check that the coordinator deleted its persisted state. + awaitResult(); + assert.soon(function() { + return checkDocumentDeleted(coordinator, lsid, txnNumber); + }); + + if (simulateNetworkFailures) { + stopSimulatingNetworkFailures([participant1, participant2]); + } + + // Check that the transaction committed or aborted as expected. + if (!shouldCommit) { + jsTest.log("Verify that the transaction was aborted on all shards."); + assert.eq(0, st.s.getDB(dbName).getCollection(collName).find().itcount()); + } else { + jsTest.log("Verify that the transaction was committed on all shards."); + // Use assert.soon(), because although coordinateCommitTransaction currently blocks + // until the commit process is fully complete, it will eventually be changed to only + // block until the decision is *written*, at which point the test can pass the + // operationTime returned by coordinateCommitTransaction as 'afterClusterTime' in the + // read to ensure the read sees the transaction's writes (TODO SERVER-37165). assert.soon(function() { - return checkDocumentDeleted(coordinator, lsid, txnNumber); + return 3 === st.s.getDB(dbName).getCollection(collName).find().itcount(); }); + } + + st.s.getDB(dbName).getCollection(collName).drop(); +}; - if (simulateNetworkFailures) { - stopSimulatingNetworkFailures([participant1, participant2]); - } - - // Check that the transaction committed or aborted as expected. - if (!shouldCommit) { - jsTest.log("Verify that the transaction was aborted on all shards."); - assert.eq(0, st.s.getDB(dbName).getCollection(collName).find().itcount()); - } else { - jsTest.log("Verify that the transaction was committed on all shards."); - // Use assert.soon(), because although coordinateCommitTransaction currently blocks - // until the commit process is fully complete, it will eventually be changed to only - // block until the decision is *written*, at which point the test can pass the - // operationTime returned by coordinateCommitTransaction as 'afterClusterTime' in the - // read to ensure the read sees the transaction's writes (TODO SERVER-37165). - assert.soon(function() { - return 3 === st.s.getDB(dbName).getCollection(collName).find().itcount(); - }); - } - - st.s.getDB(dbName).getCollection(collName).drop(); - }; - - testCommitProtocol(false /* test abort */, false /* no network failures */); - testCommitProtocol(true /* test commit */, false /* no network failures */); - testCommitProtocol(false /* test abort */, true /* with network failures */); - testCommitProtocol(true /* test commit */, true /* with network failures */); - - st.stop(); +testCommitProtocol(false /* test abort */, false /* no network failures */); +testCommitProtocol(true /* test commit */, false /* no network failures */); +testCommitProtocol(false /* test abort */, true /* with network failures */); +testCommitProtocol(true /* test commit */, true /* with network failures */); +st.stop(); })(); |