diff options
7 files changed, 200 insertions, 79 deletions
diff --git a/jstests/core/txns/new_transactions_on_session_with_prepared_txn_block_behind_prepare.js b/jstests/core/txns/new_transactions_on_session_with_prepared_txn_block_behind_prepare.js new file mode 100644 index 00000000000..f83de45dd3d --- /dev/null +++ b/jstests/core/txns/new_transactions_on_session_with_prepared_txn_block_behind_prepare.js @@ -0,0 +1,155 @@ +/** + * Tests that new transactions on a session block behind an existing prepared transaction on the + * session. + * + * @tags: [uses_transactions, uses_prepare_transaction] + */ + +(function() { +"use strict"; +load("jstests/core/txns/libs/prepare_helpers.js"); +load("jstests/libs/curop_helpers.js"); // for waitForCurOpByFailPoint(). +load("jstests/libs/parallel_shell_helpers.js"); + +/** + * Launches a parallel shell to start a new transaction on the session with the given lsid. It + * performs an insert and then commits. Assumes that there will be an already-prepared transaction + * on the session, and blocks using a failpoint until the transaction in the parallel shell has + * begun to block behind the prepared transaction. + */ +function runConcurrentTransactionOnSession(dbName, collName, lsid) { + var awaitShell; + try { + // Turn on failpoint that parallel shell will hit when blocked on prepare. + assert.commandWorked(db.adminCommand( + {configureFailPoint: 'waitAfterNewStatementBlocksBehindPrepare', mode: "alwaysOn"})); + + function runTransactionOnSession(dbName, collName, lsid) { + // Use txnNumber : 1 since the active txnNumber will be 0. + const txnNumber = NumberLong(1); + // Try to do an insert in a new transaction on the same session. Note that we're + // manually including the lsid and stmtId instead of using the session object directly + // since there's no way to share a session with the parallel shell. + assert.commandWorked(db.getSiblingDB(dbName).runCommand({ + insert: collName, + documents: [{x: "blocks_behind_prepare"}], + readConcern: {level: "snapshot"}, + lsid: lsid, + txnNumber: txnNumber, + stmtId: NumberInt(0), + startTransaction: true, + autocommit: false + })); + + assert.commandWorked(db.adminCommand( + {commitTransaction: 1, lsid: lsid, txnNumber: txnNumber, autocommit: false})); + } + // Launch a parallel shell to start a new transaction, insert a document, and commit. These + // operations should block behind the previous prepared transaction on the session. + awaitShell = + startParallelShell(funWithArgs(runTransactionOnSession, dbName, collName, lsid)); + + // Wait until parallel shell insert is blocked on prepare. + waitForCurOpByFailPointNoNS(db, "waitAfterNewStatementBlocksBehindPrepare"); + } finally { + // Disable failpoint to allow the parallel shell to continue - it should still be blocked on + // prepare. This is needed in a finally block so that if something fails we're guaranteed to + // turn this failpoint off, so that it doesn't cause problems for subsequent tests. + assert.commandWorked(db.adminCommand( + {configureFailPoint: 'waitAfterNewStatementBlocksBehindPrepare', mode: "off"})); + } + + return awaitShell; +} + +/** + * Common variables and setup. + */ +const dbName = "test"; +const collName = jsTestName(); +const testDB = db.getSiblingDB(dbName); + +testDB.runCommand({drop: collName}); +assert.commandWorked(testDB.runCommand({create: collName, writeConcern: {w: "majority"}})); + +(() => { + jsTestLog( + "New transactions on a session should block behind an existing prepared transaction on that session until it aborts."); + + const session = testDB.getMongo().startSession(); + const sessionDb = session.getDatabase(dbName); + const sessionColl = sessionDb.getCollection(collName); + const lsid = session.getSessionId(); + + // Start and prepare a transaction. + session.startTransaction(); + assert.commandWorked(sessionColl.insert({x: "foo"})); + PrepareHelpers.prepareTransaction(session); + + // Launch a concurrent transaction which should block behind the active prepared transaction. + const awaitShell = runConcurrentTransactionOnSession(dbName, collName, lsid); + + // Abort the original transaction - this should allow the parallel shell to continue and start a + // new transaction. + assert.commandWorked(session.abortTransaction_forTesting()); + + awaitShell(); + + session.endSession(); +})(); + +(() => { + jsTestLog( + "New transactions on a session should block behind an existing prepared transaction on that session until it commits."); + + const session = testDB.getMongo().startSession(); + const sessionDb = session.getDatabase(dbName); + const sessionColl = sessionDb.getCollection(collName); + const lsid = session.getSessionId(); + + // Start and prepare a transaction. + session.startTransaction(); + assert.commandWorked(sessionColl.insert({x: "foo"})); + const prepareTimestamp = PrepareHelpers.prepareTransaction(session); + + // Launch a concurrent transaction which should block behind the active prepared transaction. + const awaitShell = runConcurrentTransactionOnSession(dbName, collName, lsid); + + // Commit the original transaction - this should allow the parallel shell to continue and start + // a new transaction. Not using PrepareHelpers.commitTransaction because it calls + // commitTransaction twice, and the second call races with the second transaction the test + // started. + assert.commandWorked(session.getDatabase('admin').adminCommand( + {commitTransaction: 1, commitTimestamp: prepareTimestamp})); + + awaitShell(); + + session.endSession(); +})(); + +(() => { + jsTestLog( + "Test error precedence when executing a malformed command during a prepared transaction."); + + const session = testDB.getMongo().startSession(); + const sessionDb = session.getDatabase(dbName); + const sessionColl = sessionDb.getCollection(collName); + session.startTransaction(); + assert.commandWorked(sessionColl.insert({_id: "insert-1"})); + PrepareHelpers.prepareTransaction(session); + + // The following command specifies txnNumber: 2 without startTransaction: true. + assert.commandFailedWithCode(sessionDb.runCommand({ + insert: collName, + documents: [{_id: "no_such_txn"}], + txnNumber: NumberLong(2), + stmtId: NumberInt(0), + autocommit: false + }), + ErrorCodes.NoSuchTransaction); + + assert.commandWorked(session.abortTransaction_forTesting()); + + session.endSession(); +})(); +}()); diff --git a/jstests/core/txns/no_new_transactions_when_prepared_transaction_in_progress.js b/jstests/core/txns/no_new_transactions_when_prepared_transaction_in_progress.js deleted file mode 100644 index ce41fb98620..00000000000 --- a/jstests/core/txns/no_new_transactions_when_prepared_transaction_in_progress.js +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Tests that we cannot start a new transaction when a prepared transaction exists on the session. - * @tags: [uses_transactions, uses_prepare_transaction] - * - */ - -(function() { -"use strict"; -load("jstests/core/txns/libs/prepare_helpers.js"); - -const dbName = "test"; -const collName = "no_new_transactions_when_prepared_transaction_in_progress"; -const testDB = db.getSiblingDB(dbName); - -testDB.runCommand({drop: collName, writeConcern: {w: "majority"}}); - -assert.commandWorked(testDB.runCommand({create: collName, writeConcern: {w: "majority"}})); - -const sessionOptions = { - causalConsistency: false -}; -const session = testDB.getMongo().startSession(sessionOptions); -const sessionDb = session.getDatabase(dbName); -const sessionColl = sessionDb.getCollection(collName); - -jsTestLog("Test starting a new transaction while an existing prepared transaction exists on the " + - "session."); -session.startTransaction(); -assert.commandWorked(sessionColl.insert({_id: "insert-1"})); -PrepareHelpers.prepareTransaction(session); -assert.commandFailedWithCode(sessionDb.runCommand({ - insert: collName, - documents: [{_id: "cannot_start"}], - readConcern: {level: "snapshot"}, - txnNumber: NumberLong(1), - stmtId: NumberInt(0), - startTransaction: true, - autocommit: false -}), - ErrorCodes.PreparedTransactionInProgress); - -jsTestLog( - "Test error precedence when executing a malformed command during a prepared transaction."); -// The following command specifies txnNumber: 2 without startTransaction: true. -assert.commandFailedWithCode(sessionDb.runCommand({ - insert: collName, - documents: [{_id: "no_such_txn"}], - txnNumber: NumberLong(2), - stmtId: NumberInt(0), - autocommit: false -}), - ErrorCodes.NoSuchTransaction); -assert.commandWorked(session.abortTransaction_forTesting()); - -session.endSession(); -}()); diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index 899cc6b81b2..f939c320bc8 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -85,6 +85,10 @@ public: Session* const session = OperationContextSession::get(opCtx); if (session) { + if (auto txnParticipant = TransactionParticipant::get(opCtx)) { + txnParticipant.stashTransactionResources(opCtx); + } + MongoDOperationContextSession::checkIn(opCtx); } _yielded = (session != nullptr); @@ -97,13 +101,14 @@ public: // unblocking this thread of execution. However, we must wait until the child operation // on this shard finishes so we can get the session back. This may limit the throughput // of the operation, but it's correct. - MongoDOperationContextSession::checkOut(opCtx, - // Assumes this is only called from the - // 'aggregate' or 'getMore' commands. The code - // which relies on this parameter does not - // distinguish/care about the difference so we - // simply always pass 'aggregate'. - "aggregate"); + MongoDOperationContextSession::checkOut(opCtx); + + if (auto txnParticipant = TransactionParticipant::get(opCtx)) { + // Assumes this is only called from the 'aggregate' or 'getMore' commands. The code + // which relies on this parameter does not distinguish/care about the difference so + // we simply always pass 'aggregate'. + txnParticipant.unstashTransactionResources(opCtx, "aggregate"); + } } } diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 26b4c75ca18..f3760c3c8f4 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -101,6 +101,7 @@ MONGO_FAIL_POINT_DEFINE(respondWithNotPrimaryInCommandDispatch); MONGO_FAIL_POINT_DEFINE(skipCheckingForNotMasterInCommandDispatch); MONGO_FAIL_POINT_DEFINE(waitAfterReadCommandFinishesExecution); MONGO_FAIL_POINT_DEFINE(sleepMillisAfterCommandExecutionBegins); +MONGO_FAIL_POINT_DEFINE(waitAfterNewStatementBlocksBehindPrepare); // Tracks the number of times a legacy unacknowledged write failed due to // not master error resulted in network disconnection. @@ -414,10 +415,31 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx, auto txnParticipant = TransactionParticipant::get(opCtx); if (!opCtx->getClient()->isInDirectClient()) { - txnParticipant.beginOrContinue(opCtx, - *sessionOptions.getTxnNumber(), - sessionOptions.getAutocommit(), - sessionOptions.getStartTransaction()); + bool beganOrContinuedTxn{false}; + // This loop allows new transactions on a session to block behind a previous prepared + // transaction on that session. + while (!beganOrContinuedTxn) { + try { + txnParticipant.beginOrContinue(opCtx, + *sessionOptions.getTxnNumber(), + sessionOptions.getAutocommit(), + sessionOptions.getStartTransaction()); + beganOrContinuedTxn = true; + } catch (const ExceptionFor<ErrorCodes::PreparedTransactionInProgress>&) { + auto prepareCompleted = txnParticipant.onExitPrepare(); + + CurOpFailpointHelpers::waitWhileFailPointEnabled( + &waitAfterNewStatementBlocksBehindPrepare, + opCtx, + "waitAfterNewStatementBlocksBehindPrepare"); + + // Check the session back in and wait for ongoing prepared transaction to complete. + MongoDOperationContextSession::checkIn(opCtx); + prepareCompleted.wait(opCtx); + MongoDOperationContextSession::checkOut(opCtx); + } + } + // Create coordinator if needed. If "startTransaction" is present, it must be true. if (sessionOptions.getStartTransaction()) { // If this shard has been selected as the coordinator, set up the coordinator state diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp index 86aeb61fdeb..a4200433a24 100644 --- a/src/mongo/db/session_catalog_mongod.cpp +++ b/src/mongo/db/session_catalog_mongod.cpp @@ -408,19 +408,13 @@ MongoDOperationContextSession::MongoDOperationContextSession(OperationContext* o MongoDOperationContextSession::~MongoDOperationContextSession() = default; void MongoDOperationContextSession::checkIn(OperationContext* opCtx) { - if (auto txnParticipant = TransactionParticipant::get(opCtx)) { - txnParticipant.stashTransactionResources(opCtx); - } - OperationContextSession::checkIn(opCtx); } -void MongoDOperationContextSession::checkOut(OperationContext* opCtx, const std::string& cmdName) { +void MongoDOperationContextSession::checkOut(OperationContext* opCtx) { OperationContextSession::checkOut(opCtx); - - if (auto txnParticipant = TransactionParticipant::get(opCtx)) { - txnParticipant.unstashTransactionResources(opCtx, cmdName); - } + auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant.refreshFromStorageIfNeeded(opCtx); } MongoDOperationContextSessionWithoutRefresh::MongoDOperationContextSessionWithoutRefresh( diff --git a/src/mongo/db/session_catalog_mongod.h b/src/mongo/db/session_catalog_mongod.h index 591d234029b..cd379edb8ec 100644 --- a/src/mongo/db/session_catalog_mongod.h +++ b/src/mongo/db/session_catalog_mongod.h @@ -96,10 +96,9 @@ public: static void checkIn(OperationContext* opCtx); /** - * May only be called if the session is not checked out already. 'cmdType' is used to validate - * that the expected transaction flow control is being obeyed. + * May only be called if the session is not checked out already. */ - static void checkOut(OperationContext* opCtx, const std::string& cmdName); + static void checkOut(OperationContext* opCtx); private: OperationContextSession _operationContextSession; diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index c80293146ad..a638c8f6e2d 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -743,6 +743,7 @@ TEST_F(TxnParticipantTest, KillOpBeforeCommittingPreparedTransaction) { } // Check the session back in. + txnParticipant.stashTransactionResources(opCtx()); sessionCheckout->checkIn(opCtx()); // The transaction state should have been unaffected. @@ -783,6 +784,7 @@ TEST_F(TxnParticipantTest, KillOpBeforeAbortingPreparedTransaction) { } // Check the session back in. + txnParticipant.stashTransactionResources(opCtx()); sessionCheckout->checkIn(opCtx()); // The transaction state should have been unaffected. |