summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTess Avitabile <tess.avitabile@mongodb.com>2018-03-26 16:00:59 -0400
committerTess Avitabile <tess.avitabile@mongodb.com>2018-03-30 12:04:49 -0400
commitfbbdb5643a3fab20a4c7875830382748a4ba75e4 (patch)
tree0885c60196b8e827d9288a330467934b3bf1fada
parent82fce7bd6f6e2838ce3ccc72474ff8b8ebe325a0 (diff)
downloadmongo-fbbdb5643a3fab20a4c7875830382748a4ba75e4.tar.gz
SERVER-34011 Concurrency between transaction and other threads that can abort transaction
-rw-r--r--jstests/core/txns/multi_statement_transaction_abort.js2
-rw-r--r--jstests/core/txns/no_implicit_collection_creation_in_txn.js6
-rw-r--r--jstests/multiVersion/libs/global_snapshot_reads_helpers.js47
-rw-r--r--jstests/noPassthrough/prepare_transaction.js3
-rw-r--r--jstests/noPassthrough/snapshot_cursor_shutdown_stepdown.js6
-rw-r--r--src/mongo/db/op_observer_impl.cpp2
-rw-r--r--src/mongo/db/session.cpp80
-rw-r--r--src/mongo/db/session.h7
-rw-r--r--src/mongo/db/session_test.cpp261
9 files changed, 352 insertions, 62 deletions
diff --git a/jstests/core/txns/multi_statement_transaction_abort.js b/jstests/core/txns/multi_statement_transaction_abort.js
index 044eb2aa375..902bd8d63fc 100644
--- a/jstests/core/txns/multi_statement_transaction_abort.js
+++ b/jstests/core/txns/multi_statement_transaction_abort.js
@@ -94,7 +94,7 @@
writeConcern: {w: "majority"},
txnNumber: NumberLong(txnNumber)
}),
- ErrorCodes.NoSuchTransaction);
+ ErrorCodes.TransactionAborted);
// Verify the documents are the same.
assert.eq({_id: "insert-1"}, testColl.findOne({_id: "insert-1"}));
assert.eq(null, testColl.findOne({_id: "insert-2"}));
diff --git a/jstests/core/txns/no_implicit_collection_creation_in_txn.js b/jstests/core/txns/no_implicit_collection_creation_in_txn.js
index 3e40369e847..a399ddf047e 100644
--- a/jstests/core/txns/no_implicit_collection_creation_in_txn.js
+++ b/jstests/core/txns/no_implicit_collection_creation_in_txn.js
@@ -49,7 +49,7 @@
commitTransaction: 1,
txnNumber: NumberLong(txnNumber),
}),
- ErrorCodes.NoSuchTransaction);
+ ErrorCodes.TransactionAborted);
assert.eq(null, testColl.findOne({_id: "doc"}));
jsTest.log("Cannot implicitly create a collection in a transaction using update.");
@@ -84,7 +84,7 @@
commitTransaction: 1,
txnNumber: NumberLong(txnNumber),
}),
- ErrorCodes.NoSuchTransaction);
+ ErrorCodes.TransactionAborted);
assert.eq(null, testColl.findOne({_id: "doc"}));
// Update without upsert=true succeeds when the collection does not exist.
@@ -137,7 +137,7 @@
commitTransaction: 1,
txnNumber: NumberLong(txnNumber),
}),
- ErrorCodes.NoSuchTransaction);
+ ErrorCodes.TransactionAborted);
assert.eq(null, testColl.findOne({_id: "doc"}));
// findAndModify without upsert=true succeeds when the collection does not exist.
diff --git a/jstests/multiVersion/libs/global_snapshot_reads_helpers.js b/jstests/multiVersion/libs/global_snapshot_reads_helpers.js
index 7cc214b01cf..32bdfcb5129 100644
--- a/jstests/multiVersion/libs/global_snapshot_reads_helpers.js
+++ b/jstests/multiVersion/libs/global_snapshot_reads_helpers.js
@@ -13,7 +13,7 @@ function supportsSnapshotReadConcern() {
/**
* Runs the given command on the given database, asserting the command failed or succeeded
- * depending on the value of expectSuccess.
+ * depending on the value of expectSuccess. Returns the last used txnNumber.
*/
function runCommandAndVerifyResponse(sessionDb, txnNumber, cmdObj, expectSuccess, expectedCode) {
if (expectSuccess) {
@@ -22,14 +22,18 @@ function runCommandAndVerifyResponse(sessionDb, txnNumber, cmdObj, expectSuccess
// noop writes may advance the majority commit point past the given atClusterTime
// resulting in a SnapshotTooOld error. Eventually the read should succeed, when all
// targeted shards are at the same cluster time, so retry until it does.
+ // A snapshot read may also fail with TransactionAborted if it encountered a StaleEpoch
+ // error while it was running.
assert.soon(() => {
const res = sessionDb.runCommand(cmdObj);
if (!res.ok) {
- assert.commandFailedWithCode(
- res,
- ErrorCodes.SnapshotTooOld,
- "expected command to fail with SnapshotTooOld, cmd: " + tojson(cmdObj));
- print("Retrying because of SnapshotTooOld error.");
+ assert(res.code === ErrorCodes.SnapshotTooOld ||
+ res.code === ErrorCodes.TransactionAborted,
+ "expected command to fail with SnapshotTooOld or TransactionAborted, cmd: " +
+ tojson(cmdObj) + ", result: " + tojson(res));
+ print("Retrying because of SnapshotTooOld or TransactionAborted error.");
+ txnNumber++;
+ cmdObj.txnNumber = NumberLong(txnNumber);
return false;
}
@@ -43,6 +47,7 @@ function runCommandAndVerifyResponse(sessionDb, txnNumber, cmdObj, expectSuccess
tojson(cmdObj) + ", expectedCode: " +
tojson(expectedCode));
}
+ return txnNumber;
}
/**
@@ -55,31 +60,33 @@ function verifyGlobalSnapshotReads(conn, expectSuccess, expectedCode) {
// Unsharded collection.
const unshardedDb = session.getDatabase("unshardedDb");
- runCommandAndVerifyResponse(
+ txnNumber = runCommandAndVerifyResponse(
unshardedDb,
txnNumber,
- {find: "unsharded", readConcern: {level: "snapshot"}, txnNumber: NumberLong(txnNumber++)},
+ {find: "unsharded", readConcern: {level: "snapshot"}, txnNumber: NumberLong(txnNumber)},
expectSuccess,
expectedCode);
// Sharded collection, one shard.
+ txnNumber++;
const shardedDb = session.getDatabase("shardedDb");
- runCommandAndVerifyResponse(shardedDb,
- txnNumber,
- {
- find: "sharded",
- filter: {x: 1},
- readConcern: {level: "snapshot"},
- txnNumber: NumberLong(txnNumber++)
- },
- expectSuccess,
- expectedCode);
+ txnNumber = runCommandAndVerifyResponse(shardedDb,
+ txnNumber,
+ {
+ find: "sharded",
+ filter: {x: 1},
+ readConcern: {level: "snapshot"},
+ txnNumber: NumberLong(txnNumber)
+ },
+ expectSuccess,
+ expectedCode);
// Sharded collection, all shards.
- runCommandAndVerifyResponse(
+ txnNumber++;
+ txnNumber = runCommandAndVerifyResponse(
shardedDb,
txnNumber,
- {find: "sharded", readConcern: {level: "snapshot"}, txnNumber: NumberLong(txnNumber++)},
+ {find: "sharded", readConcern: {level: "snapshot"}, txnNumber: NumberLong(txnNumber)},
expectSuccess,
expectedCode);
}
diff --git a/jstests/noPassthrough/prepare_transaction.js b/jstests/noPassthrough/prepare_transaction.js
index 673a93d5749..8f100c2fd6c 100644
--- a/jstests/noPassthrough/prepare_transaction.js
+++ b/jstests/noPassthrough/prepare_transaction.js
@@ -55,6 +55,7 @@
// The insert should be visible in this session, but because the prepare command immediately
// aborts afterwards, the transaction is rolled back and the insert is not visible.
assert.eq(null, testColl.findOne(doc1));
+ txnNumber++;
res = sessionDB.runCommand({find: collName, filter: doc1, txnNumber: NumberLong(txnNumber)});
assert.commandWorked(res);
assert.eq([], res.cursor.firstBatch);
@@ -89,6 +90,7 @@
// The update should be visible in this session, but because the prepare command immediately
// aborts afterwards, the transaction is rolled back and the update is not visible.
+ txnNumber++;
res = sessionDB.runCommand({find: collName, filter: doc2, txnNumber: NumberLong(txnNumber)});
assert.commandWorked(res);
assert.eq([], res.cursor.firstBatch);
@@ -125,6 +127,7 @@
// The delete should be visible in this session, but because the prepare command immediately
// aborts afterwards, the transaction is rolled back and the document is still visible.
+ txnNumber++;
res = sessionDB.runCommand({find: collName, filter: doc2, txnNumber: NumberLong(txnNumber)});
assert.commandWorked(res);
assert.eq([doc2], res.cursor.firstBatch);
diff --git a/jstests/noPassthrough/snapshot_cursor_shutdown_stepdown.js b/jstests/noPassthrough/snapshot_cursor_shutdown_stepdown.js
index aaae19766aa..02ee496faff 100644
--- a/jstests/noPassthrough/snapshot_cursor_shutdown_stepdown.js
+++ b/jstests/noPassthrough/snapshot_cursor_shutdown_stepdown.js
@@ -76,9 +76,9 @@
rst.waitForState(primary, ReplSetTest.State.SECONDARY);
// TODO SERVER-33690: Destroying stashed transaction resources should kill the cursor, so this
- // getMore should fail.
- assert.commandWorked(sessionDB.runCommand(
- {getMore: res.cursor.id, collection: collName, txnNumber: NumberLong(0)}));
+ // getMore should fail with CursorNotFound.
+ assert.commandFailedWithCode(
+ sessionDB.runCommand({getMore: res.cursor.id, collection: collName}), 50740);
rst.stopSet();
})();
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 1e7f3e67db7..c3216b51715 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -840,7 +840,7 @@ void OpObserverImpl::onTransactionCommit(OperationContext* opCtx) {
invariant(opCtx->getTxnNumber());
Session* const session = OperationContextSession::get(opCtx);
invariant(session);
- auto stmts = session->endTransactionAndRetrieveOperations();
+ auto stmts = session->endTransactionAndRetrieveOperations(opCtx);
// It is possible that the transaction resulted in no changes. In that case, we should
// not write an empty applyOps entry.
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index f4175a5a85b..9cee77ba3f5 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -560,17 +560,10 @@ void Session::stashTransactionResources(OperationContext* opCtx) {
stdx::lock_guard<Client> lk(*opCtx->getClient());
stdx::unique_lock<stdx::mutex> lg(_mutex);
- if (*opCtx->getTxnNumber() != _activeTxnNumber) {
- // The session is checked out, so _activeTxnNumber cannot advance due to a user operation.
- // However, when a chunk is migrated, session and transaction information is copied from the
- // donor shard to the recipient. This occurs outside of the check-out mechanism and can lead
- // to a higher _activeTxnNumber during the lifetime of a checkout. If that occurs, we abort
- // the current transaction. Note that it would indicate a user bug to have a newer
- // transaction on one shard while an older transaction is still active on another shard.
- uasserted(ErrorCodes::TransactionAborted,
- str::stream() << "Transaction aborted. Active txnNumber is now "
- << _activeTxnNumber);
- }
+ // Always check '_activeTxnNumber', since it can be modified by migration, which does not check
+ // out the session. We intentionally do not error if _txnState=kAborted, since we expect this
+ // function to be called at the end of the 'abortTransaction' command.
+ _checkIsActiveTransaction(lg, *opCtx->getTxnNumber());
if (_txnState != MultiDocumentTransactionState::kInProgress &&
_txnState != MultiDocumentTransactionState::kInSnapshotRead) {
@@ -610,20 +603,13 @@ void Session::unstashTransactionResources(OperationContext* opCtx) {
// it doesn't go away, and then lock the Session owned by that client.
stdx::lock_guard<Client> lk(*opCtx->getClient());
stdx::lock_guard<stdx::mutex> lg(_mutex);
- if (opCtx->getTxnNumber() < _activeTxnNumber) {
- // The session is checked out, so _activeTxnNumber cannot advance due to a user
- // operation.
- // However, when a chunk is migrated, session and transaction information is copied from
- // the donor shard to the recipient. This occurs outside of the check-out mechanism and
- // can lead to a higher _activeTxnNumber during the lifetime of a checkout. If that
- // occurs, we abort the current transaction. Note that it would indicate a user bug to
- // have a newer transaction on one shard while an older transaction is still active on
- // another shard.
- uasserted(ErrorCodes::TransactionAborted,
- str::stream() << "Transaction aborted. Active txnNumber is now "
- << _activeTxnNumber);
- return;
- }
+
+ // Always check '_activeTxnNumber' and '_txnState', since they can be modified by session
+ // kill and migration, which do not check out the session.
+ _checkIsActiveTransaction(lg, *opCtx->getTxnNumber());
+ uassert(ErrorCodes::TransactionAborted,
+ str::stream() << "Transaction " << *opCtx->getTxnNumber() << " has been aborted.",
+ _txnState != MultiDocumentTransactionState::kAborted);
if (_txnResourceStash) {
// Transaction resources already exist for this transaction. Transfer them from the
@@ -662,13 +648,21 @@ void Session::unstashTransactionResources(OperationContext* opCtx) {
void Session::abortArbitraryTransaction() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- _abortTransaction(lock);
+ if (_txnState == MultiDocumentTransactionState::kInProgress ||
+ _txnState == MultiDocumentTransactionState::kInSnapshotRead) {
+ _abortTransaction(lock);
+ }
}
void Session::abortActiveTransaction(OperationContext* opCtx) {
stdx::lock_guard<Client> clientLock(*opCtx->getClient());
stdx::lock_guard<stdx::mutex> lock(_mutex);
+ if (_txnState != MultiDocumentTransactionState::kInProgress &&
+ _txnState != MultiDocumentTransactionState::kInSnapshotRead) {
+ return;
+ }
+
_abortTransaction(lock);
// Abort the WUOW. We should be able to abort empty transactions that don't have WUOW.
@@ -716,25 +710,45 @@ void Session::_setActiveTxn(WithLock wl, TxnNumber txnNumber) {
void Session::addTransactionOperation(OperationContext* opCtx,
const repl::ReplOperation& operation) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ // Always check '_activeTxnNumber' and '_txnState', since they can be modified by session kill
+ // and migration, which do not check out the session.
+ _checkIsActiveTransaction(lk, *opCtx->getTxnNumber());
+ uassert(ErrorCodes::TransactionAborted,
+ str::stream() << "Transaction " << *opCtx->getTxnNumber() << " has been aborted.",
+ _txnState != MultiDocumentTransactionState::kAborted);
+
invariant(_txnState == MultiDocumentTransactionState::kInProgress);
invariant(!_autocommit && _activeTxnNumber != kUninitializedTxnNumber);
invariant(opCtx->lockState()->inAWriteUnitOfWork());
_transactionOperations.push_back(operation);
}
-std::vector<repl::ReplOperation> Session::endTransactionAndRetrieveOperations() {
+std::vector<repl::ReplOperation> Session::endTransactionAndRetrieveOperations(
+ OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ // Always check '_activeTxnNumber' and '_txnState', since they can be modified by session kill
+ // and migration, which do not check out the session.
+ _checkIsActiveTransaction(lk, *opCtx->getTxnNumber());
+ uassert(ErrorCodes::TransactionAborted,
+ str::stream() << "Transaction " << *opCtx->getTxnNumber() << " has been aborted.",
+ _txnState != MultiDocumentTransactionState::kAborted);
+
invariant(!_autocommit);
return std::move(_transactionOperations);
}
void Session::commitTransaction(OperationContext* opCtx) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- if (opCtx->getTxnNumber() != _activeTxnNumber) {
- uasserted(ErrorCodes::TransactionAborted,
- str::stream() << "Transaction aborted. Active txnNumber is now "
- << _activeTxnNumber);
- }
+
+ // Always check '_activeTxnNumber' and '_txnState', since they can be modified by session kill
+ // and migration, which do not check out the session.
+ _checkIsActiveTransaction(lk, *opCtx->getTxnNumber());
+ uassert(ErrorCodes::TransactionAborted,
+ str::stream() << "Transaction " << *opCtx->getTxnNumber() << " has been aborted.",
+ _txnState != MultiDocumentTransactionState::kAborted);
+
if (_txnState == MultiDocumentTransactionState::kCommitted)
return;
_commitTransaction(std::move(lk), opCtx);
@@ -793,7 +807,7 @@ void Session::_checkValid(WithLock) const {
void Session::_checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const {
uassert(ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Cannot perform retryability check for transaction " << txnNumber
+ str::stream() << "Cannot perform operations on transaction " << txnNumber
<< " on session "
<< getSessionId()
<< " because a different transaction "
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index adad7376b43..51768dc9f69 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -284,12 +284,17 @@ public:
* and marks the transaction as closed. It is illegal to attempt to add operations to the
* transaction after this is called.
*/
- std::vector<repl::ReplOperation> endTransactionAndRetrieveOperations();
+ std::vector<repl::ReplOperation> endTransactionAndRetrieveOperations(OperationContext* opCtx);
const std::vector<repl::ReplOperation>& transactionOperationsForTest() {
return _transactionOperations;
}
+ TxnNumber getActiveTxnNumberForTest() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ return _activeTxnNumber;
+ }
+
/**
* Scan through the list of operations and add new oplog entries for updating
* config.transactions if needed.
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp
index 004e554d9e0..5b03cec656c 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/session_test.cpp
@@ -124,6 +124,38 @@ protected:
stmtId,
link);
}
+
+ void bumpTxnNumberFromDifferentOpCtx(Session* session, TxnNumber newTxnNum) {
+ // Stash the original client.
+ auto originalClient = Client::releaseCurrent();
+
+ // Create a migration client and opCtx.
+ auto service = opCtx()->getServiceContext();
+ auto migrationClientOwned = service->makeClient("migrationClient");
+ auto migrationClient = migrationClientOwned.get();
+ Client::setCurrent(std::move(migrationClientOwned));
+ auto migrationOpCtx = migrationClient->makeOperationContext();
+
+ // Check that there is a transaction in progress with a lower txnNumber.
+ ASSERT(session->inMultiDocumentTransaction());
+ ASSERT_LT(session->getActiveTxnNumberForTest(), newTxnNum);
+
+ // Check that the transaction has some operations, so we can ensure they are cleared.
+ ASSERT_GT(session->transactionOperationsForTest().size(), 0u);
+
+ // Bump the active transaction number on the session. This should clear all state from the
+ // previous transaction.
+ session->beginOrContinueTxnOnMigration(migrationOpCtx.get(), newTxnNum);
+ ASSERT_EQ(session->getActiveTxnNumberForTest(), newTxnNum);
+ ASSERT_FALSE(session->inMultiDocumentTransaction());
+ ASSERT_FALSE(session->transactionIsAborted());
+ ASSERT_EQ(session->transactionOperationsForTest().size(), 0u);
+
+ // Restore the original client.
+ migrationOpCtx.reset();
+ Client::releaseCurrent();
+ Client::setCurrent(std::move(originalClient));
+ }
};
TEST_F(SessionTest, SessionEntryNotWrittenOnBegin) {
@@ -617,6 +649,8 @@ TEST_F(SessionTest, SameTransactionPreservesStoredStatements) {
session.refreshFromStorageIfNeeded(opCtx());
const TxnNumber txnNum = 22;
+ opCtx()->setLogicalSessionId(sessionId);
+ opCtx()->setTxnNumber(txnNum);
session.beginOrContinueTxn(opCtx(), txnNum, false);
WriteUnitOfWork wuow(opCtx());
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
@@ -687,5 +721,232 @@ TEST_F(SessionTest, EmptyTransactionAbort) {
ASSERT_TRUE(session.transactionIsAborted());
}
+TEST_F(SessionTest, ConcurrencyOfUnstashAndAbort) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ const TxnNumber txnNum = 26;
+ opCtx()->setLogicalSessionId(sessionId);
+ opCtx()->setTxnNumber(txnNum);
+ session.beginOrContinueTxn(opCtx(), txnNum, false);
+
+ // The transaction may be aborted without checking out the session.
+ session.abortArbitraryTransaction();
+
+ // An unstash after an abort should uassert.
+ ASSERT_THROWS_CODE(session.unstashTransactionResources(opCtx()),
+ AssertionException,
+ ErrorCodes::TransactionAborted);
+}
+
+TEST_F(SessionTest, ConcurrencyOfUnstashAndMigration) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ const TxnNumber txnNum = 26;
+ opCtx()->setLogicalSessionId(sessionId);
+ opCtx()->setTxnNumber(txnNum);
+ session.beginOrContinueTxn(opCtx(), txnNum, false);
+
+ session.unstashTransactionResources(opCtx());
+ // The transaction machinery cannot store an empty locker.
+ { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now()); }
+ auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
+ session.addTransactionOperation(opCtx(), operation);
+ session.stashTransactionResources(opCtx());
+
+ // A migration may bump the active transaction number without checking out the session.
+ const TxnNumber higherTxnNum = 27;
+ bumpTxnNumberFromDifferentOpCtx(&session, higherTxnNum);
+
+ // An unstash after a migration that bumps the active transaction number should uassert.
+ ASSERT_THROWS_CODE(session.unstashTransactionResources(opCtx()),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
+}
+
+TEST_F(SessionTest, ConcurrencyOfStashAndAbort) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ const TxnNumber txnNum = 26;
+ opCtx()->setLogicalSessionId(sessionId);
+ opCtx()->setTxnNumber(txnNum);
+ session.beginOrContinueTxn(opCtx(), txnNum, false);
+
+ session.unstashTransactionResources(opCtx());
+
+ // The transaction may be aborted without checking out the session.
+ session.abortArbitraryTransaction();
+
+ // A stash after an abort should be a noop.
+ session.stashTransactionResources(opCtx());
+}
+
+TEST_F(SessionTest, ConcurrencyOfStashAndMigration) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ const TxnNumber txnNum = 26;
+ opCtx()->setLogicalSessionId(sessionId);
+ opCtx()->setTxnNumber(txnNum);
+ session.beginOrContinueTxn(opCtx(), txnNum, false);
+
+ session.unstashTransactionResources(opCtx());
+ auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
+ session.addTransactionOperation(opCtx(), operation);
+
+ // A migration may bump the active transaction number without checking out the session.
+ const TxnNumber higherTxnNum = 27;
+ bumpTxnNumberFromDifferentOpCtx(&session, higherTxnNum);
+
+ // A stash after a migration that bumps the active transaction number should uassert.
+ ASSERT_THROWS_CODE(session.stashTransactionResources(opCtx()),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
+}
+
+TEST_F(SessionTest, ConcurrencyOfAddTransactionOperationAndAbort) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ const TxnNumber txnNum = 26;
+ opCtx()->setLogicalSessionId(sessionId);
+ opCtx()->setTxnNumber(txnNum);
+ session.beginOrContinueTxn(opCtx(), txnNum, false);
+
+ session.unstashTransactionResources(opCtx());
+
+ // The transaction may be aborted without checking out the session.
+ session.abortArbitraryTransaction();
+
+ // An addTransactionOperation() after an abort should uassert.
+ auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
+ ASSERT_THROWS_CODE(session.addTransactionOperation(opCtx(), operation),
+ AssertionException,
+ ErrorCodes::TransactionAborted);
+}
+
+TEST_F(SessionTest, ConcurrencyOfAddTransactionOperationAndMigration) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ const TxnNumber txnNum = 26;
+ opCtx()->setLogicalSessionId(sessionId);
+ opCtx()->setTxnNumber(txnNum);
+ session.beginOrContinueTxn(opCtx(), txnNum, false);
+
+ session.unstashTransactionResources(opCtx());
+ auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
+ session.addTransactionOperation(opCtx(), operation);
+
+ // A migration may bump the active transaction number without checking out the session.
+ const TxnNumber higherTxnNum = 27;
+ bumpTxnNumberFromDifferentOpCtx(&session, higherTxnNum);
+
+ // An addTransactionOperation() after a migration that bumps the active transaction number
+ // should uassert.
+ ASSERT_THROWS_CODE(session.addTransactionOperation(opCtx(), operation),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
+}
+
+TEST_F(SessionTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAbort) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ const TxnNumber txnNum = 26;
+ opCtx()->setLogicalSessionId(sessionId);
+ opCtx()->setTxnNumber(txnNum);
+ session.beginOrContinueTxn(opCtx(), txnNum, false);
+
+ session.unstashTransactionResources(opCtx());
+
+ // The transaction may be aborted without checking out the session.
+ session.abortArbitraryTransaction();
+
+ // An endTransactionAndRetrieveOperations() after an abort should uassert.
+ ASSERT_THROWS_CODE(session.endTransactionAndRetrieveOperations(opCtx()),
+ AssertionException,
+ ErrorCodes::TransactionAborted);
+}
+
+TEST_F(SessionTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndMigration) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ const TxnNumber txnNum = 26;
+ opCtx()->setLogicalSessionId(sessionId);
+ opCtx()->setTxnNumber(txnNum);
+ session.beginOrContinueTxn(opCtx(), txnNum, false);
+
+ session.unstashTransactionResources(opCtx());
+ auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
+ session.addTransactionOperation(opCtx(), operation);
+
+ // A migration may bump the active transaction number without checking out the session.
+ const TxnNumber higherTxnNum = 27;
+ bumpTxnNumberFromDifferentOpCtx(&session, higherTxnNum);
+
+ // An endTransactionAndRetrieveOperations() after a migration that bumps the active transaction
+ // number should uassert.
+ ASSERT_THROWS_CODE(session.endTransactionAndRetrieveOperations(opCtx()),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
+}
+
+TEST_F(SessionTest, ConcurrencyOfCommitTransactionAndAbort) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ const TxnNumber txnNum = 26;
+ opCtx()->setLogicalSessionId(sessionId);
+ opCtx()->setTxnNumber(txnNum);
+ session.beginOrContinueTxn(opCtx(), txnNum, false);
+
+ session.unstashTransactionResources(opCtx());
+
+ // The transaction may be aborted without checking out the session.
+ session.abortArbitraryTransaction();
+
+ // An commitTransaction() after an abort should uassert.
+ ASSERT_THROWS_CODE(
+ session.commitTransaction(opCtx()), AssertionException, ErrorCodes::TransactionAborted);
+}
+
+TEST_F(SessionTest, ConcurrencyOfCommitTransactionAndMigration) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ const TxnNumber txnNum = 26;
+ opCtx()->setLogicalSessionId(sessionId);
+ opCtx()->setTxnNumber(txnNum);
+ session.beginOrContinueTxn(opCtx(), txnNum, false);
+
+ session.unstashTransactionResources(opCtx());
+ auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
+ session.addTransactionOperation(opCtx(), operation);
+
+ // A migration may bump the active transaction number without checking out the session.
+ const TxnNumber higherTxnNum = 27;
+ bumpTxnNumberFromDifferentOpCtx(&session, higherTxnNum);
+
+ // An commitTransaction() after a migration that bumps the active transaction number should
+ // uassert.
+ ASSERT_THROWS_CODE(session.commitTransaction(opCtx()),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
+}
+
} // namespace
} // namespace mongo