summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/core/txns/new_transactions_on_session_with_prepared_txn_block_behind_prepare.js155
-rw-r--r--jstests/core/txns/no_new_transactions_when_prepared_transaction_in_progress.js56
-rw-r--r--src/mongo/db/pipeline/process_interface_standalone.cpp19
-rw-r--r--src/mongo/db/service_entry_point_common.cpp30
-rw-r--r--src/mongo/db/session_catalog_mongod.cpp12
-rw-r--r--src/mongo/db/session_catalog_mongod.h5
-rw-r--r--src/mongo/db/transaction_participant_test.cpp2
7 files changed, 200 insertions, 79 deletions
diff --git a/jstests/core/txns/new_transactions_on_session_with_prepared_txn_block_behind_prepare.js b/jstests/core/txns/new_transactions_on_session_with_prepared_txn_block_behind_prepare.js
new file mode 100644
index 00000000000..f83de45dd3d
--- /dev/null
+++ b/jstests/core/txns/new_transactions_on_session_with_prepared_txn_block_behind_prepare.js
@@ -0,0 +1,155 @@
+/**
+ * Tests that new transactions on a session block behind an existing prepared transaction on the
+ * session.
+ *
+ * @tags: [uses_transactions, uses_prepare_transaction]
+ */
+
+(function() {
+"use strict";
+load("jstests/core/txns/libs/prepare_helpers.js");
+load("jstests/libs/curop_helpers.js"); // for waitForCurOpByFailPoint().
+load("jstests/libs/parallel_shell_helpers.js");
+
+/**
+ * Launches a parallel shell to start a new transaction on the session with the given lsid. It
+ * performs an insert and then commits. Assumes that there will be an already-prepared transaction
+ * on the session, and blocks using a failpoint until the transaction in the parallel shell has
+ * begun to block behind the prepared transaction.
+ */
+function runConcurrentTransactionOnSession(dbName, collName, lsid) {
+ var awaitShell;
+ try {
+ // Turn on failpoint that parallel shell will hit when blocked on prepare.
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: 'waitAfterNewStatementBlocksBehindPrepare', mode: "alwaysOn"}));
+
+ function runTransactionOnSession(dbName, collName, lsid) {
+ // Use txnNumber : 1 since the active txnNumber will be 0.
+ const txnNumber = NumberLong(1);
+ // Try to do an insert in a new transaction on the same session. Note that we're
+ // manually including the lsid and stmtId instead of using the session object directly
+ // since there's no way to share a session with the parallel shell.
+ assert.commandWorked(db.getSiblingDB(dbName).runCommand({
+ insert: collName,
+ documents: [{x: "blocks_behind_prepare"}],
+ readConcern: {level: "snapshot"},
+ lsid: lsid,
+ txnNumber: txnNumber,
+ stmtId: NumberInt(0),
+ startTransaction: true,
+ autocommit: false
+ }));
+
+ assert.commandWorked(db.adminCommand(
+ {commitTransaction: 1, lsid: lsid, txnNumber: txnNumber, autocommit: false}));
+ }
+ // Launch a parallel shell to start a new transaction, insert a document, and commit. These
+ // operations should block behind the previous prepared transaction on the session.
+ awaitShell =
+ startParallelShell(funWithArgs(runTransactionOnSession, dbName, collName, lsid));
+
+ // Wait until parallel shell insert is blocked on prepare.
+ waitForCurOpByFailPointNoNS(db, "waitAfterNewStatementBlocksBehindPrepare");
+ } finally {
+ // Disable failpoint to allow the parallel shell to continue - it should still be blocked on
+ // prepare. This is needed in a finally block so that if something fails we're guaranteed to
+ // turn this failpoint off, so that it doesn't cause problems for subsequent tests.
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: 'waitAfterNewStatementBlocksBehindPrepare', mode: "off"}));
+ }
+
+ return awaitShell;
+}
+
+/**
+ * Common variables and setup.
+ */
+const dbName = "test";
+const collName = jsTestName();
+const testDB = db.getSiblingDB(dbName);
+
+testDB.runCommand({drop: collName});
+assert.commandWorked(testDB.runCommand({create: collName, writeConcern: {w: "majority"}}));
+
+(() => {
+ jsTestLog(
+ "New transactions on a session should block behind an existing prepared transaction on that session until it aborts.");
+
+ const session = testDB.getMongo().startSession();
+ const sessionDb = session.getDatabase(dbName);
+ const sessionColl = sessionDb.getCollection(collName);
+ const lsid = session.getSessionId();
+
+ // Start and prepare a transaction.
+ session.startTransaction();
+ assert.commandWorked(sessionColl.insert({x: "foo"}));
+ PrepareHelpers.prepareTransaction(session);
+
+ // Launch a concurrent transaction which should block behind the active prepared transaction.
+ const awaitShell = runConcurrentTransactionOnSession(dbName, collName, lsid);
+
+ // Abort the original transaction - this should allow the parallel shell to continue and start a
+ // new transaction.
+ assert.commandWorked(session.abortTransaction_forTesting());
+
+ awaitShell();
+
+ session.endSession();
+})();
+
+(() => {
+ jsTestLog(
+ "New transactions on a session should block behind an existing prepared transaction on that session until it commits.");
+
+ const session = testDB.getMongo().startSession();
+ const sessionDb = session.getDatabase(dbName);
+ const sessionColl = sessionDb.getCollection(collName);
+ const lsid = session.getSessionId();
+
+ // Start and prepare a transaction.
+ session.startTransaction();
+ assert.commandWorked(sessionColl.insert({x: "foo"}));
+ const prepareTimestamp = PrepareHelpers.prepareTransaction(session);
+
+ // Launch a concurrent transaction which should block behind the active prepared transaction.
+ const awaitShell = runConcurrentTransactionOnSession(dbName, collName, lsid);
+
+ // Commit the original transaction - this should allow the parallel shell to continue and start
+ // a new transaction. Not using PrepareHelpers.commitTransaction because it calls
+ // commitTransaction twice, and the second call races with the second transaction the test
+ // started.
+ assert.commandWorked(session.getDatabase('admin').adminCommand(
+ {commitTransaction: 1, commitTimestamp: prepareTimestamp}));
+
+ awaitShell();
+
+ session.endSession();
+})();
+
+(() => {
+ jsTestLog(
+ "Test error precedence when executing a malformed command during a prepared transaction.");
+
+ const session = testDB.getMongo().startSession();
+ const sessionDb = session.getDatabase(dbName);
+ const sessionColl = sessionDb.getCollection(collName);
+ session.startTransaction();
+ assert.commandWorked(sessionColl.insert({_id: "insert-1"}));
+ PrepareHelpers.prepareTransaction(session);
+
+ // The following command specifies txnNumber: 2 without startTransaction: true.
+ assert.commandFailedWithCode(sessionDb.runCommand({
+ insert: collName,
+ documents: [{_id: "no_such_txn"}],
+ txnNumber: NumberLong(2),
+ stmtId: NumberInt(0),
+ autocommit: false
+ }),
+ ErrorCodes.NoSuchTransaction);
+
+ assert.commandWorked(session.abortTransaction_forTesting());
+
+ session.endSession();
+})();
+}());
diff --git a/jstests/core/txns/no_new_transactions_when_prepared_transaction_in_progress.js b/jstests/core/txns/no_new_transactions_when_prepared_transaction_in_progress.js
deleted file mode 100644
index ce41fb98620..00000000000
--- a/jstests/core/txns/no_new_transactions_when_prepared_transaction_in_progress.js
+++ /dev/null
@@ -1,56 +0,0 @@
-/**
- * Tests that we cannot start a new transaction when a prepared transaction exists on the session.
- * @tags: [uses_transactions, uses_prepare_transaction]
- *
- */
-
-(function() {
-"use strict";
-load("jstests/core/txns/libs/prepare_helpers.js");
-
-const dbName = "test";
-const collName = "no_new_transactions_when_prepared_transaction_in_progress";
-const testDB = db.getSiblingDB(dbName);
-
-testDB.runCommand({drop: collName, writeConcern: {w: "majority"}});
-
-assert.commandWorked(testDB.runCommand({create: collName, writeConcern: {w: "majority"}}));
-
-const sessionOptions = {
- causalConsistency: false
-};
-const session = testDB.getMongo().startSession(sessionOptions);
-const sessionDb = session.getDatabase(dbName);
-const sessionColl = sessionDb.getCollection(collName);
-
-jsTestLog("Test starting a new transaction while an existing prepared transaction exists on the " +
- "session.");
-session.startTransaction();
-assert.commandWorked(sessionColl.insert({_id: "insert-1"}));
-PrepareHelpers.prepareTransaction(session);
-assert.commandFailedWithCode(sessionDb.runCommand({
- insert: collName,
- documents: [{_id: "cannot_start"}],
- readConcern: {level: "snapshot"},
- txnNumber: NumberLong(1),
- stmtId: NumberInt(0),
- startTransaction: true,
- autocommit: false
-}),
- ErrorCodes.PreparedTransactionInProgress);
-
-jsTestLog(
- "Test error precedence when executing a malformed command during a prepared transaction.");
-// The following command specifies txnNumber: 2 without startTransaction: true.
-assert.commandFailedWithCode(sessionDb.runCommand({
- insert: collName,
- documents: [{_id: "no_such_txn"}],
- txnNumber: NumberLong(2),
- stmtId: NumberInt(0),
- autocommit: false
-}),
- ErrorCodes.NoSuchTransaction);
-assert.commandWorked(session.abortTransaction_forTesting());
-
-session.endSession();
-}());
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index 899cc6b81b2..f939c320bc8 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -85,6 +85,10 @@ public:
Session* const session = OperationContextSession::get(opCtx);
if (session) {
+ if (auto txnParticipant = TransactionParticipant::get(opCtx)) {
+ txnParticipant.stashTransactionResources(opCtx);
+ }
+
MongoDOperationContextSession::checkIn(opCtx);
}
_yielded = (session != nullptr);
@@ -97,13 +101,14 @@ public:
// unblocking this thread of execution. However, we must wait until the child operation
// on this shard finishes so we can get the session back. This may limit the throughput
// of the operation, but it's correct.
- MongoDOperationContextSession::checkOut(opCtx,
- // Assumes this is only called from the
- // 'aggregate' or 'getMore' commands. The code
- // which relies on this parameter does not
- // distinguish/care about the difference so we
- // simply always pass 'aggregate'.
- "aggregate");
+ MongoDOperationContextSession::checkOut(opCtx);
+
+ if (auto txnParticipant = TransactionParticipant::get(opCtx)) {
+ // Assumes this is only called from the 'aggregate' or 'getMore' commands. The code
+ // which relies on this parameter does not distinguish/care about the difference so
+ // we simply always pass 'aggregate'.
+ txnParticipant.unstashTransactionResources(opCtx, "aggregate");
+ }
}
}
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 26b4c75ca18..f3760c3c8f4 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -101,6 +101,7 @@ MONGO_FAIL_POINT_DEFINE(respondWithNotPrimaryInCommandDispatch);
MONGO_FAIL_POINT_DEFINE(skipCheckingForNotMasterInCommandDispatch);
MONGO_FAIL_POINT_DEFINE(waitAfterReadCommandFinishesExecution);
MONGO_FAIL_POINT_DEFINE(sleepMillisAfterCommandExecutionBegins);
+MONGO_FAIL_POINT_DEFINE(waitAfterNewStatementBlocksBehindPrepare);
// Tracks the number of times a legacy unacknowledged write failed due to
// not master error resulted in network disconnection.
@@ -414,10 +415,31 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx,
auto txnParticipant = TransactionParticipant::get(opCtx);
if (!opCtx->getClient()->isInDirectClient()) {
- txnParticipant.beginOrContinue(opCtx,
- *sessionOptions.getTxnNumber(),
- sessionOptions.getAutocommit(),
- sessionOptions.getStartTransaction());
+ bool beganOrContinuedTxn{false};
+ // This loop allows new transactions on a session to block behind a previous prepared
+ // transaction on that session.
+ while (!beganOrContinuedTxn) {
+ try {
+ txnParticipant.beginOrContinue(opCtx,
+ *sessionOptions.getTxnNumber(),
+ sessionOptions.getAutocommit(),
+ sessionOptions.getStartTransaction());
+ beganOrContinuedTxn = true;
+ } catch (const ExceptionFor<ErrorCodes::PreparedTransactionInProgress>&) {
+ auto prepareCompleted = txnParticipant.onExitPrepare();
+
+ CurOpFailpointHelpers::waitWhileFailPointEnabled(
+ &waitAfterNewStatementBlocksBehindPrepare,
+ opCtx,
+ "waitAfterNewStatementBlocksBehindPrepare");
+
+ // Check the session back in and wait for ongoing prepared transaction to complete.
+ MongoDOperationContextSession::checkIn(opCtx);
+ prepareCompleted.wait(opCtx);
+ MongoDOperationContextSession::checkOut(opCtx);
+ }
+ }
+
// Create coordinator if needed. If "startTransaction" is present, it must be true.
if (sessionOptions.getStartTransaction()) {
// If this shard has been selected as the coordinator, set up the coordinator state
diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp
index 86aeb61fdeb..a4200433a24 100644
--- a/src/mongo/db/session_catalog_mongod.cpp
+++ b/src/mongo/db/session_catalog_mongod.cpp
@@ -408,19 +408,13 @@ MongoDOperationContextSession::MongoDOperationContextSession(OperationContext* o
MongoDOperationContextSession::~MongoDOperationContextSession() = default;
void MongoDOperationContextSession::checkIn(OperationContext* opCtx) {
- if (auto txnParticipant = TransactionParticipant::get(opCtx)) {
- txnParticipant.stashTransactionResources(opCtx);
- }
-
OperationContextSession::checkIn(opCtx);
}
-void MongoDOperationContextSession::checkOut(OperationContext* opCtx, const std::string& cmdName) {
+void MongoDOperationContextSession::checkOut(OperationContext* opCtx) {
OperationContextSession::checkOut(opCtx);
-
- if (auto txnParticipant = TransactionParticipant::get(opCtx)) {
- txnParticipant.unstashTransactionResources(opCtx, cmdName);
- }
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.refreshFromStorageIfNeeded(opCtx);
}
MongoDOperationContextSessionWithoutRefresh::MongoDOperationContextSessionWithoutRefresh(
diff --git a/src/mongo/db/session_catalog_mongod.h b/src/mongo/db/session_catalog_mongod.h
index 591d234029b..cd379edb8ec 100644
--- a/src/mongo/db/session_catalog_mongod.h
+++ b/src/mongo/db/session_catalog_mongod.h
@@ -96,10 +96,9 @@ public:
static void checkIn(OperationContext* opCtx);
/**
- * May only be called if the session is not checked out already. 'cmdType' is used to validate
- * that the expected transaction flow control is being obeyed.
+ * May only be called if the session is not checked out already.
*/
- static void checkOut(OperationContext* opCtx, const std::string& cmdName);
+ static void checkOut(OperationContext* opCtx);
private:
OperationContextSession _operationContextSession;
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index c80293146ad..a638c8f6e2d 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -743,6 +743,7 @@ TEST_F(TxnParticipantTest, KillOpBeforeCommittingPreparedTransaction) {
}
// Check the session back in.
+ txnParticipant.stashTransactionResources(opCtx());
sessionCheckout->checkIn(opCtx());
// The transaction state should have been unaffected.
@@ -783,6 +784,7 @@ TEST_F(TxnParticipantTest, KillOpBeforeAbortingPreparedTransaction) {
}
// Check the session back in.
+ txnParticipant.stashTransactionResources(opCtx());
sessionCheckout->checkIn(opCtx());
// The transaction state should have been unaffected.