diff options
author | Tess Avitabile <tess.avitabile@mongodb.com> | 2018-03-26 16:00:59 -0400 |
---|---|---|
committer | Tess Avitabile <tess.avitabile@mongodb.com> | 2018-03-30 12:04:49 -0400 |
commit | fbbdb5643a3fab20a4c7875830382748a4ba75e4 (patch) | |
tree | 0885c60196b8e827d9288a330467934b3bf1fada | |
parent | 82fce7bd6f6e2838ce3ccc72474ff8b8ebe325a0 (diff) | |
download | mongo-fbbdb5643a3fab20a4c7875830382748a4ba75e4.tar.gz |
SERVER-34011 Concurrency between transaction and other threads that can abort transaction
-rw-r--r-- | jstests/core/txns/multi_statement_transaction_abort.js | 2 | ||||
-rw-r--r-- | jstests/core/txns/no_implicit_collection_creation_in_txn.js | 6 | ||||
-rw-r--r-- | jstests/multiVersion/libs/global_snapshot_reads_helpers.js | 47 | ||||
-rw-r--r-- | jstests/noPassthrough/prepare_transaction.js | 3 | ||||
-rw-r--r-- | jstests/noPassthrough/snapshot_cursor_shutdown_stepdown.js | 6 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 80 | ||||
-rw-r--r-- | src/mongo/db/session.h | 7 | ||||
-rw-r--r-- | src/mongo/db/session_test.cpp | 261 |
9 files changed, 352 insertions, 62 deletions
diff --git a/jstests/core/txns/multi_statement_transaction_abort.js b/jstests/core/txns/multi_statement_transaction_abort.js index 044eb2aa375..902bd8d63fc 100644 --- a/jstests/core/txns/multi_statement_transaction_abort.js +++ b/jstests/core/txns/multi_statement_transaction_abort.js @@ -94,7 +94,7 @@ writeConcern: {w: "majority"}, txnNumber: NumberLong(txnNumber) }), - ErrorCodes.NoSuchTransaction); + ErrorCodes.TransactionAborted); // Verify the documents are the same. assert.eq({_id: "insert-1"}, testColl.findOne({_id: "insert-1"})); assert.eq(null, testColl.findOne({_id: "insert-2"})); diff --git a/jstests/core/txns/no_implicit_collection_creation_in_txn.js b/jstests/core/txns/no_implicit_collection_creation_in_txn.js index 3e40369e847..a399ddf047e 100644 --- a/jstests/core/txns/no_implicit_collection_creation_in_txn.js +++ b/jstests/core/txns/no_implicit_collection_creation_in_txn.js @@ -49,7 +49,7 @@ commitTransaction: 1, txnNumber: NumberLong(txnNumber), }), - ErrorCodes.NoSuchTransaction); + ErrorCodes.TransactionAborted); assert.eq(null, testColl.findOne({_id: "doc"})); jsTest.log("Cannot implicitly create a collection in a transaction using update."); @@ -84,7 +84,7 @@ commitTransaction: 1, txnNumber: NumberLong(txnNumber), }), - ErrorCodes.NoSuchTransaction); + ErrorCodes.TransactionAborted); assert.eq(null, testColl.findOne({_id: "doc"})); // Update without upsert=true succeeds when the collection does not exist. @@ -137,7 +137,7 @@ commitTransaction: 1, txnNumber: NumberLong(txnNumber), }), - ErrorCodes.NoSuchTransaction); + ErrorCodes.TransactionAborted); assert.eq(null, testColl.findOne({_id: "doc"})); // findAndModify without upsert=true succeeds when the collection does not exist. diff --git a/jstests/multiVersion/libs/global_snapshot_reads_helpers.js b/jstests/multiVersion/libs/global_snapshot_reads_helpers.js index 7cc214b01cf..32bdfcb5129 100644 --- a/jstests/multiVersion/libs/global_snapshot_reads_helpers.js +++ b/jstests/multiVersion/libs/global_snapshot_reads_helpers.js @@ -13,7 +13,7 @@ function supportsSnapshotReadConcern() { /** * Runs the given command on the given database, asserting the command failed or succeeded - * depending on the value of expectSuccess. + * depending on the value of expectSuccess. Returns the last used txnNumber. */ function runCommandAndVerifyResponse(sessionDb, txnNumber, cmdObj, expectSuccess, expectedCode) { if (expectSuccess) { @@ -22,14 +22,18 @@ function runCommandAndVerifyResponse(sessionDb, txnNumber, cmdObj, expectSuccess // noop writes may advance the majority commit point past the given atClusterTime // resulting in a SnapshotTooOld error. Eventually the read should succeed, when all // targeted shards are at the same cluster time, so retry until it does. + // A snapshot read may also fail with TransactionAborted if it encountered a StaleEpoch + // error while it was running. assert.soon(() => { const res = sessionDb.runCommand(cmdObj); if (!res.ok) { - assert.commandFailedWithCode( - res, - ErrorCodes.SnapshotTooOld, - "expected command to fail with SnapshotTooOld, cmd: " + tojson(cmdObj)); - print("Retrying because of SnapshotTooOld error."); + assert(res.code === ErrorCodes.SnapshotTooOld || + res.code === ErrorCodes.TransactionAborted, + "expected command to fail with SnapshotTooOld or TransactionAborted, cmd: " + + tojson(cmdObj) + ", result: " + tojson(res)); + print("Retrying because of SnapshotTooOld or TransactionAborted error."); + txnNumber++; + cmdObj.txnNumber = NumberLong(txnNumber); return false; } @@ -43,6 +47,7 @@ function runCommandAndVerifyResponse(sessionDb, txnNumber, cmdObj, expectSuccess tojson(cmdObj) + ", expectedCode: " + tojson(expectedCode)); } + return txnNumber; } /** @@ -55,31 +60,33 @@ function verifyGlobalSnapshotReads(conn, expectSuccess, expectedCode) { // Unsharded collection. const unshardedDb = session.getDatabase("unshardedDb"); - runCommandAndVerifyResponse( + txnNumber = runCommandAndVerifyResponse( unshardedDb, txnNumber, - {find: "unsharded", readConcern: {level: "snapshot"}, txnNumber: NumberLong(txnNumber++)}, + {find: "unsharded", readConcern: {level: "snapshot"}, txnNumber: NumberLong(txnNumber)}, expectSuccess, expectedCode); // Sharded collection, one shard. + txnNumber++; const shardedDb = session.getDatabase("shardedDb"); - runCommandAndVerifyResponse(shardedDb, - txnNumber, - { - find: "sharded", - filter: {x: 1}, - readConcern: {level: "snapshot"}, - txnNumber: NumberLong(txnNumber++) - }, - expectSuccess, - expectedCode); + txnNumber = runCommandAndVerifyResponse(shardedDb, + txnNumber, + { + find: "sharded", + filter: {x: 1}, + readConcern: {level: "snapshot"}, + txnNumber: NumberLong(txnNumber) + }, + expectSuccess, + expectedCode); // Sharded collection, all shards. - runCommandAndVerifyResponse( + txnNumber++; + txnNumber = runCommandAndVerifyResponse( shardedDb, txnNumber, - {find: "sharded", readConcern: {level: "snapshot"}, txnNumber: NumberLong(txnNumber++)}, + {find: "sharded", readConcern: {level: "snapshot"}, txnNumber: NumberLong(txnNumber)}, expectSuccess, expectedCode); } diff --git a/jstests/noPassthrough/prepare_transaction.js b/jstests/noPassthrough/prepare_transaction.js index 673a93d5749..8f100c2fd6c 100644 --- a/jstests/noPassthrough/prepare_transaction.js +++ b/jstests/noPassthrough/prepare_transaction.js @@ -55,6 +55,7 @@ // The insert should be visible in this session, but because the prepare command immediately // aborts afterwards, the transaction is rolled back and the insert is not visible. assert.eq(null, testColl.findOne(doc1)); + txnNumber++; res = sessionDB.runCommand({find: collName, filter: doc1, txnNumber: NumberLong(txnNumber)}); assert.commandWorked(res); assert.eq([], res.cursor.firstBatch); @@ -89,6 +90,7 @@ // The update should be visible in this session, but because the prepare command immediately // aborts afterwards, the transaction is rolled back and the update is not visible. + txnNumber++; res = sessionDB.runCommand({find: collName, filter: doc2, txnNumber: NumberLong(txnNumber)}); assert.commandWorked(res); assert.eq([], res.cursor.firstBatch); @@ -125,6 +127,7 @@ // The delete should be visible in this session, but because the prepare command immediately // aborts afterwards, the transaction is rolled back and the document is still visible. + txnNumber++; res = sessionDB.runCommand({find: collName, filter: doc2, txnNumber: NumberLong(txnNumber)}); assert.commandWorked(res); assert.eq([doc2], res.cursor.firstBatch); diff --git a/jstests/noPassthrough/snapshot_cursor_shutdown_stepdown.js b/jstests/noPassthrough/snapshot_cursor_shutdown_stepdown.js index aaae19766aa..02ee496faff 100644 --- a/jstests/noPassthrough/snapshot_cursor_shutdown_stepdown.js +++ b/jstests/noPassthrough/snapshot_cursor_shutdown_stepdown.js @@ -76,9 +76,9 @@ rst.waitForState(primary, ReplSetTest.State.SECONDARY); // TODO SERVER-33690: Destroying stashed transaction resources should kill the cursor, so this - // getMore should fail. - assert.commandWorked(sessionDB.runCommand( - {getMore: res.cursor.id, collection: collName, txnNumber: NumberLong(0)})); + // getMore should fail with CursorNotFound. + assert.commandFailedWithCode( + sessionDB.runCommand({getMore: res.cursor.id, collection: collName}), 50740); rst.stopSet(); })(); diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 1e7f3e67db7..c3216b51715 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -840,7 +840,7 @@ void OpObserverImpl::onTransactionCommit(OperationContext* opCtx) { invariant(opCtx->getTxnNumber()); Session* const session = OperationContextSession::get(opCtx); invariant(session); - auto stmts = session->endTransactionAndRetrieveOperations(); + auto stmts = session->endTransactionAndRetrieveOperations(opCtx); // It is possible that the transaction resulted in no changes. In that case, we should // not write an empty applyOps entry. diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index f4175a5a85b..9cee77ba3f5 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -560,17 +560,10 @@ void Session::stashTransactionResources(OperationContext* opCtx) { stdx::lock_guard<Client> lk(*opCtx->getClient()); stdx::unique_lock<stdx::mutex> lg(_mutex); - if (*opCtx->getTxnNumber() != _activeTxnNumber) { - // The session is checked out, so _activeTxnNumber cannot advance due to a user operation. - // However, when a chunk is migrated, session and transaction information is copied from the - // donor shard to the recipient. This occurs outside of the check-out mechanism and can lead - // to a higher _activeTxnNumber during the lifetime of a checkout. If that occurs, we abort - // the current transaction. Note that it would indicate a user bug to have a newer - // transaction on one shard while an older transaction is still active on another shard. - uasserted(ErrorCodes::TransactionAborted, - str::stream() << "Transaction aborted. Active txnNumber is now " - << _activeTxnNumber); - } + // Always check '_activeTxnNumber', since it can be modified by migration, which does not check + // out the session. We intentionally do not error if _txnState=kAborted, since we expect this + // function to be called at the end of the 'abortTransaction' command. + _checkIsActiveTransaction(lg, *opCtx->getTxnNumber()); if (_txnState != MultiDocumentTransactionState::kInProgress && _txnState != MultiDocumentTransactionState::kInSnapshotRead) { @@ -610,20 +603,13 @@ void Session::unstashTransactionResources(OperationContext* opCtx) { // it doesn't go away, and then lock the Session owned by that client. stdx::lock_guard<Client> lk(*opCtx->getClient()); stdx::lock_guard<stdx::mutex> lg(_mutex); - if (opCtx->getTxnNumber() < _activeTxnNumber) { - // The session is checked out, so _activeTxnNumber cannot advance due to a user - // operation. - // However, when a chunk is migrated, session and transaction information is copied from - // the donor shard to the recipient. This occurs outside of the check-out mechanism and - // can lead to a higher _activeTxnNumber during the lifetime of a checkout. If that - // occurs, we abort the current transaction. Note that it would indicate a user bug to - // have a newer transaction on one shard while an older transaction is still active on - // another shard. - uasserted(ErrorCodes::TransactionAborted, - str::stream() << "Transaction aborted. Active txnNumber is now " - << _activeTxnNumber); - return; - } + + // Always check '_activeTxnNumber' and '_txnState', since they can be modified by session + // kill and migration, which do not check out the session. + _checkIsActiveTransaction(lg, *opCtx->getTxnNumber()); + uassert(ErrorCodes::TransactionAborted, + str::stream() << "Transaction " << *opCtx->getTxnNumber() << " has been aborted.", + _txnState != MultiDocumentTransactionState::kAborted); if (_txnResourceStash) { // Transaction resources already exist for this transaction. Transfer them from the @@ -662,13 +648,21 @@ void Session::unstashTransactionResources(OperationContext* opCtx) { void Session::abortArbitraryTransaction() { stdx::lock_guard<stdx::mutex> lock(_mutex); - _abortTransaction(lock); + if (_txnState == MultiDocumentTransactionState::kInProgress || + _txnState == MultiDocumentTransactionState::kInSnapshotRead) { + _abortTransaction(lock); + } } void Session::abortActiveTransaction(OperationContext* opCtx) { stdx::lock_guard<Client> clientLock(*opCtx->getClient()); stdx::lock_guard<stdx::mutex> lock(_mutex); + if (_txnState != MultiDocumentTransactionState::kInProgress && + _txnState != MultiDocumentTransactionState::kInSnapshotRead) { + return; + } + _abortTransaction(lock); // Abort the WUOW. We should be able to abort empty transactions that don't have WUOW. @@ -716,25 +710,45 @@ void Session::_setActiveTxn(WithLock wl, TxnNumber txnNumber) { void Session::addTransactionOperation(OperationContext* opCtx, const repl::ReplOperation& operation) { stdx::lock_guard<stdx::mutex> lk(_mutex); + + // Always check '_activeTxnNumber' and '_txnState', since they can be modified by session kill + // and migration, which do not check out the session. + _checkIsActiveTransaction(lk, *opCtx->getTxnNumber()); + uassert(ErrorCodes::TransactionAborted, + str::stream() << "Transaction " << *opCtx->getTxnNumber() << " has been aborted.", + _txnState != MultiDocumentTransactionState::kAborted); + invariant(_txnState == MultiDocumentTransactionState::kInProgress); invariant(!_autocommit && _activeTxnNumber != kUninitializedTxnNumber); invariant(opCtx->lockState()->inAWriteUnitOfWork()); _transactionOperations.push_back(operation); } -std::vector<repl::ReplOperation> Session::endTransactionAndRetrieveOperations() { +std::vector<repl::ReplOperation> Session::endTransactionAndRetrieveOperations( + OperationContext* opCtx) { stdx::lock_guard<stdx::mutex> lk(_mutex); + + // Always check '_activeTxnNumber' and '_txnState', since they can be modified by session kill + // and migration, which do not check out the session. + _checkIsActiveTransaction(lk, *opCtx->getTxnNumber()); + uassert(ErrorCodes::TransactionAborted, + str::stream() << "Transaction " << *opCtx->getTxnNumber() << " has been aborted.", + _txnState != MultiDocumentTransactionState::kAborted); + invariant(!_autocommit); return std::move(_transactionOperations); } void Session::commitTransaction(OperationContext* opCtx) { stdx::unique_lock<stdx::mutex> lk(_mutex); - if (opCtx->getTxnNumber() != _activeTxnNumber) { - uasserted(ErrorCodes::TransactionAborted, - str::stream() << "Transaction aborted. Active txnNumber is now " - << _activeTxnNumber); - } + + // Always check '_activeTxnNumber' and '_txnState', since they can be modified by session kill + // and migration, which do not check out the session. + _checkIsActiveTransaction(lk, *opCtx->getTxnNumber()); + uassert(ErrorCodes::TransactionAborted, + str::stream() << "Transaction " << *opCtx->getTxnNumber() << " has been aborted.", + _txnState != MultiDocumentTransactionState::kAborted); + if (_txnState == MultiDocumentTransactionState::kCommitted) return; _commitTransaction(std::move(lk), opCtx); @@ -793,7 +807,7 @@ void Session::_checkValid(WithLock) const { void Session::_checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const { uassert(ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Cannot perform retryability check for transaction " << txnNumber + str::stream() << "Cannot perform operations on transaction " << txnNumber << " on session " << getSessionId() << " because a different transaction " diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index adad7376b43..51768dc9f69 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -284,12 +284,17 @@ public: * and marks the transaction as closed. It is illegal to attempt to add operations to the * transaction after this is called. */ - std::vector<repl::ReplOperation> endTransactionAndRetrieveOperations(); + std::vector<repl::ReplOperation> endTransactionAndRetrieveOperations(OperationContext* opCtx); const std::vector<repl::ReplOperation>& transactionOperationsForTest() { return _transactionOperations; } + TxnNumber getActiveTxnNumberForTest() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + return _activeTxnNumber; + } + /** * Scan through the list of operations and add new oplog entries for updating * config.transactions if needed. diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp index 004e554d9e0..5b03cec656c 100644 --- a/src/mongo/db/session_test.cpp +++ b/src/mongo/db/session_test.cpp @@ -124,6 +124,38 @@ protected: stmtId, link); } + + void bumpTxnNumberFromDifferentOpCtx(Session* session, TxnNumber newTxnNum) { + // Stash the original client. + auto originalClient = Client::releaseCurrent(); + + // Create a migration client and opCtx. + auto service = opCtx()->getServiceContext(); + auto migrationClientOwned = service->makeClient("migrationClient"); + auto migrationClient = migrationClientOwned.get(); + Client::setCurrent(std::move(migrationClientOwned)); + auto migrationOpCtx = migrationClient->makeOperationContext(); + + // Check that there is a transaction in progress with a lower txnNumber. + ASSERT(session->inMultiDocumentTransaction()); + ASSERT_LT(session->getActiveTxnNumberForTest(), newTxnNum); + + // Check that the transaction has some operations, so we can ensure they are cleared. + ASSERT_GT(session->transactionOperationsForTest().size(), 0u); + + // Bump the active transaction number on the session. This should clear all state from the + // previous transaction. + session->beginOrContinueTxnOnMigration(migrationOpCtx.get(), newTxnNum); + ASSERT_EQ(session->getActiveTxnNumberForTest(), newTxnNum); + ASSERT_FALSE(session->inMultiDocumentTransaction()); + ASSERT_FALSE(session->transactionIsAborted()); + ASSERT_EQ(session->transactionOperationsForTest().size(), 0u); + + // Restore the original client. + migrationOpCtx.reset(); + Client::releaseCurrent(); + Client::setCurrent(std::move(originalClient)); + } }; TEST_F(SessionTest, SessionEntryNotWrittenOnBegin) { @@ -617,6 +649,8 @@ TEST_F(SessionTest, SameTransactionPreservesStoredStatements) { session.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 22; + opCtx()->setLogicalSessionId(sessionId); + opCtx()->setTxnNumber(txnNum); session.beginOrContinueTxn(opCtx(), txnNum, false); WriteUnitOfWork wuow(opCtx()); auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); @@ -687,5 +721,232 @@ TEST_F(SessionTest, EmptyTransactionAbort) { ASSERT_TRUE(session.transactionIsAborted()); } +TEST_F(SessionTest, ConcurrencyOfUnstashAndAbort) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + const TxnNumber txnNum = 26; + opCtx()->setLogicalSessionId(sessionId); + opCtx()->setTxnNumber(txnNum); + session.beginOrContinueTxn(opCtx(), txnNum, false); + + // The transaction may be aborted without checking out the session. + session.abortArbitraryTransaction(); + + // An unstash after an abort should uassert. + ASSERT_THROWS_CODE(session.unstashTransactionResources(opCtx()), + AssertionException, + ErrorCodes::TransactionAborted); +} + +TEST_F(SessionTest, ConcurrencyOfUnstashAndMigration) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + const TxnNumber txnNum = 26; + opCtx()->setLogicalSessionId(sessionId); + opCtx()->setTxnNumber(txnNum); + session.beginOrContinueTxn(opCtx(), txnNum, false); + + session.unstashTransactionResources(opCtx()); + // The transaction machinery cannot store an empty locker. + { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now()); } + auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); + session.addTransactionOperation(opCtx(), operation); + session.stashTransactionResources(opCtx()); + + // A migration may bump the active transaction number without checking out the session. + const TxnNumber higherTxnNum = 27; + bumpTxnNumberFromDifferentOpCtx(&session, higherTxnNum); + + // An unstash after a migration that bumps the active transaction number should uassert. + ASSERT_THROWS_CODE(session.unstashTransactionResources(opCtx()), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); +} + +TEST_F(SessionTest, ConcurrencyOfStashAndAbort) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + const TxnNumber txnNum = 26; + opCtx()->setLogicalSessionId(sessionId); + opCtx()->setTxnNumber(txnNum); + session.beginOrContinueTxn(opCtx(), txnNum, false); + + session.unstashTransactionResources(opCtx()); + + // The transaction may be aborted without checking out the session. + session.abortArbitraryTransaction(); + + // A stash after an abort should be a noop. + session.stashTransactionResources(opCtx()); +} + +TEST_F(SessionTest, ConcurrencyOfStashAndMigration) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + const TxnNumber txnNum = 26; + opCtx()->setLogicalSessionId(sessionId); + opCtx()->setTxnNumber(txnNum); + session.beginOrContinueTxn(opCtx(), txnNum, false); + + session.unstashTransactionResources(opCtx()); + auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); + session.addTransactionOperation(opCtx(), operation); + + // A migration may bump the active transaction number without checking out the session. + const TxnNumber higherTxnNum = 27; + bumpTxnNumberFromDifferentOpCtx(&session, higherTxnNum); + + // A stash after a migration that bumps the active transaction number should uassert. + ASSERT_THROWS_CODE(session.stashTransactionResources(opCtx()), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); +} + +TEST_F(SessionTest, ConcurrencyOfAddTransactionOperationAndAbort) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + const TxnNumber txnNum = 26; + opCtx()->setLogicalSessionId(sessionId); + opCtx()->setTxnNumber(txnNum); + session.beginOrContinueTxn(opCtx(), txnNum, false); + + session.unstashTransactionResources(opCtx()); + + // The transaction may be aborted without checking out the session. + session.abortArbitraryTransaction(); + + // An addTransactionOperation() after an abort should uassert. + auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); + ASSERT_THROWS_CODE(session.addTransactionOperation(opCtx(), operation), + AssertionException, + ErrorCodes::TransactionAborted); +} + +TEST_F(SessionTest, ConcurrencyOfAddTransactionOperationAndMigration) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + const TxnNumber txnNum = 26; + opCtx()->setLogicalSessionId(sessionId); + opCtx()->setTxnNumber(txnNum); + session.beginOrContinueTxn(opCtx(), txnNum, false); + + session.unstashTransactionResources(opCtx()); + auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); + session.addTransactionOperation(opCtx(), operation); + + // A migration may bump the active transaction number without checking out the session. + const TxnNumber higherTxnNum = 27; + bumpTxnNumberFromDifferentOpCtx(&session, higherTxnNum); + + // An addTransactionOperation() after a migration that bumps the active transaction number + // should uassert. + ASSERT_THROWS_CODE(session.addTransactionOperation(opCtx(), operation), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); +} + +TEST_F(SessionTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAbort) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + const TxnNumber txnNum = 26; + opCtx()->setLogicalSessionId(sessionId); + opCtx()->setTxnNumber(txnNum); + session.beginOrContinueTxn(opCtx(), txnNum, false); + + session.unstashTransactionResources(opCtx()); + + // The transaction may be aborted without checking out the session. + session.abortArbitraryTransaction(); + + // An endTransactionAndRetrieveOperations() after an abort should uassert. + ASSERT_THROWS_CODE(session.endTransactionAndRetrieveOperations(opCtx()), + AssertionException, + ErrorCodes::TransactionAborted); +} + +TEST_F(SessionTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndMigration) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + const TxnNumber txnNum = 26; + opCtx()->setLogicalSessionId(sessionId); + opCtx()->setTxnNumber(txnNum); + session.beginOrContinueTxn(opCtx(), txnNum, false); + + session.unstashTransactionResources(opCtx()); + auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); + session.addTransactionOperation(opCtx(), operation); + + // A migration may bump the active transaction number without checking out the session. + const TxnNumber higherTxnNum = 27; + bumpTxnNumberFromDifferentOpCtx(&session, higherTxnNum); + + // An endTransactionAndRetrieveOperations() after a migration that bumps the active transaction + // number should uassert. + ASSERT_THROWS_CODE(session.endTransactionAndRetrieveOperations(opCtx()), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); +} + +TEST_F(SessionTest, ConcurrencyOfCommitTransactionAndAbort) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + const TxnNumber txnNum = 26; + opCtx()->setLogicalSessionId(sessionId); + opCtx()->setTxnNumber(txnNum); + session.beginOrContinueTxn(opCtx(), txnNum, false); + + session.unstashTransactionResources(opCtx()); + + // The transaction may be aborted without checking out the session. + session.abortArbitraryTransaction(); + + // An commitTransaction() after an abort should uassert. + ASSERT_THROWS_CODE( + session.commitTransaction(opCtx()), AssertionException, ErrorCodes::TransactionAborted); +} + +TEST_F(SessionTest, ConcurrencyOfCommitTransactionAndMigration) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + const TxnNumber txnNum = 26; + opCtx()->setLogicalSessionId(sessionId); + opCtx()->setTxnNumber(txnNum); + session.beginOrContinueTxn(opCtx(), txnNum, false); + + session.unstashTransactionResources(opCtx()); + auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); + session.addTransactionOperation(opCtx(), operation); + + // A migration may bump the active transaction number without checking out the session. + const TxnNumber higherTxnNum = 27; + bumpTxnNumberFromDifferentOpCtx(&session, higherTxnNum); + + // An commitTransaction() after a migration that bumps the active transaction number should + // uassert. + ASSERT_THROWS_CODE(session.commitTransaction(opCtx()), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); +} + } // namespace } // namespace mongo |