diff options
author | Matthew Russotto <matthew.russotto@mongodb.com> | 2019-09-30 15:41:45 +0000 |
---|---|---|
committer | evergreen <evergreen@mongodb.com> | 2019-09-30 15:41:45 +0000 |
commit | 28f301324e1809212f6f15ba641cea77f3c8719f (patch) | |
tree | 2c93c677bc751fb4a3191df90de842fc4591fb3f | |
parent | e7a7530bc76d62424afa455dc36495b57b09cd93 (diff) | |
download | mongo-28f301324e1809212f6f15ba641cea77f3c8719f.tar.gz |
SERVER-41457 Unify the different ways the TransactionParticipant offers for aborting a transaction
(cherry picked from commit f661267bf981e3b315d7e942057ab0ac9dc90bef)
-rw-r--r-- | jstests/replsets/rollback_reconstructs_transactions_prepared_before_stable.js | 14 | ||||
-rw-r--r-- | src/mongo/db/commands/txn_cmds.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/kill_sessions_local.cpp | 74 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops_exec.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/transaction_oplog_application.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/txn_two_phase_commit_cmds.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 30 | ||||
-rw-r--r-- | src/mongo/db/session_catalog_mongod.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/transaction_metrics_observer.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 95 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 39 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_test.cpp | 209 | ||||
-rw-r--r-- | src/mongo/dbtests/storage_timestamp_tests.cpp | 4 |
13 files changed, 186 insertions, 305 deletions
diff --git a/jstests/replsets/rollback_reconstructs_transactions_prepared_before_stable.js b/jstests/replsets/rollback_reconstructs_transactions_prepared_before_stable.js index 474d8246f93..525f7c3ae4c 100644 --- a/jstests/replsets/rollback_reconstructs_transactions_prepared_before_stable.js +++ b/jstests/replsets/rollback_reconstructs_transactions_prepared_before_stable.js @@ -39,6 +39,12 @@ const prepareTimestamp = PrepareHelpers.prepareTransaction(session); // Fastcount reflects the insert of a prepared transaction. assert.eq(testColl.count(), 2); +// Metrics reflect one inactive prepared transaction. +let metrics = assert.commandWorked(testDB.adminCommand({serverStatus: 1})); +assert.eq(1, metrics.transactions.currentPrepared); +assert.eq(1, metrics.transactions.currentInactive); +assert.eq(1, metrics.transactions.currentOpen); + jsTestLog("Do a majority write to advance the stable timestamp past the prepareTimestamp"); // Doing a majority write after preparing the transaction ensures that the stable timestamp is // past the prepare timestamp because this write must be in the committed snapshot. @@ -65,6 +71,12 @@ assert.eq(primary.getDB('config')['transactions'].find().itcount(), 1); // at the end of rollback. assert.eq(testColl.count(), 3); +// Metrics still reflect one inactive prepared transaction. +metrics = assert.commandWorked(testDB.adminCommand({serverStatus: 1})); +assert.eq(1, metrics.transactions.currentPrepared); +assert.eq(1, metrics.transactions.currentInactive); +assert.eq(1, metrics.transactions.currentOpen); + // Make sure we cannot see the writes from the prepared transaction yet. arrayEq(testColl.find().toArray(), [{_id: 0}, {_id: 2}]); @@ -109,4 +121,4 @@ arrayEq(testColl.find().toArray(), [{_id: 0, a: 1}, {_id: 1}, {_id: 2}]); assert.eq(testColl.count(), 3); rollbackTest.stop(); -}());
\ No newline at end of file +}()); diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp index ecb756b67d6..8a0fe30e9ec 100644 --- a/src/mongo/db/commands/txn_cmds.cpp +++ b/src/mongo/db/commands/txn_cmds.cpp @@ -196,7 +196,7 @@ public: opCtx, *opCtx->getLogicalSessionId(), *opCtx->getTxnNumber()); } - txnParticipant.abortActiveTransaction(opCtx); + txnParticipant.abortTransaction(opCtx); if (MONGO_FAIL_POINT(participantReturnNetworkErrorForAbortAfterExecutingAbortLogic)) { uasserted(ErrorCodes::HostUnreachable, diff --git a/src/mongo/db/kill_sessions_local.cpp b/src/mongo/db/kill_sessions_local.cpp index c44234c7568..ab141d967c2 100644 --- a/src/mongo/db/kill_sessions_local.cpp +++ b/src/mongo/db/kill_sessions_local.cpp @@ -86,17 +86,21 @@ void killSessionsAction( void killSessionsAbortUnpreparedTransactions(OperationContext* opCtx, const SessionKiller::Matcher& matcher, ErrorCodes::Error reason) { - killSessionsAction( - opCtx, - matcher, - [](const ObservableSession& session) { - auto participant = TransactionParticipant::get(session); - return participant.transactionIsOpen() && !participant.transactionIsPrepared(); - }, - [](OperationContext* opCtx, const SessionToKill& session) { - TransactionParticipant::get(session).abortTransactionIfNotPrepared(opCtx); - }, - reason); + killSessionsAction(opCtx, + matcher, + [](const ObservableSession& session) { + auto participant = TransactionParticipant::get(session); + return participant.transactionIsInProgress(); + }, + [](OperationContext* opCtx, const SessionToKill& session) { + auto participant = TransactionParticipant::get(session); + // This is the same test as in the filter, but we must check again now + // that the session is checked out. + if (participant.transactionIsInProgress()) { + participant.abortTransaction(opCtx); + } + }, + reason); } SessionKiller::Result killSessionsLocal(OperationContext* opCtx, @@ -114,22 +118,31 @@ SessionKiller::Result killSessionsLocal(OperationContext* opCtx, void killAllExpiredTransactions(OperationContext* opCtx) { SessionKiller::Matcher matcherAllSessions( KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}); - killSessionsAction( - opCtx, - matcherAllSessions, - [when = opCtx->getServiceContext()->getPreciseClockSource()->now()]( - const ObservableSession& session) { - return TransactionParticipant::get(session).expiredAsOf(when); - }, - [](OperationContext* opCtx, const SessionToKill& session) { - auto txnParticipant = TransactionParticipant::get(session); - log() - << "Aborting transaction with txnNumber " << txnParticipant.getActiveTxnNumber() - << " on session " << session.getSessionId().getId() - << " because it has been running for longer than 'transactionLifetimeLimitSeconds'"; - txnParticipant.abortTransactionIfNotPrepared(opCtx); - }, - ErrorCodes::ExceededTimeLimit); + killSessionsAction(opCtx, + matcherAllSessions, + [when = opCtx->getServiceContext()->getPreciseClockSource()->now()]( + const ObservableSession& session) { + return TransactionParticipant::get(session).expiredAsOf(when); + }, + [](OperationContext* opCtx, const SessionToKill& session) { + auto txnParticipant = TransactionParticipant::get(session); + // If the transaction is aborted here, it means it was aborted after + // the filter. The most likely reason for this is that the transaction + // was active and the session kill aborted it. We still want to log + // that as aborted due to transactionLifetimeLimitSessions. + if (txnParticipant.transactionIsInProgress() || + txnParticipant.transactionIsAborted()) { + log() << "Aborting transaction with txnNumber " + << txnParticipant.getActiveTxnNumber() << " on session " + << session.getSessionId().getId() + << " because it has been running for longer than " + "'transactionLifetimeLimitSeconds'"; + if (txnParticipant.transactionIsInProgress()) { + txnParticipant.abortTransaction(opCtx); + } + } + }, + ErrorCodes::ExceededTimeLimit); } void killSessionsLocalShutdownAllTransactions(OperationContext* opCtx) { @@ -156,10 +169,13 @@ void killSessionsAbortAllPreparedTransactions(OperationContext* opCtx) { return TransactionParticipant::get(session).transactionIsPrepared(); }, [](OperationContext* opCtx, const SessionToKill& session) { + // We're holding the RSTL, so the transaction shouldn't be otherwise + // affected. + invariant(TransactionParticipant::get(session).transactionIsPrepared()); // Abort the prepared transaction and invalidate the session it is // associated with. - TransactionParticipant::get(session).abortPreparedTransactionForRollback( - opCtx); + TransactionParticipant::get(session).abortTransaction(opCtx); + TransactionParticipant::get(session).invalidate(opCtx); }); } diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index 2695ef439ce..1ecceb4caa8 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -253,7 +253,7 @@ bool handleError(OperationContext* opCtx, } // If we are in a transaction, we must fail the whole batch. out->results.emplace_back(ex.toStatus()); - txnParticipant.abortActiveTransaction(opCtx); + txnParticipant.abortTransaction(opCtx); return false; } diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp index 8f40abd535b..368e8c2200a 100644 --- a/src/mongo/db/repl/transaction_oplog_application.cpp +++ b/src/mongo/db/repl/transaction_oplog_application.cpp @@ -222,7 +222,7 @@ Status applyAbortTransaction(OperationContext* opCtx, auto transaction = TransactionParticipant::get(opCtx); transaction.unstashTransactionResources(opCtx, "abortTransaction"); - transaction.abortActiveTransaction(opCtx); + transaction.abortTransaction(opCtx); return Status::OK(); } diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp index 342ead52a0a..948a39982b4 100644 --- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp +++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp @@ -298,16 +298,15 @@ public: { MongoDOperationContextSession sessionTxnState(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue(opCtx, *opCtx->getTxnNumber(), false /* autocommit */, boost::none /* startTransaction */); - try { - txnParticipant.abortTransactionIfNotPrepared(opCtx); - } catch (const ExceptionFor<ErrorCodes::TransactionCommitted>&) { + if (txnParticipant.transactionIsCommitted()) return; + if (txnParticipant.transactionIsInProgress()) { + txnParticipant.abortTransaction(opCtx); } participantExitPrepareFuture = txnParticipant.onExitPrepare(); diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index be12c34a4f4..8b64e1a6bc8 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -373,6 +373,25 @@ void appendClusterAndOperationTime(OperationContext* opCtx, operationTime.appendAsOperationTime(commandBodyFieldsBob); } +namespace { +void _abortUnpreparedOrStashPreparedTransaction( + OperationContext* opCtx, TransactionParticipant::Participant* txnParticipant) { + const bool isPrepared = txnParticipant->transactionIsPrepared(); + try { + if (isPrepared) + txnParticipant->stashTransactionResources(opCtx); + else if (txnParticipant->transactionIsOpen()) + txnParticipant->abortTransaction(opCtx); + } catch (...) { + // It is illegal for this to throw so we catch and log this here for diagnosability. + severe() << "Caught exception during transaction " << opCtx->getTxnNumber() + << (isPrepared ? " stash " : " abort ") << opCtx->getLogicalSessionId()->toBSON() + << ": " << exceptionToStatus(); + std::terminate(); + } +} +} // namespace + void invokeWithSessionCheckedOut(OperationContext* opCtx, CommandInvocation* invocation, const OperationSessionInfoFromClient& sessionOptions, @@ -406,8 +425,11 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx, // transactions on failure to unstash the transaction resources to opCtx. We don't want to // have this error guard for beginOrContinue as it can abort the transaction for any // accidental invalid statements in the transaction. - auto abortOnError = makeGuard( - [&txnParticipant, opCtx] { txnParticipant.abortTransactionIfNotPrepared(opCtx); }); + auto abortOnError = makeGuard([&txnParticipant, opCtx] { + if (txnParticipant.transactionIsInProgress()) { + txnParticipant.abortTransaction(opCtx); + } + }); txnParticipant.unstashTransactionResources(opCtx, invocation->definition()->getName()); @@ -415,8 +437,8 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx, abortOnError.dismiss(); } - auto guard = makeGuard([&txnParticipant, opCtx] { - txnParticipant.abortActiveUnpreparedOrStashPreparedTransaction(opCtx); + auto guard = makeGuard([opCtx, &txnParticipant] { + _abortUnpreparedOrStashPreparedTransaction(opCtx, &txnParticipant); }); try { diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp index 8a7a1164d33..b7302ff724a 100644 --- a/src/mongo/db/session_catalog_mongod.cpp +++ b/src/mongo/db/session_catalog_mongod.cpp @@ -208,7 +208,7 @@ void abortInProgressTransactions(OperationContext* opCtx) { auto txnParticipant = TransactionParticipant::get(opCtx); LOG(3) << "Aborting transaction sessionId: " << txnRecord.getSessionId().toBSON() << " txnNumber " << txnRecord.getTxnNum(); - txnParticipant.abortTransactionForStepUp(opCtx); + txnParticipant.abortTransaction(opCtx); } } } // namespace diff --git a/src/mongo/db/transaction_metrics_observer.cpp b/src/mongo/db/transaction_metrics_observer.cpp index ad0ce535173..e1b36d971bd 100644 --- a/src/mongo/db/transaction_metrics_observer.cpp +++ b/src/mongo/db/transaction_metrics_observer.cpp @@ -138,18 +138,12 @@ void TransactionMetricsObserver::_onAbortActive( // Server wide transactions metrics. // serverTransactionsMetrics->decrementCurrentActive(); - - if (_singleTransactionStats.isPrepared()) { - serverTransactionsMetrics->incrementTotalPreparedThenAborted(); - serverTransactionsMetrics->decrementCurrentPrepared(); - } } void TransactionMetricsObserver::_onAbortInactive( ServerTransactionsMetrics* serverTransactionsMetrics, TickSource* tickSource, Top* top) { auto curTick = tickSource->getTicks(); invariant(!_singleTransactionStats.isActive()); - invariant(!_singleTransactionStats.isPrepared()); _onAbort(serverTransactionsMetrics, curTick, tickSource, top); // @@ -213,6 +207,11 @@ void TransactionMetricsObserver::_onAbort(ServerTransactionsMetrics* serverTrans serverTransactionsMetrics->incrementTotalAborted(); serverTransactionsMetrics->decrementCurrentOpen(); + if (_singleTransactionStats.isPrepared()) { + serverTransactionsMetrics->incrementTotalPreparedThenAborted(); + serverTransactionsMetrics->decrementCurrentPrepared(); + } + auto latency = durationCount<Microseconds>(_singleTransactionStats.getDuration(tickSource, curTick)); top->incrementGlobalTransactionLatencyStats(static_cast<uint64_t>(latency)); diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index f9a0a64cc65..7fcbcaecb14 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -568,6 +568,13 @@ void TransactionParticipant::Participant::beginOrContinueTransactionUnconditiona if (o().activeTxnNumber != txnNumber) { _beginMultiDocumentTransaction(opCtx, txnNumber); } + + // Assume we need to write an abort if we abort this transaction. This method is called only + // on secondaries (in which case we never write anything) and when a new primary knows about + // an in-progress transaction. If a new primary knows about an in-progress transaction, it + // needs an abort oplog entry to be written if aborted (because the new primary could not + // have found out if there wasn't an oplog entry for the new primary). + p().needToWriteAbortEntry = true; } SharedSemiFuture<void> TransactionParticipant::Participant::onExitPrepare() const { @@ -1002,7 +1009,15 @@ void TransactionParticipant::Participant::unstashTransactionResources(OperationC // Global intent lock before starting a transaction. We pessimistically acquire an intent // exclusive lock here because we might be doing writes in this transaction, and it is currently // not deadlock-safe to upgrade IS to IX. - Lock::GlobalLock globalLock(opCtx, MODE_IX); + boost::optional<Lock::GlobalLock> globalLock; + // If the global lock acquisition fails, we must release the write unit of work to avoid an + // invariant during _cleanUpTxnResourceOnOpCtx. + try { + globalLock.emplace(opCtx, MODE_IX); + } catch (const DBException&) { + opCtx->setWriteUnitOfWork(nullptr); + throw; + } // This begins the storage transaction and so we do it after acquiring the global lock. _setReadSnapshot(opCtx, repl::ReadConcernArgs::get(opCtx)); @@ -1054,11 +1069,11 @@ Timestamp TransactionParticipant::Participant::prepareTransaction( try { // This shouldn't cause deadlocks with other prepared txns, because the acquisition - // of RSTL lock inside abortActiveTransaction will be no-op since we already have it. + // of RSTL lock inside abortTransaction will be no-op since we already have it. // This abortGuard gets dismissed before we release the RSTL while transitioning to // prepared. UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - abortActiveTransaction(opCtx); + abortTransaction(opCtx); } catch (...) { // It is illegal for aborting a prepared transaction to fail for any reason, so we crash // instead. @@ -1146,6 +1161,7 @@ Timestamp TransactionParticipant::Participant::prepareTransaction( } opCtx->recoveryUnit()->setPrepareTimestamp(prepareOplogSlot.getTimestamp()); opCtx->getWriteUnitOfWork()->prepare(); + p().needToWriteAbortEntry = true; opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare( opCtx, reservedSlots, completedTransactionOperations); @@ -1456,26 +1472,22 @@ void TransactionParticipant::Participant::shutdown(OperationContext* opCtx) { o(lock).txnResourceStash = boost::none; } -void TransactionParticipant::Participant::abortTransactionIfNotPrepared(OperationContext* opCtx) { - if (!o().txnState.isInProgress()) { - // We do not want to abort transactions that are prepared unless we get an - // 'abortTransaction' command. - return; - } - - _abortTransactionOnSession(opCtx); -} - bool TransactionParticipant::Observer::expiredAsOf(Date_t when) const { return o().txnState.isInProgress() && o().transactionExpireDate && o().transactionExpireDate < when; } -void TransactionParticipant::Participant::abortActiveTransaction(OperationContext* opCtx) { - if (o().txnState.isPrepared()) { +void TransactionParticipant::Participant::abortTransaction(OperationContext* opCtx) { + // Normally, absence of a transaction resource stash indicates an inactive transaction. + // However, in the case of a failed "unstash", an active transaction may exist without a stash + // and be killed externally. In that case, the opCtx will not have a transaction number. + if (o().txnResourceStash || !opCtx->getTxnNumber()) { + // Aborting an inactive transaction. + _abortTransactionOnSession(opCtx); + } else if (o().txnState.isPrepared()) { _abortActivePreparedTransaction(opCtx); } else { - _abortActiveTransaction(opCtx, TransactionState::kInProgress, false); + _abortActiveTransaction(opCtx, TransactionState::kInProgress); } } @@ -1498,38 +1510,11 @@ void TransactionParticipant::Participant::_abortActivePreparedTransaction(Operat replCoord->canAcceptWritesForDatabase(opCtx, "admin")); } - _abortActiveTransaction(opCtx, TransactionState::kPrepared, true); -} - -void TransactionParticipant::Participant::abortActiveUnpreparedOrStashPreparedTransaction( - OperationContext* opCtx) try { - if (o().txnState.isInSet(TransactionState::kNone | TransactionState::kCommitted | - TransactionState::kExecutedRetryableWrite)) { - // If there is no active transaction, do nothing. - return; - } - - // Stash the transaction if it's in prepared state. - if (o().txnState.isInSet(TransactionState::kPrepared)) { - _stashActiveTransaction(opCtx); - return; - } - - _abortActiveTransaction(opCtx, TransactionState::kInProgress, false /* writeOplog */); -} catch (...) { - // It is illegal for this to throw so we catch and log this here for diagnosability. - severe() << "Caught exception during transaction " << opCtx->getTxnNumber() - << " abort or stash on " << _sessionId().toBSON() << " in state " << o().txnState - << ": " << exceptionToStatus(); - std::terminate(); -} - -void TransactionParticipant::Participant::abortTransactionForStepUp(OperationContext* opCtx) { - _abortActiveTransaction(opCtx, TransactionState::kInProgress, true); + _abortActiveTransaction(opCtx, TransactionState::kPrepared); } void TransactionParticipant::Participant::_abortActiveTransaction( - OperationContext* opCtx, TransactionState::StateSet expectedStates, bool writeOplog) { + OperationContext* opCtx, TransactionState::StateSet expectedStates) { invariant(!o().txnResourceStash); if (!o().txnState.isInRetryableWriteMode()) { @@ -1543,7 +1528,7 @@ void TransactionParticipant::Participant::_abortActiveTransaction( // OpObserver. boost::optional<OplogSlotReserver> oplogSlotReserver; boost::optional<OplogSlot> abortOplogSlot; - if (opCtx->writesAreReplicated() && writeOplog) { + if (opCtx->writesAreReplicated() && p().needToWriteAbortEntry) { oplogSlotReserver.emplace(opCtx); abortOplogSlot = oplogSlotReserver->getLastSlot(); } @@ -2069,6 +2054,7 @@ void TransactionParticipant::Participant::_resetTransactionState( o(wl).recoveryPrepareOpTime = repl::OpTime(); p().multikeyPathInfo.clear(); p().autoCommit = boost::none; + p().needToWriteAbortEntry = false; // Release any locks held by this participant and abort the storage transaction. o(wl).txnResourceStash = boost::none; @@ -2088,23 +2074,6 @@ void TransactionParticipant::Participant::invalidate(OperationContext* opCtx) { _resetTransactionState(lg, TransactionState::kNone); } -void TransactionParticipant::Participant::abortPreparedTransactionForRollback( - OperationContext* opCtx) { - uassert(51030, - str::stream() << "Cannot call abortPreparedTransactionForRollback on unprepared " - << "transaction.", - o().txnState.isPrepared()); - - stdx::lock_guard<Client> lg(*opCtx->getClient()); - - // It should be safe to clear transactionOperationBytes and transactionOperations because - // we only modify these variables when adding an operation to a transaction. Since this - // transaction is already prepared, we cannot add more operations to it. We will have this - // in the prepare oplog entry. - _invalidate(lg); - _resetTransactionState(lg, TransactionState::kNone); -} - boost::optional<repl::OplogEntry> TransactionParticipant::Participant::checkStatementExecuted( OperationContext* opCtx, StmtId stmtId) const { const auto stmtTimestamp = _checkStatementExecuted(stmtId); diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index a569241e3cf..ac85e6f8c56 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -476,34 +476,10 @@ public: Timestamp commitTimestamp, boost::optional<repl::OpTime> commitOplogEntryOpTime); - /** - * Aborts the transaction, if it is not in the "prepared" state. - */ - void abortTransactionIfNotPrepared(OperationContext* opCtx); - /* * Aborts the transaction, releasing transaction resources. */ - void abortActiveTransaction(OperationContext* opCtx); - - /* - * If the transaction is prepared, stash its resources. If not, it's the same as - * abortActiveTransaction. - */ - void abortActiveUnpreparedOrStashPreparedTransaction(OperationContext* opCtx); - - /** - * Abort the transaction and write an abort oplog entry unconditionally. - */ - void abortTransactionForStepUp(OperationContext* opCtx); - - /** - * Aborts the storage transaction of the prepared transaction on this participant by - * releasing its resources. Also invalidates the session and the current transaction state. - * Avoids writing any oplog entries or making any changes to the transaction table since the - * state for prepared transactions will be re-constituted during replication recovery. - */ - void abortPreparedTransactionForRollback(OperationContext* opCtx); + void abortTransaction(OperationContext* opCtx); /** * Adds a stored operation to the list of stored operations for the current multi-document @@ -695,11 +671,10 @@ public: void _stashActiveTransaction(OperationContext* opCtx); // Abort the transaction if it's in one of the expected states and clean up the transaction - // states associated with the opCtx. - // If 'writeOplog' is true, logs an 'abortTransaction' oplog entry if writes are replicated. + // states associated with the opCtx. Write an abort oplog entry if specified by the + // needToWriteAbortEntry state bool. void _abortActiveTransaction(OperationContext* opCtx, - TransactionState::StateSet expectedStates, - bool writeOplog); + TransactionState::StateSet expectedStates); // Aborts a prepared transaction. void _abortActivePreparedTransaction(OperationContext* opCtx); @@ -953,6 +928,12 @@ private: // opTime. Used for fast retryability check and retrieving the previous write's data without // having to scan through the oplog. CommittedStatementTimestampMap activeTxnCommittedStatements; + + // Set to true if we need to write an "abort" oplog entry in the case of an abort. This + // is the case when we have (or may have) written or replicated an oplog entry for the + // transaction. + bool needToWriteAbortEntry{false}; + } _p; }; diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index a186012cc2b..5fe79784bbf 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -499,7 +499,7 @@ TEST_F(TxnParticipantTest, AbortClearsStoredStatements) { // The transaction machinery cannot store an empty locker. { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); } txnParticipant.stashTransactionResources(opCtx()); - txnParticipant.abortTransactionIfNotPrepared(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT_TRUE(txnParticipant.getTransactionOperationsForTest().empty()); ASSERT_TRUE(txnParticipant.transactionIsAborted()); } @@ -713,7 +713,7 @@ TEST_F(TxnParticipantTest, EmptyTransactionAbort) { // The transaction machinery cannot store an empty locker. { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); } txnParticipant.stashTransactionResources(opCtx()); - txnParticipant.abortTransactionIfNotPrepared(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT_TRUE(txnParticipant.transactionIsAborted()); } @@ -727,37 +727,10 @@ TEST_F(TxnParticipantTest, EmptyPreparedTransactionAbort) { // The transaction machinery cannot store an empty locker. { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); } txnParticipant.prepareTransaction(opCtx(), {}); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT_TRUE(txnParticipant.transactionIsAborted()); } -TEST_F(TxnParticipantTest, KillSessionsDuringPrepareDoesNotAbortTransaction) { - auto sessionCheckout = checkOutSession(); - auto txnParticipant = TransactionParticipant::get(opCtx()); - - txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction"); - - auto ruPrepareTimestamp = Timestamp(); - auto originalFn = _opObserver->onTransactionPrepareFn; - _opObserver->onTransactionPrepareFn = [&]() { - originalFn(); - - ruPrepareTimestamp = opCtx()->recoveryUnit()->getPrepareTimestamp(); - ASSERT_FALSE(ruPrepareTimestamp.isNull()); - - // The transaction may be aborted without checking out the txnParticipant. - txnParticipant.abortTransactionIfNotPrepared(opCtx()); - ASSERT_FALSE(txnParticipant.transactionIsAborted()); - }; - - // Check that prepareTimestamp gets set. - auto prepareTimestamp = txnParticipant.prepareTransaction(opCtx(), {}); - ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); - - ASSERT(_opObserver->transactionPrepared); - ASSERT_FALSE(txnParticipant.transactionIsAborted()); -} - TEST_F(TxnParticipantTest, KillOpBeforeCommittingPreparedTransaction) { auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); @@ -809,7 +782,7 @@ TEST_F(TxnParticipantTest, KillOpBeforeAbortingPreparedTransaction) { opCtx()->markKilled(ErrorCodes::Interrupted); try { // The abort should throw, since the operation was killed. - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); } catch (const DBException& ex) { ASSERT_EQ(ErrorCodes::Interrupted, ex.code()); } @@ -915,7 +888,7 @@ TEST_F(TxnParticipantTest, StepDownAfterPrepareDoesNotBlock) { }; runFunctionFromDifferentOpCtx(func); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT(_opObserver->transactionAborted); ASSERT(txnParticipant.transactionIsAborted()); } @@ -947,7 +920,7 @@ TEST_F(TxnParticipantTest, StepDownDuringAbortSucceeds) { ASSERT_OK(repl::ReplicationCoordinator::get(opCtx())->setFollowerMode( repl::MemberState::RS_SECONDARY)); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT(_opObserver->transactionAborted); ASSERT(txnParticipant.transactionIsAborted()); } @@ -962,7 +935,7 @@ TEST_F(TxnParticipantTest, StepDownDuringPreparedAbortFails) { ASSERT_OK(repl::ReplicationCoordinator::get(opCtx())->setFollowerMode( repl::MemberState::RS_SECONDARY)); ASSERT_THROWS_CODE( - txnParticipant.abortActiveTransaction(opCtx()), AssertionException, ErrorCodes::NotMaster); + txnParticipant.abortTransaction(opCtx()), AssertionException, ErrorCodes::NotMaster); } TEST_F(TxnParticipantTest, StepDownDuringPreparedCommitFails) { @@ -1018,7 +991,7 @@ TEST_F(TxnParticipantTest, StepDownDuringPreparedAbortReleasesRSTL) { ASSERT_OK(repl::ReplicationCoordinator::get(opCtx())->setFollowerMode( repl::MemberState::RS_SECONDARY)); ASSERT_THROWS_CODE( - txnParticipant.abortActiveTransaction(opCtx()), AssertionException, ErrorCodes::NotMaster); + txnParticipant.abortTransaction(opCtx()), AssertionException, ErrorCodes::NotMaster); ASSERT_EQ(opCtx()->lockState()->getLockMode(resourceIdReplicationStateTransitionLock), MODE_NONE); @@ -1101,7 +1074,7 @@ TEST_F(TxnParticipantTest, ThrowDuringUnpreparedCommitLetsTheAbortAtEntryPointTo ASSERT_FALSE(txnParticipant.transactionIsCommitted()); // Simulate the abort at entry point. - txnParticipant.abortActiveUnpreparedOrStashPreparedTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT_TRUE(txnParticipant.transactionIsAborted()); } @@ -1119,56 +1092,6 @@ TEST_F(TxnParticipantTest, ContinuingATransactionWithNoResourcesAborts) { ErrorCodes::NoSuchTransaction); } -TEST_F(TxnParticipantTest, KillSessionsDoesNotAbortPreparedTransactions) { - auto sessionCheckout = checkOutSession(); - auto txnParticipant = TransactionParticipant::get(opCtx()); - - txnParticipant.unstashTransactionResources(opCtx(), "insert"); - - auto ruPrepareTimestamp = Timestamp(); - auto originalFn = _opObserver->onTransactionPrepareFn; - _opObserver->onTransactionPrepareFn = [&]() { - originalFn(); - ruPrepareTimestamp = opCtx()->recoveryUnit()->getPrepareTimestamp(); - ASSERT_FALSE(ruPrepareTimestamp.isNull()); - }; - - // Check that prepareTimestamp gets set. - auto prepareTimestamp = txnParticipant.prepareTransaction(opCtx(), {}); - ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); - - txnParticipant.stashTransactionResources(opCtx()); - - txnParticipant.abortTransactionIfNotPrepared(opCtx()); - ASSERT_FALSE(txnParticipant.transactionIsAborted()); - ASSERT(_opObserver->transactionPrepared); -} - -TEST_F(TxnParticipantTest, CannotAbortArbitraryPreparedTransactions) { - auto sessionCheckout = checkOutSession(); - auto txnParticipant = TransactionParticipant::get(opCtx()); - - txnParticipant.unstashTransactionResources(opCtx(), "insert"); - - auto ruPrepareTimestamp = Timestamp(); - auto originalFn = _opObserver->onTransactionPrepareFn; - _opObserver->onTransactionPrepareFn = [&]() { - originalFn(); - ruPrepareTimestamp = opCtx()->recoveryUnit()->getPrepareTimestamp(); - ASSERT_FALSE(ruPrepareTimestamp.isNull()); - }; - - // Check that prepareTimestamp gets set. - auto prepareTimestamp = txnParticipant.prepareTransaction(opCtx(), {}); - ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); - - txnParticipant.stashTransactionResources(opCtx()); - - txnParticipant.abortTransactionIfNotPrepared(opCtx()); - ASSERT(!txnParticipant.transactionIsAborted()); - ASSERT(_opObserver->transactionPrepared); -} - TEST_F(TxnParticipantTest, CannotStartNewTransactionIfNotPrimary) { ASSERT_OK(repl::ReplicationCoordinator::get(opCtx())->setFollowerMode( repl::MemberState::RS_SECONDARY)); @@ -1277,23 +1200,6 @@ TEST_F(TxnParticipantTest, CannotInsertInPreparedTransaction) { ASSERT(_opObserver->transactionPrepared); } -TEST_F(TxnParticipantTest, ImplicitAbortDoesNotAbortPreparedTransaction) { - auto outerScopedSession = checkOutSession(); - auto txnParticipant = TransactionParticipant::get(opCtx()); - - txnParticipant.unstashTransactionResources(opCtx(), "insert"); - auto operation = repl::OplogEntry::makeInsertOperation(kNss, _uuid, BSON("TestValue" << 0)); - txnParticipant.addTransactionOperation(opCtx(), operation); - - txnParticipant.prepareTransaction(opCtx(), {}); - - // The next command throws an exception and wants to abort the transaction. - // This is a no-op. - txnParticipant.abortActiveUnpreparedOrStashPreparedTransaction(opCtx()); - ASSERT_FALSE(txnParticipant.transactionIsAborted()); - ASSERT_TRUE(_opObserver->transactionPrepared); -} - TEST_F(TxnParticipantTest, CannotContinueNonExistentTransaction) { MongoDOperationContextSession opCtxSession(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); @@ -1397,7 +1303,7 @@ protected: auto txnParticipant = TransactionParticipant::get(opCtx()); ASSERT(txnParticipant.transactionIsOpen()); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT(txnParticipant.transactionIsAborted()); txnParticipant.beginOrContinue( @@ -1453,7 +1359,7 @@ protected: txnParticipant.prepareTransaction(opCtx(), {}); ASSERT(txnParticipant.transactionIsPrepared()); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT(txnParticipant.transactionIsAborted()); startTransaction = true; @@ -1567,9 +1473,8 @@ TEST_F(TxnParticipantTest, ThrowDuringUnpreparedOnTransactionAbort) { _opObserver->onTransactionAbortThrowsException = true; - ASSERT_THROWS_CODE(txnParticipant.abortActiveTransaction(opCtx()), - AssertionException, - ErrorCodes::OperationFailed); + ASSERT_THROWS_CODE( + txnParticipant.abortTransaction(opCtx()), AssertionException, ErrorCodes::OperationFailed); } TEST_F(TxnParticipantTest, ThrowDuringPreparedOnTransactionAbortIsFatal) { @@ -1580,9 +1485,8 @@ TEST_F(TxnParticipantTest, ThrowDuringPreparedOnTransactionAbortIsFatal) { _opObserver->onTransactionAbortThrowsException = true; - ASSERT_THROWS_CODE(txnParticipant.abortActiveTransaction(opCtx()), - AssertionException, - ErrorCodes::OperationFailed); + ASSERT_THROWS_CODE( + txnParticipant.abortTransaction(opCtx()), AssertionException, ErrorCodes::OperationFailed); } TEST_F(TxnParticipantTest, InterruptedSessionsCannotBePrepared) { @@ -1647,7 +1551,7 @@ TEST_F(TxnParticipantTest, ReacquireLocksForPreparedTransactionsOnStepUp) { auto txnParticipant = TransactionParticipant::get(opCtx()); ASSERT(txnParticipant.getTxnResourceStashLockerForTest()->isLocked()); txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); } } @@ -1742,7 +1646,7 @@ TEST_F(TransactionsMetricsTest, IncrementTotalAbortedUponAbort) { unsigned long long beforeAbortCount = ServerTransactionsMetrics::get(opCtx())->getTotalAborted(); - txnParticipant.abortTransactionIfNotPrepared(opCtx()); + txnParticipant.abortTransaction(opCtx()); // Assert that the aborted counter is incremented by 1. ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalAborted(), beforeAbortCount + 1U); @@ -1757,7 +1661,7 @@ TEST_F(TransactionsMetricsTest, IncrementTotalPreparedThenAborted) { txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction"); txnParticipant.prepareTransaction(opCtx(), {}); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT(txnParticipant.transactionIsAborted()); ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalPreparedThenAborted(), beforePreparedThenAbortedCount + 1U); @@ -1791,7 +1695,7 @@ TEST_F(TransactionsMetricsTest, IncrementCurrentPreparedWithAbort) { ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentPrepared(), beforeCurrentPrepared + 1U); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT(txnParticipant.transactionIsAborted()); ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentPrepared(), beforeCurrentPrepared); } @@ -1824,7 +1728,8 @@ TEST_F(TransactionsMetricsTest, NoPreparedMetricsChangesAfterExceptionInPrepare) ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalPreparedThenAborted(), beforeTotalPreparedThenAborted); - txnParticipant.abortActiveTransaction(opCtx()); + if (txnParticipant.transactionIsOpen()) + txnParticipant.abortTransaction(opCtx()); ASSERT(txnParticipant.transactionIsAborted()); ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentPrepared(), beforeCurrentPrepared); @@ -1853,7 +1758,7 @@ TEST_F(TransactionsMetricsTest, TrackTotalOpenTransactionsWithAbort) { beforeTransactionStart + 1U); // Tests that aborting a transaction decrements the open transactions counter by 1. - txnParticipant.abortTransactionIfNotPrepared(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentOpen(), beforeTransactionStart); } @@ -1949,7 +1854,7 @@ TEST_F(TransactionsMetricsTest, TrackTotalActiveAndInactiveTransactionsWithStash beforeInactiveCounter + 1U); // Tests that aborting a stashed transaction decrements the inactive counter only. - txnParticipant.abortTransactionIfNotPrepared(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentActive(), beforeActiveCounter); ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(), beforeInactiveCounter); } @@ -1974,7 +1879,7 @@ TEST_F(TransactionsMetricsTest, TrackTotalActiveAndInactiveTransactionsWithUnsta ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(), beforeInactiveCounter); // Tests that aborting a stashed transaction decrements the active counter only. - txnParticipant.abortTransactionIfNotPrepared(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentActive(), beforeActiveCounter); ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(), beforeInactiveCounter); } @@ -2059,7 +1964,7 @@ TEST_F(TransactionsMetricsTest, beforeActivePreparedCounter + 1U); ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(), beforeInactivePreparedCounter); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT(txnParticipant.transactionIsAborted()); ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentActive(), beforeActivePreparedCounter); @@ -2154,7 +2059,7 @@ TEST_F(TransactionsMetricsTest, SingleTransactionStatsDurationShouldBeSetUponAbo // Advance the clock. tickSource->advance(Microseconds(100)); - txnParticipant.abortTransactionIfNotPrepared(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT_EQ(txnParticipant.getSingleTransactionStatsForTest().getDuration(tickSource, tickSource->getTicks()), Microseconds(100)); @@ -2174,7 +2079,7 @@ TEST_F(TransactionsMetricsTest, SingleTransactionStatsPreparedDurationShouldBeSe txnParticipant.prepareTransaction(opCtx(), {}); tickSource->advance(Microseconds(100)); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT_EQ(txnParticipant.getSingleTransactionStatsForTest().getPreparedDuration( tickSource, tickSource->getTicks()), Microseconds(100)); @@ -2265,7 +2170,7 @@ TEST_F(TransactionsMetricsTest, SingleTransactionStatsDurationShouldKeepIncreasi tickSource->advance(Microseconds(100)); // Abort the transaction and check duration. - txnParticipant.abortTransactionIfNotPrepared(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT_EQ(txnParticipant.getSingleTransactionStatsForTest().getDuration(tickSource, tickSource->getTicks()), Microseconds(200)); @@ -2299,7 +2204,7 @@ TEST_F(TransactionsMetricsTest, tickSource->advance(Microseconds(100)); // Abort the prepared transaction and check the prepared duration. - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT_EQ(txnParticipant.getSingleTransactionStatsForTest().getPreparedDuration( tickSource, tickSource->getTicks()), Microseconds(200)); @@ -2372,7 +2277,7 @@ TEST_F(TransactionsMetricsTest, TimeActiveMicrosShouldBeSetUponUnstashAndAbort) txnParticipant.unstashTransactionResources(opCtx(), "insert"); tickSource->advance(Microseconds(100)); - txnParticipant.abortTransactionIfNotPrepared(opCtx()); + txnParticipant.abortTransaction(opCtx()); // Time active should have increased. ASSERT_EQ(txnParticipant.getSingleTransactionStatsForTest().getTimeActiveMicros( @@ -2401,7 +2306,7 @@ TEST_F(TransactionsMetricsTest, TimeActiveMicrosShouldNotBeSetUponAbortOnly) { // Advance clock during inactive period. tickSource->advance(Microseconds(100)); - txnParticipant.abortTransactionIfNotPrepared(opCtx()); + txnParticipant.abortTransaction(opCtx()); // Time active should still be zero. ASSERT_EQ(txnParticipant.getSingleTransactionStatsForTest().getTimeActiveMicros( @@ -2594,7 +2499,7 @@ TEST_F(TransactionsMetricsTest, AdditiveMetricsObjectsShouldBeAddedTogetherUponA txnParticipant.unstashTransactionResources(opCtx(), "insert"); // The transaction machinery cannot store an empty locker. { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); } - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT(txnParticipant.getSingleTransactionStatsForTest().getOpDebug()->additiveMetrics.equals( additiveMetricsToCompare)); @@ -2658,7 +2563,7 @@ TEST_F(TransactionsMetricsTest, TimeInactiveMicrosShouldBeSetUponUnstashAndAbort Microseconds{100}); txnParticipant.unstashTransactionResources(opCtx(), "insert"); - txnParticipant.abortTransactionIfNotPrepared(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT_EQ(txnParticipant.getSingleTransactionStatsForTest().getTimeInactiveMicros( tickSource, tickSource->getTicks()), @@ -2991,7 +2896,7 @@ TEST_F(TransactionsMetricsTest, LastClientInfoShouldUpdateUponAbort) { auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "insert"); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); // LastClientInfo should have been set. auto lastClientInfo = txnParticipant.getSingleTransactionStatsForTest().getLastClientInfo(); @@ -3261,7 +3166,7 @@ TEST_F(TransactionsMetricsTest, TestTransactionInfoForLogAfterAbort) { auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); const auto lockerInfo = opCtx()->lockState()->getLockerInfo(boost::none); ASSERT(lockerInfo); @@ -3305,7 +3210,7 @@ TEST_F(TransactionsMetricsTest, TestPreparedTransactionInfoForLogAfterAbort) { txnParticipant.prepareTransaction(opCtx(), {}); tickSource->advance(Microseconds(10)); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); const auto lockerInfo = opCtx()->lockState()->getLockerInfo(boost::none); ASSERT(lockerInfo); @@ -3448,7 +3353,7 @@ TEST_F(TransactionsMetricsTest, LogTransactionInfoAfterSlowAbort) { tickSource->advance(Microseconds(11 * 1000)); startCapturingLogMessages(); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); stopCapturingLogMessages(); const auto lockerInfo = opCtx()->lockState()->getLockerInfo(boost::none); @@ -3493,7 +3398,7 @@ TEST_F(TransactionsMetricsTest, LogPreparedTransactionInfoAfterSlowAbort) { auto prepareOpTime = txnParticipant.getPrepareOpTime(); startCapturingLogMessages(); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); stopCapturingLogMessages(); const auto lockerInfo = opCtx()->lockState()->getLockerInfo(boost::none); @@ -3591,7 +3496,7 @@ TEST_F(TransactionsMetricsTest, LogTransactionInfoAfterSlowStashedAbort) { tickSource->advance(Microseconds(11 * 1000)); startCapturingLogMessages(); - txnParticipant.abortTransactionIfNotPrepared(opCtx()); + txnParticipant.abortTransaction(opCtx()); stopCapturingLogMessages(); std::string expectedTransactionInfo = "transaction parameters"; @@ -3676,9 +3581,10 @@ TEST_F(TxnParticipantTest, RollbackResetsInMemoryStateOfPreparedTransaction) { ASSERT_EQ(txnParticipant.getPrepareOpTime().getTimestamp(), prepareTimestamp); ASSERT_NE(txnParticipant.getActiveTxnNumber(), kUninitializedTxnNumber); - txnParticipant.abortPreparedTransactionForRollback(opCtx()); + txnParticipant.abortTransaction(opCtx()); + txnParticipant.invalidate(opCtx()); - // After calling abortPreparedTransactionForRollback, the state of txnParticipant should be + // After calling abortTransaction and invalidate, the state of txnParticipant should be // invalidated. ASSERT_FALSE(txnParticipant.transactionIsPrepared()); ASSERT_EQ(txnParticipant.getTransactionOperationsForTest().size(), 0U); @@ -3791,7 +3697,7 @@ TEST_F(TxnParticipantTest, AbortTransactionOnSessionCheckoutWithoutRefresh) { ASSERT_EQ(txnParticipant.getActiveTxnNumber(), txnNumber); txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT_TRUE(txnParticipant.transactionIsAborted()); } @@ -3861,7 +3767,7 @@ TEST_F(TxnParticipantTest, ResponseMetadataHasReadOnlyFalseIfAborted) { txnParticipant.unstashTransactionResources(opCtx(), "find"); ASSERT_TRUE(txnParticipant.getResponseMetadata().getReadOnly()); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT_FALSE(txnParticipant.getResponseMetadata().getReadOnly()); } @@ -3967,7 +3873,7 @@ TEST_F(TxnParticipantTest, ExitPreparePromiseIsFulfilledOnAbortAfterPrepare) { const auto exitPrepareFuture = txnParticipant.onExitPrepare(); ASSERT_FALSE(exitPrepareFuture.isReady()); - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); ASSERT_TRUE(exitPrepareFuture.isReady()); // Once the promise has been fulfilled, new callers of onExitPrepare should immediately be @@ -3975,7 +3881,7 @@ TEST_F(TxnParticipantTest, ExitPreparePromiseIsFulfilledOnAbortAfterPrepare) { ASSERT_TRUE(txnParticipant.onExitPrepare().isReady()); // abortTransaction is retryable, but does not cause the completion promise to be set again. - txnParticipant.abortActiveTransaction(opCtx()); + txnParticipant.abortTransaction(opCtx()); } TEST_F(TxnParticipantTest, ExitPreparePromiseIsFulfilledOnCommitAfterPrepare) { @@ -4004,28 +3910,5 @@ TEST_F(TxnParticipantTest, ExitPreparePromiseIsFulfilledOnCommitAfterPrepare) { ASSERT_TRUE(txnParticipant.onExitPrepare().isReady()); } -TEST_F(TxnParticipantTest, ExitPreparePromiseIsFulfilledOnAbortPreparedTransactionForRollback) { - MongoDOperationContextSession opCtxSession(opCtx()); - auto txnParticipant = TransactionParticipant::get(opCtx()); - - txnParticipant.beginOrContinue( - opCtx(), *opCtx()->getTxnNumber(), false /* autocommit */, true /* startTransaction */); - ASSERT_TRUE(txnParticipant.onExitPrepare().isReady()); - - txnParticipant.unstashTransactionResources(opCtx(), "find"); - ASSERT_TRUE(txnParticipant.onExitPrepare().isReady()); - - const auto prepareOpTime = repl::OpTime({3, 2}, 0); - txnParticipant.prepareTransaction(opCtx(), prepareOpTime); - const auto exitPrepareFuture = txnParticipant.onExitPrepare(); - ASSERT_FALSE(exitPrepareFuture.isReady()); - - txnParticipant.abortPreparedTransactionForRollback(opCtx()); - ASSERT_TRUE(exitPrepareFuture.isReady()); - - // Once the promise has been fulfilled, new callers of onExitPrepare should immediately be - // ready. - ASSERT_TRUE(txnParticipant.onExitPrepare().isReady()); -} } // namespace } // namespace mongo diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index 1672d0ac2fe..e20ace4c166 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -3142,7 +3142,7 @@ public: txnParticipant.unstashTransactionResources(_opCtx, "abortTransaction"); - txnParticipant.abortActiveTransaction(_opCtx); + txnParticipant.abortTransaction(_opCtx); txnParticipant.stashTransactionResources(_opCtx); { @@ -3364,7 +3364,7 @@ public: } txnParticipant.unstashTransactionResources(_opCtx, "abortTransaction"); - txnParticipant.abortActiveTransaction(_opCtx); + txnParticipant.abortTransaction(_opCtx); assertNoStartOpTime(); txnParticipant.stashTransactionResources(_opCtx); |