diff options
author | jannaerin <golden.janna@gmail.com> | 2018-09-10 12:43:56 -0400 |
---|---|---|
committer | jannaerin <golden.janna@gmail.com> | 2018-11-15 12:10:02 -0500 |
commit | dbafa9a288019008fd7a4ac975c298ec0bf40534 (patch) | |
tree | 78e09249619177a53d2b2780b98b86916c2d4f7a /jstests/libs/txns | |
parent | 866d4ba84171d7cc88876e2d0d8ae4eeb04ff019 (diff) | |
download | mongo-dbafa9a288019008fd7a4ac975c298ec0bf40534.tar.gz |
SERVER-36311 Add stepdowns, crashes, and shutdowns to replica_sets_multi_stmt_txn_jscore_passthrough suite
Diffstat (limited to 'jstests/libs/txns')
-rw-r--r-- | jstests/libs/txns/txn_override.js | 438 | ||||
-rw-r--r-- | jstests/libs/txns/txn_passthrough_runner.js | 3 |
2 files changed, 365 insertions, 76 deletions
diff --git a/jstests/libs/txns/txn_override.js b/jstests/libs/txns/txn_override.js index 1ff53fcce4c..9697f90649f 100644 --- a/jstests/libs/txns/txn_override.js +++ b/jstests/libs/txns/txn_override.js @@ -1,3 +1,4 @@ + /** * Override to run consecutive operations inside the same transaction. When an operation that * cannot be run inside of a transaction is encountered, the active transaction is committed @@ -7,7 +8,9 @@ (function() { 'use strict'; + load("jstests/libs/override_methods/read_and_write_concern_helpers.js"); load('jstests/libs/override_methods/override_helpers.js'); + load("jstests/libs/retryable_writes_util.js"); const runCommandOriginal = Mongo.prototype.runCommand; @@ -43,7 +46,49 @@ kInactive: 'inactive', }; + // Array to hold pairs of (commandObj, makeFuncArgs) that will be iterated + // over when retrying a command run in a txn on a network error. + let ops = []; + + // Used to indicate whether the operation is being re-run, so we will not add + // it to our ops array multple times. + let retryOp = false; + + // Set the max number of operations to run in a transaction. Once we've + // hit this number of operations, we will commit the transaction. This is to + // prevent having to retry an extremely long running transaction. + const maxOpsInTransaction = 10; + + // The last operation we logged upon failure. To avoid logging a command that + // fails multiple times in a row each time it fails, we use this check if we've + // just logged this command. This allows us to log failing commands to help with + // debugging, but helps to avoid spamming logs. + let lastLoggedOp; + + // The last TransientTransactionError on a commitTransaction that caused us to retry + // the entire transaction. For help with debugging. + let transientErrorToLog; + + function logFailedCommandAndError(cmdObj, cmdName, res) { + if (cmdObj !== lastLoggedOp) { + try { + jsTestLog("Failed on cmd: " + tojson(cmdObj) + " with error: " + tojson(res)); + } catch (e) { + jsTestLog("Failed on cmd: " + cmdName + " with error: " + tojson(res)); + } + lastLoggedOp = cmdObj; + + if (transientErrorToLog) { + jsTestLog("Error that caused retry of transaction " + tojson(transientErrorToLog)); + } + } + } + function commandSupportsTxn(dbName, cmdName, cmdObj) { + if (cmdName === 'commitTransaction' || cmdName === 'abortTransaction') { + return true; + } + if (!kCmdsSupportingTransactions.has(cmdName)) { return false; } @@ -57,6 +102,11 @@ return false; } } + + if (cmdObj.lsid === undefined) { + return false; + } + return true; } @@ -92,16 +142,170 @@ } } + function appendReadAndWriteConcern(conn, dbName, commandName, commandObj) { + if (TestData.retryingOnNetworkError) { + return; + } + + let shouldForceReadConcern = kCommandsSupportingReadConcern.has(commandName); + let shouldForceWriteConcern = kCommandsSupportingWriteConcern.has(commandName); + + if (commandObj.hasOwnProperty("autocommit")) { + shouldForceReadConcern = false; + if (commandObj.startTransaction === true) { + shouldForceReadConcern = true; + } + if (!kCommandsSupportingWriteConcernInTransaction.has(commandName)) { + shouldForceWriteConcern = false; + } + } else if (commandName === "aggregate") { + if (OverrideHelpers.isAggregationWithListLocalCursorsStage(commandName, commandObj)) { + // The $listLocalCursors stage can only be used with readConcern={level: + // "local"}. + shouldForceReadConcern = false; + } + + if (OverrideHelpers.isAggregationWithListLocalSessionsStage(commandName, commandObj)) { + // The $listLocalSessions stage can only be used with readConcern={level: + // "local"}. + shouldForceReadConcern = false; + } + + if (OverrideHelpers.isAggregationWithOutStage(commandName, commandObj)) { + // The $out stage can only be used with readConcern={level: "local"}. + shouldForceReadConcern = false; + } else { + // A writeConcern can only be used with a $out stage. + shouldForceWriteConcern = false; + } + + if (commandObj.explain) { + // Attempting to specify a readConcern while explaining an aggregation would + // always return an error prior to SERVER-30582 and it is otherwise only + // compatible with readConcern={level: "local"}. + shouldForceReadConcern = false; + } + } else if (OverrideHelpers.isMapReduceWithInlineOutput(commandName, commandObj)) { + // A writeConcern can only be used with non-inline output. + shouldForceWriteConcern = false; + } + + if (shouldForceReadConcern) { + let readConcernLevel; + if (commandObj.startTransaction === true) { + readConcernLevel = "snapshot"; + } else { + readConcernLevel = "majority"; + } + + if (commandObj.readConcern && commandObj.readConcern.level !== readConcernLevel) { + throw new Error("refusing to override existing readConcern " + + commandObj.readConcern.level + " with readConcern " + + readConcernLevel); + } else { + commandObj.readConcern = {level: readConcernLevel}; + + const driverSession = conn.getDB(dbName).getSession(); + const operationTime = driverSession.getOperationTime(); + if (operationTime !== undefined) { + commandObj.readConcern.afterClusterTime = operationTime; + } + } + } + + if (shouldForceWriteConcern) { + if (commandObj.hasOwnProperty("writeConcern")) { + let writeConcern = commandObj.writeConcern; + if (typeof writeConcern !== "object" || writeConcern === null || + (writeConcern.hasOwnProperty("w") && + bsonWoCompare({_: writeConcern.w}, {_: "majority"}) !== 0)) { + throw new Error("Cowardly refusing to override write concern of command: " + + tojson(commandObj)); + } + } + + // Use a "signature" value that won't typically match a value assigned in normal + // use. This way the wtimeout set by this override is distinguishable in the server + // logs. + commandObj.writeConcern = {w: "majority", wtimeout: 5 * 60 * 1000 + 456}; + } + } + + function retryOnImplicitCollectionCreationIfNeeded( + conn, dbName, commandName, commandObj, func, makeFuncArgs, res, txnOptions) { + if (kCmdsThatInsert.has(commandName)) { + // If the command inserted data and is not supported in a transaction, we assume it + // failed because the collection did not exist. We will create the collection and + // retry the command. If the collection did exist, we'll return the original + // response because it failed for a different reason. Tests that expect collections + // to not exist will have to be skipped. + if (res.code === ErrorCodes.OperationNotSupportedInTransaction) { + const createCmdRes = runCommandOriginal.call(conn, + dbName, + { + create: commandObj[commandName], + lsid: commandObj.lsid, + writeConcern: {w: 'majority'}, + }, + 0); + + if (createCmdRes.ok !== 1) { + if (createCmdRes.code !== ErrorCodes.NamespaceExists) { + // The collection still does not exist. So we just return the original + // response to the caller, + logFailedCommandAndError(commandObj, commandName, createCmdRes); + return res; + } + } else { + assert.commandWorked(createCmdRes); + } + } else { + // If the insert command failed for any other reason, we return the original + // response without retrying. + logFailedCommandAndError(commandObj, commandName, res); + return res; + } + // We aborted the transaction, so we need to re-run every op in the transaction, + // rather than just the current op. + for (let op of ops) { + retryOp = true; + res = runCommandInTransactionIfNeeded( + conn, op.dbName, op.cmdName, op.cmdObj, func, op.makeFuncArgs); + + if (res.ok !== 1) { + logFailedCommandAndError(commandObj, commandName, res); + abortTransaction(conn, commandObj.lsid, txnOptions.txnNumber); + return res; + } + } + } + + return res; + } + + function updateAndGossipClusterTime(conn, dbName, commitRes, commandObj) { + // Update the latest cluster time on the session manually after we commit so + // that we will not read too early in the next transaction. At this point, we've + // already run through the original processCommand path where we filled in the + // clusterTime, so we will not update it otherwise. + conn.getDB(dbName).getSession().processCommandResponse_forTesting(commitRes); + + // Gossip the later cluster time when we retry the command. + if (commandObj.$clusterTime) { + commandObj.$clusterTime = commitRes.$clusterTime; + } + } + function commitTransaction(conn, lsid, txnNumber) { - const res = runCommandOriginal.call(conn, - 'admin', - { - commitTransaction: 1, - autocommit: false, lsid, txnNumber, - }, - 0); + const res = conn.adminCommand({ + commitTransaction: 1, + autocommit: false, lsid, txnNumber, + }); assert.commandWorked(res); conn.txnOverrideState = TransactionStates.kInactive; + ops = []; + + return res; } function abortTransaction(conn, lsid, txnNumber) { @@ -116,11 +320,10 @@ txnNumber: txnNumber, }, 0); - conn.txnOverrideState = TransactionStates.kInactive; } - function continueTransaction(conn, txnOptions, cmdName, cmdObj) { + function continueTransaction(conn, txnOptions, dbName, cmdName, cmdObj, makeFuncArgs) { if (conn.txnOverrideState === TransactionStates.kInactive) { // First command in a transaction. txnOptions.txnNumber = new NumberLong(txnOptions.txnNumber + 1); @@ -128,12 +331,6 @@ cmdObj.startTransaction = true; - if (cmdObj.readConcern && cmdObj.readConcern.level !== 'snapshot') { - throw new Error("refusing to override existing readConcern"); - } else { - cmdObj.readConcern = {level: 'snapshot'}; - } - conn.txnOverrideState = TransactionStates.kActive; } @@ -142,16 +339,21 @@ cmdObj.txnNumber = txnOptions.txnNumber; cmdObj.stmtId = txnOptions.stmtId; cmdObj.autocommit = false; - } - - function runCommandWithTransactions(conn, dbName, commandName, commandObj, func, makeFuncArgs) { - const driverSession = conn.getDB(dbName).getSession(); - if (driverSession.getSessionId() === null) { - // Sessions is explicitly disabled for this command. So we skip overriding it to - // use transactions. - return func.apply(conn, makeFuncArgs(commandObj)); + delete cmdObj.writeConcern; + + // We only want to add this op to the ops array if we have not already added it. If + // retryingOnNetworkError is true, this op will already have been added. If retryOp + // is false, this op is a write command that we are retrying thus this op has already + // been added to the ops array. + if (!TestData.retryingOnNetworkError && !retryOp) { + ops.push({dbName, cmdName, cmdObj, makeFuncArgs}); } + appendReadAndWriteConcern(conn, dbName, cmdName, cmdObj); + } + + function runCommandInTransactionIfNeeded( + conn, dbName, commandName, commandObj, func, makeFuncArgs) { let cmdObjUnwrapped = commandObj; let cmdNameUnwrapped = commandName; @@ -165,70 +367,160 @@ commandSupportsTxn(dbName, cmdNameUnwrapped, cmdObjUnwrapped); const txnOptions = getTxnOptionsForClient(conn); + if (commandSupportsTransaction) { + if (cmdNameUnwrapped === "commitTransaction") { + appendReadAndWriteConcern(conn, dbName, cmdNameUnwrapped, cmdObjUnwrapped); + cmdObjUnwrapped.txnNumber = txnOptions.txnNumber; + } else { + // Commit the transaction if we've run `maxOpsInTransaction` commands as a part of + // this transaction to avoid having to retry really long running transactions. + if ((TestData.retryingOnNetworkError === false) && + (ops.length >= maxOpsInTransaction) && + (conn.txnOverrideState === TransactionStates.kActive)) { + let commitRes = + commitTransaction(conn, cmdObjUnwrapped.lsid, txnOptions.txnNumber); + updateAndGossipClusterTime(conn, dbName, commitRes, cmdObjUnwrapped); + } - if (!commandSupportsTransaction) { + continueTransaction( + conn, txnOptions, dbName, cmdNameUnwrapped, cmdObjUnwrapped, makeFuncArgs); + retryOp = false; + } + } else { if (conn.txnOverrideState === TransactionStates.kActive) { - commitTransaction(conn, commandObj.lsid, txnOptions.txnNumber); + let commitRes = commitTransaction(conn, cmdObjUnwrapped.lsid, txnOptions.txnNumber); + updateAndGossipClusterTime(conn, dbName, commitRes, cmdObjUnwrapped); + } else { + ops = []; } - } else { - continueTransaction(conn, txnOptions, cmdNameUnwrapped, cmdObjUnwrapped); + appendReadAndWriteConcern(conn, dbName, cmdNameUnwrapped, cmdObjUnwrapped); + if (commandName === 'drop' || commandName === 'convertToCapped') { + // Convert all collection drops to w:majority so they won't prevent subsequent + // operations in transactions from failing when failing to acquire collection locks. + if (!cmdObjUnwrapped.writeConcern) { + cmdObjUnwrapped.writeConcern = {}; + } + cmdObjUnwrapped.writeConcern.w = 'majority'; + } + } + + let res = func.apply(conn, makeFuncArgs(commandObj)); + + if ((res.ok !== 1) && (conn.txnOverrideState === TransactionStates.kActive)) { + abortTransaction(conn, cmdObjUnwrapped.lsid, txnOptions.txnNumber); + res = retryOnImplicitCollectionCreationIfNeeded(conn, + dbName, + cmdNameUnwrapped, + cmdObjUnwrapped, + func, + makeFuncArgs, + res, + txnOptions); + } + + return res; + } + + function retryEntireTransaction(conn, txnNumber, lsid, func) { + jsTestLog("Retrying entire transaction on TransientTransactionError for aborted txn " + + "with txnNum: " + txnNumber + " and lsid " + tojson(lsid)); + // Set the transactionState to inactive so continueTransaction() will bump the + // txnNum. + conn.txnOverrideState = TransactionStates.kInactive; + + // Re-run every command in the ops array. + assert.gt(ops.length, 0); + + let res; + for (let op of ops) { + res = runCommandInTransactionIfNeeded( + conn, op.dbName, op.cmdName, op.cmdObj, func, op.makeFuncArgs); + + if (res.hasOwnProperty('errorLabels') && + res.errorLabels.includes('TransientTransactionError')) { + return retryEntireTransaction(conn, txnNumber, lsid, func); + } } - if (commandName === 'drop' || commandName === 'convertToCapped') { - // Convert all collection drops to w:majority so they won't prevent subsequent - // operations in transactions from failing when failing to acquire collection locks. - if (!cmdObjUnwrapped.writeConcern) { - cmdObjUnwrapped.writeConcern = {}; + return res; + } + + function retryCommitTransaction(conn, dbName, commandName, commandObj, func, makeFuncArgs) { + let res; + let retryCommit = false; + jsTestLog("Retrying commitTransaction for txnNum: " + commandObj.txnNumber + " and lsid: " + + commandObj.lsid); + do { + res = runCommandInTransactionIfNeeded( + conn, dbName, "commitTransaction", commandObj, func, makeFuncArgs); + + if (res.writeConcernError) { + retryCommit = true; + continue; + } + + if (res.hasOwnProperty('errorLabels') && + res.errorLabels.includes('TransientTransactionError')) { + transientErrorToLog = res; + retryCommit = true; + res = retryEntireTransaction(conn, commandObj.txnNumber, commandObj.lsid, func); + } else if (res.ok === 1) { + retryCommit = false; } - cmdObjUnwrapped.writeConcern.w = 'majority'; + } while (retryCommit); + + return res; + } + + function runCommandOnNetworkErrorRetry( + conn, dbName, commandName, commandObj, func, makeFuncArgs) { + transientErrorToLog = null; + // If the ops array is empty, we failed on a command not being run in a + // transaction and need to retry just this command. + if (ops.length === 0) { + // Set the transactionState to inactive so continueTransaction() will bump the + // txnNum. + conn.txnOverrideState = TransactionStates.kInactive; + return runCommandInTransactionIfNeeded( + conn, dbName, commandName, commandObj, func, makeFuncArgs); } - let res = func.apply(conn, makeFuncArgs(commandObj)); + if (commandName === "commitTransaction") { + return retryCommitTransaction( + conn, dbName, commandName, commandObj, func, makeFuncArgs); + } - if (res.ok !== 1) { - abortTransaction(conn, commandObj.lsid, txnOptions.txnNumber); - if (kCmdsThatInsert.has(cmdNameUnwrapped)) { - // If the command inserted data and is not supported in a transaction, we assume it - // failed because the collection did not exist. We will create the collection and - // retry the command. If the collection did exist, we'll return the original - // response because it failed for a different reason. Tests that expect collections - // to not exist will have to be skipped. - if (res.code === ErrorCodes.OperationNotSupportedInTransaction) { - const createCmdRes = - runCommandOriginal.call(conn, - dbName, - { - create: cmdObjUnwrapped[cmdNameUnwrapped], - lsid: commandObj.lsid, - writeConcern: {w: 'majority'}, - }, - 0); - - if (createCmdRes.ok !== 1) { - if (createCmdRes.code !== ErrorCodes.NamespaceExists) { - // The collection still does not exist. So we just return the original - // response to the caller, - return res; - } - } else { - assert.commandWorked(createCmdRes); - } - } else { - // If the insert command failed for any other reason, we return the original - // response without retrying. - return res; - } + return retryEntireTransaction(conn, commandObj.txnNumber, commandObj.lsid, func); + } + + function runCommandWithTransactionRetries( + conn, dbName, commandName, commandObj, func, makeFuncArgs) { + const driverSession = conn.getDB(dbName).getSession(); + if (driverSession.getSessionId() === null) { + // Sessions is explicitly disabled for this command. So we skip overriding it to + // use transactions. + return func.apply(conn, makeFuncArgs(commandObj)); + } - continueTransaction(conn, txnOptions, cmdNameUnwrapped, cmdObjUnwrapped); + let res; + if (TestData.retryingOnNetworkError !== true) { + res = runCommandInTransactionIfNeeded( + conn, dbName, commandName, commandObj, func, makeFuncArgs); - res = func.apply(conn, makeFuncArgs(commandObj)); - if (res.ok !== 1) { - abortTransaction(conn, commandObj.lsid, txnOptions.txnNumber); + if (commandName === "commitTransaction") { + while (res.writeConcernError) { + res = runCommandInTransactionIfNeeded( + conn, dbName, commandName, commandObj, func, makeFuncArgs); } } + + return res; } + res = runCommandOnNetworkErrorRetry( + conn, dbName, commandName, commandObj, func, makeFuncArgs); + return res; } @@ -238,5 +530,5 @@ "startParalleShell()"); }; - OverrideHelpers.overrideRunCommand(runCommandWithTransactions); -})();
\ No newline at end of file + OverrideHelpers.overrideRunCommand(runCommandWithTransactionRetries); +})(); diff --git a/jstests/libs/txns/txn_passthrough_runner.js b/jstests/libs/txns/txn_passthrough_runner.js index 3acc3184d54..1e2640cd11b 100644 --- a/jstests/libs/txns/txn_passthrough_runner.js +++ b/jstests/libs/txns/txn_passthrough_runner.js @@ -1,9 +1,6 @@ (function() { 'use strict'; - load('jstests/libs/override_methods/enable_sessions.js'); - load('jstests/libs/txns/txn_override.js'); - const testFile = TestData.multiStmtTxnTestFile; try { |