diff options
Diffstat (limited to 'src/mongo/db/transaction_participant.cpp')
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 142 |
1 files changed, 44 insertions, 98 deletions
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 50812cf4c8e..d3480a14327 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -43,7 +43,6 @@ #include "mongo/db/session.h" #include "mongo/db/session_catalog.h" #include "mongo/db/stats/fill_locker_info.h" -#include "mongo/db/stats/top.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/net/socket_utils.h" @@ -208,17 +207,13 @@ void TransactionParticipant::_beginMultiDocumentTransaction(WithLock wl, TxnNumb _txnState.transitionTo(wl, TransactionState::kInProgress); - // Tracks various transactions metrics. - _singleTransactionStats.setStartTime(curTimeMicros64()); - _transactionExpireDate = - Date_t::fromMillisSinceEpoch(_singleTransactionStats.getStartTime() / 1000) + + // Start tracking various transactions metrics. + auto curTime = curTimeMicros64(); + _transactionMetricsObserver.onStart(ServerTransactionsMetrics::get(getGlobalServiceContext()), + curTime); + _transactionExpireDate = Date_t::fromMillisSinceEpoch(curTime / 1000) + stdx::chrono::seconds{transactionLifetimeLimitSeconds.load()}; - ServerTransactionsMetrics::get(getGlobalServiceContext())->incrementTotalStarted(); - // The transaction is considered open here and stays inactive until its first unstash event. - ServerTransactionsMetrics::get(getGlobalServiceContext())->incrementCurrentOpen(); - ServerTransactionsMetrics::get(getGlobalServiceContext())->incrementCurrentInactive(); - invariant(_transactionOperations.empty()); } @@ -393,26 +388,12 @@ TransactionParticipant::SideTransactionBlock::~SideTransactionBlock() { void TransactionParticipant::_stashActiveTransaction(WithLock, OperationContext* opCtx) { invariant(_activeTxnNumber == opCtx->getTxnNumber()); - if (_singleTransactionStats.isActive()) { - _singleTransactionStats.setInactive(curTimeMicros64()); - } - - // Add the latest operation stats to the aggregate OpDebug object stored in the - // SingleTransactionStats instance on the Session. - _singleTransactionStats.getOpDebug()->additiveMetrics.add( - CurOp::get(opCtx)->debug().additiveMetrics); + _transactionMetricsObserver.onStash(ServerTransactionsMetrics::get(opCtx), curTimeMicros64()); + _transactionMetricsObserver.onTransactionOperation(opCtx->getClient(), + CurOp::get(opCtx)->debug().additiveMetrics); invariant(!_txnResourceStash); _txnResourceStash = TxnResources(opCtx); - - // We accept possible slight inaccuracies in these counters from non-atomicity. - ServerTransactionsMetrics::get(opCtx)->decrementCurrentActive(); - ServerTransactionsMetrics::get(opCtx)->incrementCurrentInactive(); - - // Update the LastClientInfo object stored in the SingleTransactionStats instance on the Session - // with this Client's information. This is the last client that ran a transaction operation on - // the Session. - _singleTransactionStats.updateLastClientInfo(opCtx->getClient()); } @@ -491,13 +472,8 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx readConcernArgs.isEmpty()); _txnResourceStash->release(opCtx); _txnResourceStash = boost::none; - // Set the starting active time for this transaction. - if (_txnState.isInProgress(lk)) { - _singleTransactionStats.setActive(curTimeMicros64()); - } - // We accept possible slight inaccuracies in these counters from non-atomicity. - ServerTransactionsMetrics::get(opCtx)->incrementCurrentActive(); - ServerTransactionsMetrics::get(opCtx)->decrementCurrentInactive(); + _transactionMetricsObserver.onUnstash(ServerTransactionsMetrics::get(opCtx), + curTimeMicros64()); return; } @@ -513,11 +489,9 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx // Stashed transaction resources do not exist for this in-progress multi-document // transaction. Set up the transaction resources on the opCtx. opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx)); - ServerTransactionsMetrics::get(getGlobalServiceContext())->incrementCurrentActive(); - ServerTransactionsMetrics::get(getGlobalServiceContext())->decrementCurrentInactive(); - // Set the starting active time for this transaction. - _singleTransactionStats.setActive(curTimeMicros64()); + _transactionMetricsObserver.onUnstash(ServerTransactionsMetrics::get(opCtx), + curTimeMicros64()); // If maxTransactionLockRequestTimeoutMillis is set, then we will ensure no // future lock request waits longer than maxTransactionLockRequestTimeoutMillis @@ -740,28 +714,11 @@ void TransactionParticipant::_commitTransaction(stdx::unique_lock<stdx::mutex> l _txnState.transitionTo(lk, TransactionState::kCommitted); - // After the transaction has been committed, we must update the end time and mark it as - // inactive. - const auto now = curTimeMicros64(); - _singleTransactionStats.setEndTime(now); - if (_singleTransactionStats.isActive()) { - _singleTransactionStats.setInactive(now); - } - - ServerTransactionsMetrics::get(opCtx)->incrementTotalCommitted(); - ServerTransactionsMetrics::get(opCtx)->decrementCurrentOpen(); - ServerTransactionsMetrics::get(getGlobalServiceContext())->decrementCurrentActive(); - Top::get(getGlobalServiceContext()) - .incrementGlobalTransactionLatencyStats(_singleTransactionStats.getDuration(now)); - - // Add the latest operation stats to the aggregate OpDebug object stored in the - // SingleTransactionStats instance on the Session. - _singleTransactionStats.getOpDebug()->additiveMetrics.add( - CurOp::get(opCtx)->debug().additiveMetrics); - - // Update the LastClientInfo object stored in the SingleTransactionStats instance on the Session - // with this Client's information. - _singleTransactionStats.updateLastClientInfo(opCtx->getClient()); + const auto curTime = curTimeMicros64(); + _transactionMetricsObserver.onCommit( + ServerTransactionsMetrics::get(opCtx), curTime, &Top::get(getGlobalServiceContext())); + _transactionMetricsObserver.onTransactionOperation(opCtx->getClient(), + CurOp::get(opCtx)->debug().additiveMetrics); // Log the transaction if its duration is longer than the slowMS command threshold. _logSlowTransaction(lk, @@ -834,14 +791,8 @@ void TransactionParticipant::_abortActiveTransaction(WithLock lock, invariant(!_txnResourceStash); if (!_txnState.isNone(lock)) { - // Add the latest operation stats to the aggregate OpDebug object stored in the - // SingleTransactionStats instance on the Session. - _singleTransactionStats.getOpDebug()->additiveMetrics.add( - CurOp::get(opCtx)->debug().additiveMetrics); - - // Update the LastClientInfo object stored in the SingleTransactionStats instance on the - // Session with this Client's information. - _singleTransactionStats.updateLastClientInfo(opCtx->getClient()); + _transactionMetricsObserver.onTransactionOperation( + opCtx->getClient(), CurOp::get(opCtx)->debug().additiveMetrics); } // Only abort the transaction in session if it's in expected states. @@ -881,28 +832,24 @@ void TransactionParticipant::_abortActiveTransaction(WithLock lock, } void TransactionParticipant::_abortTransactionOnSession(WithLock wl) { - const auto now = curTimeMicros64(); - if (!_txnState.isNone(wl)) { - _singleTransactionStats.setEndTime(now); - // The transaction has aborted, so we mark it as inactive. - if (_singleTransactionStats.isActive()) { - _singleTransactionStats.setInactive(now); - } - } - + const auto curTime = curTimeMicros64(); // If the transaction is stashed, then we have aborted an inactive transaction. if (_txnResourceStash) { // The transaction is stashed, so we abort the inactive transaction on session. + _transactionMetricsObserver.onAbortInactive( + ServerTransactionsMetrics::get(getGlobalServiceContext()), + curTime, + &Top::get(getGlobalServiceContext())); _logSlowTransaction(wl, &(_txnResourceStash->locker()->getLockerInfo())->stats, TransactionState::kAborted, _txnResourceStash->getReadConcernArgs()); _txnResourceStash = boost::none; - ServerTransactionsMetrics::get(getGlobalServiceContext())->decrementCurrentInactive(); } else { - // Transaction resource has been unstashed and transferred into an active opCtx, which will - // clean it up. - ServerTransactionsMetrics::get(getGlobalServiceContext())->decrementCurrentActive(); + _transactionMetricsObserver.onAbortActive( + ServerTransactionsMetrics::get(getGlobalServiceContext()), + curTime, + &Top::get(getGlobalServiceContext())); } _transactionOperationBytes = 0; @@ -911,12 +858,6 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) { _speculativeTransactionReadOpTime = repl::OpTime(); _getSession()->unlockTxnNumber(); - - ServerTransactionsMetrics::get(getGlobalServiceContext())->incrementTotalAborted(); - ServerTransactionsMetrics::get(getGlobalServiceContext())->decrementCurrentOpen(); - - Top::get(getGlobalServiceContext()) - .incrementGlobalTransactionLatencyStats(_singleTransactionStats.getDuration(now)); } void TransactionParticipant::_cleanUpTxnResourceOnOpCtx(WithLock wl, OperationContext* opCtx) { @@ -1000,7 +941,8 @@ void TransactionParticipant::reportStashedState(BSONObjBuilder* builder) const { builder->append("host", getHostNameCachedAndPort()); builder->append("desc", "inactive transaction"); - auto lastClientInfo = _singleTransactionStats.getLastClientInfo(); + auto lastClientInfo = + _transactionMetricsObserver.getSingleTransactionStats().getLastClientInfo(); builder->append("client", lastClientInfo.clientHostAndPort); builder->append("connectionId", lastClientInfo.connectionId); builder->append("appName", lastClientInfo.appName); @@ -1150,19 +1092,21 @@ void TransactionParticipant::_reportTransactionStats(WithLock wl, parametersBuilder.done(); builder->append("readTimestamp", _speculativeTransactionReadOpTime.getTimestamp()); + + auto singleTransactionStats = _transactionMetricsObserver.getSingleTransactionStats(); builder->append("startWallClockTime", dateToISOStringLocal(Date_t::fromMillisSinceEpoch( - _singleTransactionStats.getStartTime() / 1000))); + singleTransactionStats.getStartTime() / 1000))); // We use the same "now" time so that the following time metrics are consistent with each other. auto curTime = curTimeMicros64(); builder->append("timeOpenMicros", - static_cast<long long>(_singleTransactionStats.getDuration(curTime))); + static_cast<long long>(singleTransactionStats.getDuration(curTime))); auto timeActive = - durationCount<Microseconds>(_singleTransactionStats.getTimeActiveMicros(curTime)); + durationCount<Microseconds>(singleTransactionStats.getTimeActiveMicros(curTime)); auto timeInactive = - durationCount<Microseconds>(_singleTransactionStats.getTimeInactiveMicros(curTime)); + durationCount<Microseconds>(singleTransactionStats.getTimeInactiveMicros(curTime)); builder->append("timeActiveMicros", timeActive); builder->append("timeInactiveMicros", timeInactive); @@ -1278,7 +1222,9 @@ std::string TransactionParticipant::_transactionInfoForLog( s << " readTimestamp:" << _speculativeTransactionReadOpTime.getTimestamp().toString() << ","; - s << _singleTransactionStats.getOpDebug()->additiveMetrics.report(); + auto singleTransactionStats = _transactionMetricsObserver.getSingleTransactionStats(); + + s << singleTransactionStats.getOpDebug()->additiveMetrics.report(); std::string terminationCauseString = terminationCause == TransactionState::kCommitted ? "committed" : "aborted"; @@ -1286,9 +1232,9 @@ std::string TransactionParticipant::_transactionInfoForLog( auto curTime = curTimeMicros64(); s << " timeActiveMicros:" - << durationCount<Microseconds>(_singleTransactionStats.getTimeActiveMicros(curTime)); + << durationCount<Microseconds>(singleTransactionStats.getTimeActiveMicros(curTime)); s << " timeInactiveMicros:" - << durationCount<Microseconds>(_singleTransactionStats.getTimeInactiveMicros(curTime)); + << durationCount<Microseconds>(singleTransactionStats.getTimeInactiveMicros(curTime)); // Number of yields is always 0 in multi-document transactions, but it is included mainly to // match the format with other slow operation logging messages. @@ -1301,7 +1247,7 @@ std::string TransactionParticipant::_transactionInfoForLog( // Total duration of the transaction. s << " " - << Milliseconds{static_cast<long long>(_singleTransactionStats.getDuration(curTime)) / 1000}; + << Milliseconds{static_cast<long long>(singleTransactionStats.getDuration(curTime)) / 1000}; return s.str(); } @@ -1313,7 +1259,7 @@ void TransactionParticipant::_logSlowTransaction(WithLock wl, // Only log multi-document transactions. if (!_txnState.isNone(wl)) { // Log the transaction if its duration is longer than the slowMS command threshold. - if (_singleTransactionStats.getDuration(curTimeMicros64()) > + if (_transactionMetricsObserver.getSingleTransactionStats().getDuration(curTimeMicros64()) > serverGlobalParams.slowMS * 1000ULL) { log(logger::LogComponent::kTransaction) << "transaction " @@ -1341,7 +1287,7 @@ void TransactionParticipant::_setNewTxnNumber(WithLock wl, const TxnNumber& txnN _activeTxnNumber = txnNumber; _txnState.transitionTo(wl, TransactionState::kNone); - _singleTransactionStats = SingleTransactionStats(); + _transactionMetricsObserver.resetSingleTransactionStats(); _speculativeTransactionReadOpTime = repl::OpTime(); _multikeyPathInfo.clear(); _autoCommit = boost::none; |