From d5cda575e31df3d8c4a9fecb072ac0582748b576 Mon Sep 17 00:00:00 2001 From: Jack Mulrow Date: Thu, 24 Feb 2022 15:40:14 +0000 Subject: SERVER-59342 Add killSessions and failover support to update shard key fsm testing --- jstests/concurrency/fsm_libs/fsm.js | 2 +- .../fsm_workload_helpers/auto_retry_transaction.js | 16 +-- .../random_moveChunk_update_shard_key.js | 122 +++++++++++++++++++-- ...dom_moveChunk_update_shard_key_kill_sessions.js | 103 +++++++++++++++++ jstests/hooks/run_dbcheck_background.js | 4 + jstests/libs/killed_session_util.js | 26 +++++ .../override_methods/retry_on_killed_session.js | 64 +++++++++++ src/mongo/db/transaction_participant.cpp | 10 ++ 8 files changed, 327 insertions(+), 20 deletions(-) create mode 100644 jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key_kill_sessions.js create mode 100644 jstests/libs/killed_session_util.js create mode 100644 jstests/libs/override_methods/retry_on_killed_session.js diff --git a/jstests/concurrency/fsm_libs/fsm.js b/jstests/concurrency/fsm_libs/fsm.js index c317258b382..29e4f5737eb 100644 --- a/jstests/concurrency/fsm_libs/fsm.js +++ b/jstests/concurrency/fsm_libs/fsm.js @@ -115,7 +115,7 @@ var fsm = (function() { data = TransactionsUtil.deepCopyObject({}, args.data); data[kIsRunningInsideTransaction] = true; fn.call(data, args.db, args.collName, connCache); - }); + }, {retryOnKilledSession: args.data.retryOnKilledSession}); delete data[kIsRunningInsideTransaction]; args.data = data; } catch (e) { diff --git a/jstests/concurrency/fsm_workload_helpers/auto_retry_transaction.js b/jstests/concurrency/fsm_workload_helpers/auto_retry_transaction.js index 30fdaa89e5f..76d9a7fc41d 100644 --- a/jstests/concurrency/fsm_workload_helpers/auto_retry_transaction.js +++ b/jstests/concurrency/fsm_workload_helpers/auto_retry_transaction.js @@ -1,5 +1,7 @@ 'use strict'; +load("jstests/libs/killed_session_util.js"); + var {withTxnAndAutoRetry, isKilledSessionCode} = (function() { /** * Calls 'func' with the print() function overridden to be a no-op. @@ -20,8 +22,7 @@ var {withTxnAndAutoRetry, isKilledSessionCode} = (function() { // Returns if the code is one that could come from a session being killed. function isKilledSessionCode(code) { - return code === ErrorCodes.Interrupted || code === ErrorCodes.CursorKilled || - code === ErrorCodes.CursorNotFound; + return KilledSessionUtil.isKilledSessionCode(code); } // Returns true if the transaction can be retried with a higher transaction number after the @@ -49,10 +50,8 @@ var {withTxnAndAutoRetry, isKilledSessionCode} = (function() { return true; } - if (retryOnKilledSession && - (isKilledSessionCode(e.code) || - (Array.isArray(e.writeErrors) && - e.writeErrors.every(writeError => isKilledSessionCode(writeError.code))))) { + if (retryOnKilledSession && KilledSessionUtil.hasKilledSessionError(e)) { + print("-=-=-=- Retrying transaction after killed session error: " + tojsononeline(e)); return true; } @@ -68,7 +67,10 @@ var {withTxnAndAutoRetry, isKilledSessionCode} = (function() { // If commit fails with a killed session code, the commit must be retried because it is // unknown if the interrupted commit succeeded. This is safe because commitTransaction // is a retryable write. - if (!commitRes.ok && retryOnKilledSession && isKilledSessionCode(commitRes.code)) { + const failedWithInterruption = + !commitRes.ok && KilledSessionUtil.isKilledSessionCode(commitRes.code); + const wcFailedWithInterruption = KilledSessionUtil.hasKilledSessionWCError(commitRes); + if (retryOnKilledSession && (failedWithInterruption || wcFailedWithInterruption)) { print("-=-=-=- Retrying commit after killed session code, sessionId: " + tojsononeline(session.getSessionId()) + ", txnNumber: " + tojsononeline(session.getTxnNumber_forTesting()) + diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js b/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js index a70b9e70c6d..528ac4f636a 100644 --- a/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js +++ b/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js @@ -7,7 +7,6 @@ * @tags: [ * requires_sharding, * assumes_balancer_off, - * requires_non_retryable_writes, * uses_transactions, * ] */ @@ -152,7 +151,8 @@ var $config = extendWorkload($config, function($config, $super) { $config.data.runInTransactionOrRetryableWrite = function runInTransactionOrRetryableWrite( functionToRun, wrapInTransaction) { if (wrapInTransaction) { - withTxnAndAutoRetry(this.session, functionToRun); + withTxnAndAutoRetry( + this.session, functionToRun, {retryOnKilledSession: this.retryOnKilledSession}); } else { functionToRun(); } @@ -191,8 +191,22 @@ var $config = extendWorkload($config, function($config, $super) { jsTestLog(logString); }; + function assertDocWasUpdated(collection, idToUpdate, currentShardKey, newShardKey, newCounter) { + assertWhenOwnColl.isnull(collection.findOne({_id: idToUpdate, skey: currentShardKey})); + assertWhenOwnColl.eq(collection.findOne({_id: idToUpdate, skey: newShardKey}), + {_id: idToUpdate, skey: newShardKey, counter: newCounter}); + } + + function wasDocUpdated(collection, idToUpdate, currentShardKey) { + const docWithOldShardKey = collection.findOne({_id: idToUpdate, skey: currentShardKey}); + return !docWithOldShardKey; + } + $config.data.findAndModifyShardKey = function findAndModifyShardKey( db, collName, {wrapInTransaction, moveAcrossChunks} = {}) { + // This function uses a different session than the transaction wrapping logic expects. + fsm.forceRunningOutsideTransaction(this); + const collection = this.session.getDatabase(db.getName()).getCollection(collName); const shardKeyField = this.shardKeyField[collName]; @@ -222,6 +236,32 @@ var $config = extendWorkload($config, function($config, $super) { this.expectedCounters[idToUpdate] = counterForId + 1; } catch (e) { + if (e.code === ErrorCodes.IncompleteTransactionHistory && !wrapInTransaction) { + print("Handling IncompleteTransactionHistory error for findAndModify: " + + tojsononeline(e)); + + // With internal transactions enabled, IncompleteTransactionHistory means the + // write succeeded, so we can treat this error as success. + if (this.internalTransactionsEnabled) { + print("Internal transactions are on so assuming the operation succeeded"); + assertDocWasUpdated( + collection, idToUpdate, currentShardKey, newShardKey, counterForId + 1); + this.expectedCounters[idToUpdate] = counterForId + 1; + return; + } + + // With the previous implementation, this could also mean the first attempt at + // handling a WCOS error failed transiently, so we have to detect whether the + // operation succeeded or failed before continuing. + const docWasUpdated = wasDocUpdated(collection, idToUpdate, currentShardKey); + print("Was the document updated? " + docWasUpdated); + if (docWasUpdated) { + // The operation succeeded, so update the in-memory counters. + this.expectedCounters[idToUpdate] = counterForId + 1; + } + return; + } + const msg = e.errmsg ? e.errmsg : e.message; if (this.isUpdateShardKeyErrorAcceptable(e.code, msg, e.errorLabels)) { print("Ignoring acceptable updateShardKey error attempting to update the" + @@ -229,10 +269,8 @@ var $config = extendWorkload($config, function($config, $super) { ": " + e); assertWhenOwnColl.neq( collection.findOne({_id: idToUpdate, skey: currentShardKey}), null); - assertWhenOwnColl.eq( - collection.findOne( - {_id: idToUpdate, skey: newShardKey, counter: counterForId}), - null); + assertWhenOwnColl.eq(collection.findOne({_id: idToUpdate, skey: newShardKey}), + null); return; } throw e; @@ -246,6 +284,9 @@ var $config = extendWorkload($config, function($config, $super) { $config.data.updateShardKey = function updateShardKey( db, collName, {moveAcrossChunks, wrapInTransaction} = {}) { + // This function uses a different session than the transaction wrapping logic expects. + fsm.forceRunningOutsideTransaction(this); + const collection = this.session.getDatabase(db.getName()).getCollection(collName); const shardKeyField = this.shardKeyField[collName]; @@ -273,18 +314,41 @@ var $config = extendWorkload($config, function($config, $super) { const err = updateResult instanceof WriteResult ? updateResult.getWriteError() : updateResult; + if (err.code === ErrorCodes.IncompleteTransactionHistory && !wrapInTransaction) { + print("Handling IncompleteTransactionHistory error for update, caught error: " + + tojsononeline(e) + ", err: " + tojsononeline(err)); + + // TODO SERVER-59186: Once updates that change the shard key use the transaction + // API, expect this to mean the write succeeded when the API is enabled. + + // With the original implementation, this error could mean the write succeeded + // or failed, so we have to detect the outcome before continuing. + const docWasUpdated = wasDocUpdated(collection, idToUpdate, currentShardKey); + print("Was the document updated? " + docWasUpdated); + if (docWasUpdated) { + // The operation succeeded, so update the in-memory counters. + this.expectedCounters[idToUpdate] = counterForId + 1; + } + return; + } + if (this.isUpdateShardKeyErrorAcceptable(err.code, err.errmsg, err.errorLabels)) { print("Ignoring acceptable updateShardKey error attempting to update the" + "document with _id: " + idToUpdate + " and shardKey: " + currentShardKey + ": " + tojson(updateResult)); assertWhenOwnColl.neq( collection.findOne({_id: idToUpdate, skey: currentShardKey}), null); - assertWhenOwnColl.eq( - collection.findOne( - {_id: idToUpdate, skey: newShardKey, counter: counterForId}), - null); + assertWhenOwnColl.eq(collection.findOne({_id: idToUpdate, skey: newShardKey}), + null); return; } + + // Put the write result's code on the thrown exception, if there is one, so it's in + // the expected format for any higher level error handling logic. + if (!e.hasOwnProperty("code") && err.code) { + e.code = err.code; + } + throw e; } }; @@ -350,7 +414,23 @@ var $config = extendWorkload($config, function($config, $super) { $config.states.init = function init(db, collName, connCache) { $super.states.init.apply(this, arguments); - this.session = db.getMongo().startSession({causalConsistency: false, retryWrites: true}); + // With the original update shard key implementation, retrying a retryable write that was + // converted into a distributed transaction will immediately fail with + // IncompleteTransactionHistory. In suites where that transaction may be interrupted during + // two phase commit and the test retries on this, the retry may return the error before the + // transaction has left prepare, so any subsequent non-causally consistent reads may read + // the preimage of the data in prepare. This test expects to read the documents written to + // by the update shard key transaction after this error, so use a causally consistent + // session to guarantee that in these suites. + // + // TODO SERVER-59186: With the new implementation, IncompleteTransactionHistory is only + // returned after the shard owning the preimage document leaves prepare, and since + // coordinateCommitReturnImmediatelyAfterPersistingDecision is false in these suites, any + // subsequent reads should always read the transaction's writes on all shards without causal + // consistency, so use a non causally consistent session with internal transactions. + const shouldUseCausalConsistency = this.runningWithStepdowns || this.retryOnKilledSession; + this.session = db.getMongo().startSession( + {causalConsistency: shouldUseCausalConsistency, retryWrites: true}); // Assign a default counter value to each document owned by this thread. db[collName].find({tid: this.tid}).forEach(doc => { @@ -390,6 +470,12 @@ var $config = extendWorkload($config, function($config, $super) { db.adminCommand({split: ns, middle: {skey: medianIdForThread}})); } db.printShardingStatus(); + + this.internalTransactionsEnabled = + assert + .commandWorked( + db.adminCommand({getParameter: 1, featureFlagInternalTransactions: 1})) + .featureFlagInternalTransactions.value; }; /** @@ -405,6 +491,18 @@ var $config = extendWorkload($config, function($config, $super) { }); }; + const origMoveChunk = $config.states.moveChunk; + $config.states.moveChunk = function moveChunk(db, collName, connCache) { + if (this.internalTransactionsEnabled && + (this.retryOnKilledSession || this.runningWithStepdowns)) { + // TODO SERVER-58758: Remove this when retryable transaction history is migrated. + print("Skipping moveChunk until transaction API use supports retries"); + return; + } + + origMoveChunk.apply(this, arguments); + }; + $config.transitions = { init: { moveChunk: 0.2, @@ -540,4 +638,4 @@ var $config = extendWorkload($config, function($config, $super) { }; return $config; -}); \ No newline at end of file +}); diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key_kill_sessions.js b/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key_kill_sessions.js new file mode 100644 index 00000000000..d4c324f8d95 --- /dev/null +++ b/jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key_kill_sessions.js @@ -0,0 +1,103 @@ +'use strict'; + +/** + * Performs updates that will change a document's shard key while migrating chunks and killing + * sessions. Only runs updates that cause a document to change shards to increase the odds of + * killing an internal transaction. + * + * @tags: [ + * requires_sharding, + * assumes_balancer_off, + * uses_transactions, + * ] + */ +load('jstests/concurrency/fsm_libs/extend_workload.js'); +load('jstests/concurrency/fsm_workload_helpers/kill_session.js'); // for killSession +load('jstests/concurrency/fsm_workloads/random_moveChunk_update_shard_key.js'); +load('jstests/libs/override_methods/retry_on_killed_session.js'); + +var $config = extendWorkload($config, function($config, $super) { + $config.data.retryOnKilledSession = true; + + // The base workload uses connCache, so wrap killSessions so the fsm runner doesn't complain + // that it only expects 2 arguments. + $config.states.killSession = function wrappedKillSession(db, collName, connCache) { + return killSession(db, collName); + }; + + $config.transitions = { + init: { + killSession: 0.1, + moveChunk: 0.1, + findAndModifyWithRetryableWriteAcrossChunks: 0.2, + findAndModifyWithTransactionAcrossChunks: 0.2, + updateWithRetryableWriteAcrossChunks: 0.2, + updateWithTransactionAcrossChunks: 0.2, + }, + killSession: { + killSession: 0.1, + moveChunk: 0.1, + findAndModifyWithRetryableWriteAcrossChunks: 0.15, + findAndModifyWithTransactionAcrossChunks: 0.15, + updateWithRetryableWriteAcrossChunks: 0.15, + updateWithTransactionAcrossChunks: 0.15, + verifyDocuments: 0.2 + }, + moveChunk: { + killSession: 0.1, + moveChunk: 0.1, + findAndModifyWithRetryableWriteAcrossChunks: 0.15, + findAndModifyWithTransactionAcrossChunks: 0.15, + updateWithRetryableWriteAcrossChunks: 0.15, + updateWithTransactionAcrossChunks: 0.15, + verifyDocuments: 0.2 + }, + findAndModifyWithRetryableWriteAcrossChunks: { + killSession: 0.1, + moveChunk: 0.1, + findAndModifyWithRetryableWriteAcrossChunks: 0.15, + findAndModifyWithTransactionAcrossChunks: 0.15, + updateWithRetryableWriteAcrossChunks: 0.15, + updateWithTransactionAcrossChunks: 0.15, + verifyDocuments: 0.2 + }, + findAndModifyWithTransactionAcrossChunks: { + killSession: 0.1, + moveChunk: 0.1, + findAndModifyWithRetryableWriteAcrossChunks: 0.15, + findAndModifyWithTransactionAcrossChunks: 0.15, + updateWithRetryableWriteAcrossChunks: 0.15, + updateWithTransactionAcrossChunks: 0.15, + verifyDocuments: 0.2 + }, + updateWithRetryableWriteAcrossChunks: { + killSession: 0.1, + moveChunk: 0.1, + findAndModifyWithRetryableWriteAcrossChunks: 0.15, + findAndModifyWithTransactionAcrossChunks: 0.15, + updateWithRetryableWriteAcrossChunks: 0.15, + updateWithTransactionAcrossChunks: 0.15, + verifyDocuments: 0.2 + }, + updateWithTransactionAcrossChunks: { + killSession: 0.1, + moveChunk: 0.1, + findAndModifyWithRetryableWriteAcrossChunks: 0.15, + findAndModifyWithTransactionAcrossChunks: 0.15, + updateWithRetryableWriteAcrossChunks: 0.15, + updateWithTransactionAcrossChunks: 0.15, + verifyDocuments: 0.2 + }, + verifyDocuments: { + killSession: 0.1, + moveChunk: 0.1, + findAndModifyWithRetryableWriteAcrossChunks: 0.15, + findAndModifyWithTransactionAcrossChunks: 0.15, + updateWithRetryableWriteAcrossChunks: 0.15, + updateWithTransactionAcrossChunks: 0.15, + verifyDocuments: 0.2 + }, + }; + + return $config; +}); diff --git a/jstests/hooks/run_dbcheck_background.js b/jstests/hooks/run_dbcheck_background.js index 660fe3321e6..55040df5739 100644 --- a/jstests/hooks/run_dbcheck_background.js +++ b/jstests/hooks/run_dbcheck_background.js @@ -14,6 +14,10 @@ if (typeof db === 'undefined') { TestData = TestData || {}; +// Disable implicit sessions so FSM workloads that kill random sessions won't interrupt the +// operations in this test that aren't resilient to interruptions. +TestData.disableImplicitSessions = true; + const conn = db.getMongo(); const topology = DiscoverTopology.findConnectedNodes(conn); diff --git a/jstests/libs/killed_session_util.js b/jstests/libs/killed_session_util.js new file mode 100644 index 00000000000..5e3dbe6db6b --- /dev/null +++ b/jstests/libs/killed_session_util.js @@ -0,0 +1,26 @@ +/** + * Utilities for testing when sessions are killed. + */ +var KilledSessionUtil = (function() { + // Returns if the code is one that could come from a session being killed. + function isKilledSessionCode(code) { + return code === ErrorCodes.Interrupted || code === ErrorCodes.CursorKilled || + code === ErrorCodes.CursorNotFound; + } + + function hasKilledSessionError(errOrRes) { + return isKilledSessionCode(errOrRes.code) || + (Array.isArray(errOrRes.writeErrors) && + errOrRes.writeErrors.every(writeError => isKilledSessionCode(writeError.code))); + } + + function hasKilledSessionWCError(res) { + return res.writeConcernError && isKilledSessionCode(res.writeConcernError.code); + } + + return { + isKilledSessionCode, + hasKilledSessionError, + hasKilledSessionWCError, + }; +})(); diff --git a/jstests/libs/override_methods/retry_on_killed_session.js b/jstests/libs/override_methods/retry_on_killed_session.js new file mode 100644 index 00000000000..b776a64b184 --- /dev/null +++ b/jstests/libs/override_methods/retry_on_killed_session.js @@ -0,0 +1,64 @@ +/** + * Overrides Mongo.prototype.runCommand to retry on errors that come from an operation's session + * being killed. + */ +(function() { +"use strict"; + +load("jstests/libs/killed_session_util.js"); +load("jstests/libs/override_methods/override_helpers.js"); + +const mongoRunCommandOriginal = Mongo.prototype.runCommand; + +Mongo.prototype.runCommand = function runCommand(dbName, cmdObj, options) { + return runWithKilledSessionRetries(this, cmdObj, mongoRunCommandOriginal, arguments); +}; + +// Returns if the command should retry on killed session errors. +function shouldRetry(cmdObj) { + if (cmdObj.hasOwnProperty("autocommit")) { + // Transactions are retried at a higher level. + return false; + } + + // Assume every other operation can be retried. Callers should guarantee only idempotent + // operations are run when this override is active. + return true; +} + +function runWithKilledSessionRetries(mongo, cmdObj, clientFunction, clientFunctionArguments) { + if (!shouldRetry(cmdObj)) { + return clientFunction.apply(mongo, clientFunctionArguments); + } + + while (true) { + try { + const res = clientFunction.apply(mongo, clientFunctionArguments); + + if (KilledSessionUtil.hasKilledSessionError(res)) { + print("-=-=-=- Retrying " + tojsononeline(cmdObj) + + " after killed session error response: " + tojsononeline(res)); + continue; + } + + if (KilledSessionUtil.hasKilledSessionWCError(res)) { + print("-=-=-=- Retrying " + tojsononeline(cmdObj) + + " after killed session write concern error response: " + tojsononeline(res)); + continue; + } + + return res; + } catch (e) { + if (KilledSessionUtil.hasKilledSessionError(e)) { + print("-=-=-=- Retrying " + tojsononeline(cmdObj) + + " after thrown killed session error: " + tojsononeline(e)); + continue; + } + throw e; + } + } +} + +OverrideHelpers.prependOverrideInParallelShell( + "jstests/libs/override_methods/retry_on_killed_session.js"); +})(); diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index ae2a12fe2e7..f932c572fbf 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -710,6 +710,16 @@ void TransactionParticipant::Participant::_beginOrContinueRetryableWrite( retryableWriteTxnParticipantCatalog.addParticipant(*this); } else { // Retrying a retryable write. + + // If this retryable write's transaction id has been converted to a transaction, and that + // transaction is in prepare, wait for it to exit prepare before throwing + // IncompleteTransactionHistory so the error response's operationTime is inclusive of the + // transaction's 2PC decision, guaranteeing causally consistent sessions will always read + // the transaction's writes. + uassert(ErrorCodes::PreparedTransactionInProgress, + "Retryable write that has been converted to a transaction is in prepare", + !o().txnState.isInSet(TransactionState::kPrepared)); + uassert(ErrorCodes::IncompleteTransactionHistory, "Cannot retry a retryable write that has been converted into a transaction", o().txnState.isInRetryableWriteMode()); -- cgit v1.2.1