diff options
author | Gregory Noma <gregory.noma@gmail.com> | 2020-08-25 16:07:00 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2020-12-13 05:05:01 +0000 |
commit | 6637619fd08517e1dc2e85a0b9ee4cc6e0b04ed9 (patch) | |
tree | 48a2340a0bd60ad8c71191f828fbf6bbfa9186ff | |
parent | 68bf17aa3b19d0b7f53b7a1b6fe1ebbafdf558d2 (diff) | |
download | mongo-6637619fd08517e1dc2e85a0b9ee4cc6e0b04ed9.tar.gz |
SERVER-50365 Use short WT transaction rollback timeout in the multi-document transaction expirer thread
-rw-r--r-- | src/mongo/db/kill_sessions_local.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 132 | ||||
-rw-r--r-- | src/mongo/db/session.h | 13 | ||||
-rw-r--r-- | src/mongo/db/session_test.cpp | 34 | ||||
-rw-r--r-- | src/mongo/db/storage/recovery_unit.h | 10 | ||||
-rw-r--r-- | src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp | 12 |
7 files changed, 124 insertions, 86 deletions
diff --git a/src/mongo/db/kill_sessions_local.cpp b/src/mongo/db/kill_sessions_local.cpp index 1cf4156f438..0b12d6609a1 100644 --- a/src/mongo/db/kill_sessions_local.cpp +++ b/src/mongo/db/kill_sessions_local.cpp @@ -56,7 +56,7 @@ void killSessionsLocalKillTransactions(OperationContext* opCtx, const SessionKiller::Matcher& matcher) { SessionCatalog::get(opCtx)->scanSessions( opCtx, matcher, [](OperationContext* opCtx, Session* session) { - session->abortArbitraryTransaction(); + session->abortArbitraryTransaction(opCtx); }); } @@ -75,7 +75,7 @@ void killAllExpiredTransactions(OperationContext* opCtx) { SessionCatalog::get(opCtx)->scanSessions( opCtx, matcherAllSessions, [](OperationContext* opCtx, Session* session) { try { - session->abortArbitraryTransactionIfExpired(); + session->abortArbitraryTransactionIfExpired(opCtx); } catch (const DBException& ex) { Status status = ex.toStatus(); std::string errmsg = str::stream() diff --git a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp index 5eab0381527..baaca921550 100644 --- a/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp +++ b/src/mongo/db/periodic_runner_job_abort_expired_transactions.cpp @@ -127,6 +127,11 @@ void PeriodicThreadToAbortExpiredTransactions::_init(ServiceContext* serviceCont // behind an active transaction's intent lock. opCtx->lockState()->setMaxLockTimeout(Milliseconds(0)); + // This thread needs storage rollback to complete timely, so + // instruct the storage engine to not do any extra eviction + // for this thread, if supported. + opCtx->recoveryUnit()->setNoEvictionAfterRollback(); + killAllExpiredTransactions(opCtx.get()); }, jobPeriodMillis); diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index c087ae93ce6..69c5f76342a 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -339,7 +339,7 @@ void Session::beginOrContinueTxn(OperationContext* opCtx, invariant(!opCtx->lockState()->isLocked()); stdx::lock_guard<stdx::mutex> lg(_mutex); - _beginOrContinueTxn(lg, txnNumber, autocommit, startTransaction); + _beginOrContinueTxn(lg, opCtx, txnNumber, autocommit, startTransaction); } void Session::beginOrContinueTxnOnMigration(OperationContext* opCtx, TxnNumber txnNumber) { @@ -347,7 +347,7 @@ void Session::beginOrContinueTxnOnMigration(OperationContext* opCtx, TxnNumber t invariant(!opCtx->lockState()->isLocked()); stdx::lock_guard<stdx::mutex> lg(_mutex); - _beginOrContinueTxnOnMigration(lg, txnNumber); + _beginOrContinueTxnOnMigration(lg, opCtx, txnNumber); } void Session::_setSpeculativeTransactionOpTime(WithLock, @@ -499,6 +499,7 @@ bool Session::checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtI } void Session::_beginOrContinueTxn(WithLock wl, + OperationContext* opCtx, TxnNumber txnNumber, boost::optional<bool> autocommit, boost::optional<bool> startTransaction) { @@ -549,7 +550,7 @@ void Session::_beginOrContinueTxn(WithLock wl, ServerTransactionsMetrics::get(getGlobalServiceContext())->incrementCurrentActive(); ServerTransactionsMetrics::get(getGlobalServiceContext()) ->decrementCurrentInactive(); - _abortTransaction(wl); + _abortTransaction(wl, opCtx); uasserted(ErrorCodes::NoSuchTransaction, str::stream() << "Transaction " << txnNumber << " has been aborted because an earlier command in this " @@ -585,7 +586,7 @@ void Session::_beginOrContinueTxn(WithLock wl, serverGlobalParams.featureCompatibility.getVersion() == ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo40)); - _setActiveTxn(wl, txnNumber); + _setActiveTxn(wl, opCtx, txnNumber); _autocommit = false; _txnState = MultiDocumentTransactionState::kInProgress; @@ -617,7 +618,7 @@ void Session::_beginOrContinueTxn(WithLock wl, } else { // Execute a retryable write. invariant(startTransaction == boost::none); - _setActiveTxn(wl, txnNumber); + _setActiveTxn(wl, opCtx, txnNumber); _autocommit = true; _txnState = MultiDocumentTransactionState::kNone; } @@ -718,6 +719,10 @@ void Session::TxnResources::release(OperationContext* opCtx) { readConcernArgs = _readConcernArgs; } +void Session::TxnResources::setNoEvictionAfterRollback() { + _recoveryUnit->setNoEvictionAfterRollback(); +} + void Session::stashTransactionResources(OperationContext* opCtx) { if (opCtx->getClient()->isInDirectClient()) { return; @@ -898,17 +903,17 @@ void Session::unstashTransactionResources(OperationContext* opCtx, const std::st } } -void Session::abortArbitraryTransaction() { +void Session::abortArbitraryTransaction(OperationContext* opCtx) { stdx::lock_guard<stdx::mutex> lock(_mutex); if (_txnState != MultiDocumentTransactionState::kInProgress) { return; } - _abortTransaction(lock); + _abortTransaction(lock, opCtx); } -void Session::abortArbitraryTransactionIfExpired() { +void Session::abortArbitraryTransactionIfExpired(OperationContext* opCtx) { stdx::lock_guard<stdx::mutex> lock(_mutex); if (_txnState != MultiDocumentTransactionState::kInProgress || !_transactionExpireDate || _transactionExpireDate >= Date_t::now()) { @@ -928,7 +933,7 @@ void Session::abortArbitraryTransactionIfExpired() { << _sessionId.getId() << " because it has been running for longer than 'transactionLifetimeLimitSeconds'"; - _abortTransaction(lock); + _abortTransaction(lock, opCtx); } void Session::abortActiveTransaction(OperationContext* opCtx) { @@ -951,7 +956,7 @@ void Session::abortActiveTransaction(OperationContext* opCtx) { return; } - _abortTransaction(lock); + _abortTransaction(lock, opCtx); { stdx::lock_guard<stdx::mutex> ls(_statsMutex); // Add the latest operation stats to the aggregate OpDebug object stored in the @@ -986,7 +991,7 @@ void Session::abortActiveTransaction(OperationContext* opCtx) { repl::ReadConcernArgs::get(opCtx)); } -void Session::_abortTransaction(WithLock wl) { +void Session::_abortTransaction(WithLock wl, OperationContext* opCtx) { // TODO SERVER-33432 Disallow aborting committed transaction after we implement implicit abort. // A transaction in kCommitting state will either commit or abort for storage-layer reasons; it // is too late to abort externally. @@ -1011,6 +1016,10 @@ void Session::_abortTransaction(WithLock wl) { MultiDocumentTransactionState::kAborted, _txnResourceStash->getReadConcernArgs()); ServerTransactionsMetrics::get(getGlobalServiceContext())->decrementCurrentInactive(); + + if (opCtx->recoveryUnit()->getNoEvictionAfterRollback()) { + _txnResourceStash->setNoEvictionAfterRollback(); + } } else { ServerTransactionsMetrics::get(getGlobalServiceContext())->decrementCurrentActive(); } @@ -1026,7 +1035,9 @@ void Session::_abortTransaction(WithLock wl) { .incrementGlobalTransactionLatencyStats(_singleTransactionStats.getDuration(curTime)); } -void Session::_beginOrContinueTxnOnMigration(WithLock wl, TxnNumber txnNumber) { +void Session::_beginOrContinueTxnOnMigration(WithLock wl, + OperationContext* opCtx, + TxnNumber txnNumber) { _checkValid(wl); // The value for 'autocommit' is only used to // generate the uassert error message. In this case, the exception will never be @@ -1037,13 +1048,13 @@ void Session::_beginOrContinueTxnOnMigration(WithLock wl, TxnNumber txnNumber) { if (txnNumber == _activeTxnNumber) return; - _setActiveTxn(wl, txnNumber); + _setActiveTxn(wl, opCtx, txnNumber); } -void Session::_setActiveTxn(WithLock wl, TxnNumber txnNumber) { +void Session::_setActiveTxn(WithLock wl, OperationContext* opCtx, TxnNumber txnNumber) { // Abort the existing transaction if it's not committed or aborted. if (_txnState == MultiDocumentTransactionState::kInProgress) { - _abortTransaction(wl); + _abortTransaction(wl, opCtx); } _activeTxnNumber = txnNumber; _activeTxnCommittedStatements.clear(); @@ -1465,62 +1476,63 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx, TxnNumber newTxnNumber, std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime) { - opCtx->recoveryUnit()->onCommit( - [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ]( - boost::optional<Timestamp>) { - RetryableWritesStats::get(getGlobalServiceContext()) - ->incrementTransactionsCollectionWriteCount(); + opCtx->recoveryUnit()->onCommit([ + this, + opCtx, + newTxnNumber, + stmtIdsWritten = std::move(stmtIdsWritten), + lastStmtIdWriteOpTime + ](boost::optional<Timestamp>) { + RetryableWritesStats::get(getGlobalServiceContext()) + ->incrementTransactionsCollectionWriteCount(); - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<stdx::mutex> lg(_mutex); - if (!_isValid) - return; + if (!_isValid) + return; - // The cache of the last written record must always be advanced after a write so that - // subsequent writes have the correct point to start from. - if (!_lastWrittenSessionRecord) { - _lastWrittenSessionRecord.emplace(); + // The cache of the last written record must always be advanced after a write so that + // subsequent writes have the correct point to start from. + if (!_lastWrittenSessionRecord) { + _lastWrittenSessionRecord.emplace(); - _lastWrittenSessionRecord->setSessionId(_sessionId); + _lastWrittenSessionRecord->setSessionId(_sessionId); + _lastWrittenSessionRecord->setTxnNum(newTxnNumber); + _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); + } else { + if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum()) _lastWrittenSessionRecord->setTxnNum(newTxnNumber); - _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); - } else { - if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum()) - _lastWrittenSessionRecord->setTxnNum(newTxnNumber); - if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime()) - _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); - } + if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime()) + _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); + } - if (newTxnNumber > _activeTxnNumber) { - // This call is necessary in order to advance the txn number and reset the cached - // state in the case where just before the storage transaction commits, the cache - // entry gets invalidated and immediately refreshed while there were no writes for - // newTxnNumber yet. In this case _activeTxnNumber will be less than newTxnNumber - // and we will fail to update the cache even though the write was successful. - _beginOrContinueTxn(lg, newTxnNumber, boost::none, boost::none); - } + if (newTxnNumber > _activeTxnNumber) { + // This call is necessary in order to advance the txn number and reset the cached + // state in the case where just before the storage transaction commits, the cache + // entry gets invalidated and immediately refreshed while there were no writes for + // newTxnNumber yet. In this case _activeTxnNumber will be less than newTxnNumber + // and we will fail to update the cache even though the write was successful. + _beginOrContinueTxn(lg, opCtx, newTxnNumber, boost::none, boost::none); + } - if (newTxnNumber == _activeTxnNumber) { - for (const auto stmtId : stmtIdsWritten) { - if (stmtId == kIncompleteHistoryStmtId) { - _hasIncompleteHistory = true; - continue; - } + if (newTxnNumber == _activeTxnNumber) { + for (const auto stmtId : stmtIdsWritten) { + if (stmtId == kIncompleteHistoryStmtId) { + _hasIncompleteHistory = true; + continue; + } - const auto insertRes = - _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime); - if (!insertRes.second) { - const auto& existingOpTime = insertRes.first->second; - fassertOnRepeatedExecution(_sessionId, - newTxnNumber, - stmtId, - existingOpTime, - lastStmtIdWriteOpTime); - } + const auto insertRes = + _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime); + if (!insertRes.second) { + const auto& existingOpTime = insertRes.first->second; + fassertOnRepeatedExecution( + _sessionId, newTxnNumber, stmtId, existingOpTime, lastStmtIdWriteOpTime); } } - }); + } + }); MONGO_FAIL_POINT_BLOCK(onPrimaryTransactionalWrite, customArgs) { const auto& data = customArgs.getData(); diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index df9bceeae2e..fdf7358efdf 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -111,6 +111,8 @@ public: return _readConcernArgs; } + void setNoEvictionAfterRollback(); + private: bool _released = false; std::unique_ptr<Locker> _locker; @@ -270,13 +272,13 @@ public: /** * Aborts the transaction outside the transaction, releasing transaction resources. */ - void abortArbitraryTransaction(); + void abortArbitraryTransaction(OperationContext* opCtx); /** * Same as abortArbitraryTransaction, except only executes if _transactionExpireDate indicates * that the transaction has expired. */ - void abortArbitraryTransactionIfExpired(); + void abortArbitraryTransactionIfExpired(OperationContext* opCtx); /* * Aborts the transaction inside the transaction, releasing transaction resources. @@ -412,11 +414,12 @@ private: static CursorExistsFunction _cursorExistsFunction; void _beginOrContinueTxn(WithLock, + OperationContext* opCtx, TxnNumber txnNumber, boost::optional<bool> autocommit, boost::optional<bool> startTransaction); - void _beginOrContinueTxnOnMigration(WithLock, TxnNumber txnNumber); + void _beginOrContinueTxnOnMigration(WithLock, OperationContext* opCtx, TxnNumber txnNumber); // Checks if there is a conflicting operation on the current Session void _checkValid(WithLock) const; @@ -426,7 +429,7 @@ private: // than the current one. void _checkTxnValid(WithLock, TxnNumber txnNumber, boost::optional<bool> autocommit) const; - void _setActiveTxn(WithLock, TxnNumber txnNumber); + void _setActiveTxn(WithLock, OperationContext* opCtx, TxnNumber txnNumber); void _checkIsActiveTransaction(WithLock, TxnNumber txnNumber, bool checkAbort) const; @@ -457,7 +460,7 @@ private: // Releases stashed transaction resources to abort the transaction. - void _abortTransaction(WithLock); + void _abortTransaction(WithLock, OperationContext* opCtx); // Committing a transaction first changes its state to "Committing" and writes to the oplog, // then it changes the state to "Committed". diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp index fcd5ad7b9dc..60eb1134448 100644 --- a/src/mongo/db/session_test.cpp +++ b/src/mongo/db/session_test.cpp @@ -1080,7 +1080,7 @@ TEST_F(SessionTest, AbortClearsStoredStatements) { // The transaction machinery cannot store an empty locker. { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); } session.stashTransactionResources(opCtx()); - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx()); ASSERT_TRUE(session.transactionOperationsForTest().empty()); ASSERT_TRUE(session.transactionIsAborted()); } @@ -1119,7 +1119,7 @@ TEST_F(SessionTest, EmptyTransactionAbort) { // The transaction machinery cannot store an empty locker. { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); } session.stashTransactionResources(opCtx()); - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx()); ASSERT_TRUE(session.transactionIsAborted()); } @@ -1134,7 +1134,7 @@ TEST_F(SessionTest, ConcurrencyOfUnstashAndAbort) { session.beginOrContinueTxn(opCtx(), txnNum, false, true, "testDB", "find"); // The transaction may be aborted without checking out the session. - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx()); // An unstash after an abort should uassert. ASSERT_THROWS_CODE(session.unstashTransactionResources(opCtx(), "find"), @@ -1182,7 +1182,7 @@ TEST_F(SessionTest, ConcurrencyOfStashAndAbort) { session.unstashTransactionResources(opCtx(), "find"); // The transaction may be aborted without checking out the session. - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx()); // A stash after an abort should be a noop. session.stashTransactionResources(opCtx()); @@ -1225,7 +1225,7 @@ TEST_F(SessionTest, ConcurrencyOfAddTransactionOperationAndAbort) { session.unstashTransactionResources(opCtx(), "insert"); // The transaction may be aborted without checking out the session. - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx()); // An addTransactionOperation() after an abort should uassert. auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); @@ -1272,7 +1272,7 @@ TEST_F(SessionTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAbort) { session.unstashTransactionResources(opCtx(), "insert"); // The transaction may be aborted without checking out the session. - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx()); // An endTransactionAndRetrieveOperations() after an abort should uassert. ASSERT_THROWS_CODE(session.endTransactionAndRetrieveOperations(opCtx()), @@ -1318,7 +1318,7 @@ TEST_F(SessionTest, ConcurrencyOfCommitTransactionAndAbort) { session.unstashTransactionResources(opCtx(), "commitTransaction"); // The transaction may be aborted without checking out the session. - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx()); // An commitTransaction() after an abort should uassert. ASSERT_THROWS_CODE( @@ -1428,7 +1428,7 @@ TEST_F(SessionTest, IncrementTotalAbortedUponAbort) { unsigned long long beforeAbortCount = ServerTransactionsMetrics::get(opCtx())->getTotalAborted(); - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx()); // Assert that the aborted counter is incremented by 1. ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalAborted(), beforeAbortCount + 1U); @@ -1459,7 +1459,7 @@ TEST_F(SessionTest, TrackTotalOpenTransactionsWithAbort) { beforeTransactionStart + 1U); // Tests that aborting a transaction decrements the open transactions counter by 1. - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx()); ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentOpen(), beforeTransactionStart); } @@ -1576,7 +1576,7 @@ TEST_F(SessionTest, TrackTotalActiveAndInactiveTransactionsWithStashedAbort) { beforeInactiveCounter + 1U); // Tests that aborting a stashed transaction decrements the inactive counter only. - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx()); ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentActive(), beforeActiveCounter); ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(), beforeInactiveCounter); } @@ -1608,7 +1608,7 @@ TEST_F(SessionTest, TrackTotalActiveAndInactiveTransactionsWithUnstashedAbort) { ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(), beforeInactiveCounter); // Tests that aborting a stashed transaction decrements the active counter only. - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx()); ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentActive(), beforeActiveCounter); ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(), beforeInactiveCounter); } @@ -1726,7 +1726,7 @@ TEST_F(TransactionsMetricsTest, SingleTransactionStatsDurationShouldBeSetUponAbo sleepmillis(10); unsigned long long timeBeforeTxnAbort = curTimeMicros64(); - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx()); unsigned long long timeAfterTxnAbort = curTimeMicros64(); ASSERT_GTE(session.getSingleTransactionStats()->getDuration(curTimeMicros64()), @@ -1796,7 +1796,7 @@ TEST_F(TransactionsMetricsTest, SingleTransactionStatsDurationShouldKeepIncreasi ASSERT_GT(session.getSingleTransactionStats()->getDuration(curTimeMicros64()), txnDurationAfterStart); sleepmillis(10); - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx()); // Sleep here to allow enough time to elapse. sleepmillis(10); @@ -1876,7 +1876,7 @@ TEST_F(TransactionsMetricsTest, TimeActiveMicrosShouldBeSetUponUnstashAndAbort) session.unstashTransactionResources(opCtx(), "insert"); // Sleep here to allow enough time to elapse. sleepmillis(10); - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx()); // Time active should have increased. ASSERT_GT(session.getSingleTransactionStats()->getTimeActiveMicros(curTimeMicros64()), @@ -1905,7 +1905,7 @@ TEST_F(TransactionsMetricsTest, TimeActiveMicrosShouldNotBeSetUponAbortOnly) { ASSERT_EQ(session.getSingleTransactionStats()->getTimeActiveMicros(curTimeMicros64()), Microseconds{0}); - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx()); // Time active should not have increased. ASSERT_EQ(session.getSingleTransactionStats()->getTimeActiveMicros(curTimeMicros64()), @@ -2234,7 +2234,7 @@ TEST_F(TransactionsMetricsTest, TimeInactiveMicrosShouldBeSetUponUnstashAndAbort timeInactiveSoFar); session.unstashTransactionResources(opCtx(), "insert"); - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx()); timeInactiveSoFar = session.getSingleTransactionStats()->getTimeInactiveMicros(curTimeMicros64()); @@ -2737,7 +2737,7 @@ TEST_F(TransactionsMetricsTest, LogTransactionInfoAfterSlowStashedAbort) { sleepmillis(5 * serverGlobalParams.slowMS); startCapturingLogMessages(); - session.abortArbitraryTransaction(); + session.abortArbitraryTransaction(opCtx()); stopCapturingLogMessages(); std::string expectedTransactionInfo = "transaction " + diff --git a/src/mongo/db/storage/recovery_unit.h b/src/mongo/db/storage/recovery_unit.h index 9f4f47f6142..c60fd75126e 100644 --- a/src/mongo/db/storage/recovery_unit.h +++ b/src/mongo/db/storage/recovery_unit.h @@ -413,8 +413,18 @@ public: virtual void setOrderedCommit(bool orderedCommit) = 0; + void setNoEvictionAfterRollback() { + _noEvictionAfterRollback = true; + } + + bool getNoEvictionAfterRollback() const { + return _noEvictionAfterRollback; + } + protected: RecoveryUnit() {} + + bool _noEvictionAfterRollback = false; }; } // namespace mongo diff --git a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp index 7ec4e21c916..9030fbe2583 100644 --- a/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp +++ b/src/mongo/db/storage/wiredtiger/wiredtiger_recovery_unit.cpp @@ -363,8 +363,16 @@ void WiredTigerRecoveryUnit::_txnClose(bool commit) { wtRet = s->commit_transaction(s, nullptr); LOG(3) << "WT commit_transaction for snapshot id " << _mySnapshotId; } else { - wtRet = s->rollback_transaction(s, nullptr); - invariant(!wtRet); + StringBuilder config; + if (_noEvictionAfterRollback) { + // The only point at which rollback_transaction() can time out is in the bonus-eviction + // phase. If the timeout expires here, the function will stop the eviction and return + // success. It cannot return an error due to timeout. + config << "operation_timeout_ms=1,"; + } + + wtRet = s->rollback_transaction(s, config.str().c_str()); + LOG(3) << "WT rollback_transaction for snapshot id " << _mySnapshotId; } |