summaryrefslogtreecommitdiff
path: root/src/mongo/db/transaction_participant.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/transaction_participant.cpp')
-rw-r--r--src/mongo/db/transaction_participant.cpp142
1 files changed, 44 insertions, 98 deletions
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 50812cf4c8e..d3480a14327 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -43,7 +43,6 @@
#include "mongo/db/session.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/stats/fill_locker_info.h"
-#include "mongo/db/stats/top.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/net/socket_utils.h"
@@ -208,17 +207,13 @@ void TransactionParticipant::_beginMultiDocumentTransaction(WithLock wl, TxnNumb
_txnState.transitionTo(wl, TransactionState::kInProgress);
- // Tracks various transactions metrics.
- _singleTransactionStats.setStartTime(curTimeMicros64());
- _transactionExpireDate =
- Date_t::fromMillisSinceEpoch(_singleTransactionStats.getStartTime() / 1000) +
+ // Start tracking various transactions metrics.
+ auto curTime = curTimeMicros64();
+ _transactionMetricsObserver.onStart(ServerTransactionsMetrics::get(getGlobalServiceContext()),
+ curTime);
+ _transactionExpireDate = Date_t::fromMillisSinceEpoch(curTime / 1000) +
stdx::chrono::seconds{transactionLifetimeLimitSeconds.load()};
- ServerTransactionsMetrics::get(getGlobalServiceContext())->incrementTotalStarted();
- // The transaction is considered open here and stays inactive until its first unstash event.
- ServerTransactionsMetrics::get(getGlobalServiceContext())->incrementCurrentOpen();
- ServerTransactionsMetrics::get(getGlobalServiceContext())->incrementCurrentInactive();
-
invariant(_transactionOperations.empty());
}
@@ -393,26 +388,12 @@ TransactionParticipant::SideTransactionBlock::~SideTransactionBlock() {
void TransactionParticipant::_stashActiveTransaction(WithLock, OperationContext* opCtx) {
invariant(_activeTxnNumber == opCtx->getTxnNumber());
- if (_singleTransactionStats.isActive()) {
- _singleTransactionStats.setInactive(curTimeMicros64());
- }
-
- // Add the latest operation stats to the aggregate OpDebug object stored in the
- // SingleTransactionStats instance on the Session.
- _singleTransactionStats.getOpDebug()->additiveMetrics.add(
- CurOp::get(opCtx)->debug().additiveMetrics);
+ _transactionMetricsObserver.onStash(ServerTransactionsMetrics::get(opCtx), curTimeMicros64());
+ _transactionMetricsObserver.onTransactionOperation(opCtx->getClient(),
+ CurOp::get(opCtx)->debug().additiveMetrics);
invariant(!_txnResourceStash);
_txnResourceStash = TxnResources(opCtx);
-
- // We accept possible slight inaccuracies in these counters from non-atomicity.
- ServerTransactionsMetrics::get(opCtx)->decrementCurrentActive();
- ServerTransactionsMetrics::get(opCtx)->incrementCurrentInactive();
-
- // Update the LastClientInfo object stored in the SingleTransactionStats instance on the Session
- // with this Client's information. This is the last client that ran a transaction operation on
- // the Session.
- _singleTransactionStats.updateLastClientInfo(opCtx->getClient());
}
@@ -491,13 +472,8 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx
readConcernArgs.isEmpty());
_txnResourceStash->release(opCtx);
_txnResourceStash = boost::none;
- // Set the starting active time for this transaction.
- if (_txnState.isInProgress(lk)) {
- _singleTransactionStats.setActive(curTimeMicros64());
- }
- // We accept possible slight inaccuracies in these counters from non-atomicity.
- ServerTransactionsMetrics::get(opCtx)->incrementCurrentActive();
- ServerTransactionsMetrics::get(opCtx)->decrementCurrentInactive();
+ _transactionMetricsObserver.onUnstash(ServerTransactionsMetrics::get(opCtx),
+ curTimeMicros64());
return;
}
@@ -513,11 +489,9 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx
// Stashed transaction resources do not exist for this in-progress multi-document
// transaction. Set up the transaction resources on the opCtx.
opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx));
- ServerTransactionsMetrics::get(getGlobalServiceContext())->incrementCurrentActive();
- ServerTransactionsMetrics::get(getGlobalServiceContext())->decrementCurrentInactive();
- // Set the starting active time for this transaction.
- _singleTransactionStats.setActive(curTimeMicros64());
+ _transactionMetricsObserver.onUnstash(ServerTransactionsMetrics::get(opCtx),
+ curTimeMicros64());
// If maxTransactionLockRequestTimeoutMillis is set, then we will ensure no
// future lock request waits longer than maxTransactionLockRequestTimeoutMillis
@@ -740,28 +714,11 @@ void TransactionParticipant::_commitTransaction(stdx::unique_lock<stdx::mutex> l
_txnState.transitionTo(lk, TransactionState::kCommitted);
- // After the transaction has been committed, we must update the end time and mark it as
- // inactive.
- const auto now = curTimeMicros64();
- _singleTransactionStats.setEndTime(now);
- if (_singleTransactionStats.isActive()) {
- _singleTransactionStats.setInactive(now);
- }
-
- ServerTransactionsMetrics::get(opCtx)->incrementTotalCommitted();
- ServerTransactionsMetrics::get(opCtx)->decrementCurrentOpen();
- ServerTransactionsMetrics::get(getGlobalServiceContext())->decrementCurrentActive();
- Top::get(getGlobalServiceContext())
- .incrementGlobalTransactionLatencyStats(_singleTransactionStats.getDuration(now));
-
- // Add the latest operation stats to the aggregate OpDebug object stored in the
- // SingleTransactionStats instance on the Session.
- _singleTransactionStats.getOpDebug()->additiveMetrics.add(
- CurOp::get(opCtx)->debug().additiveMetrics);
-
- // Update the LastClientInfo object stored in the SingleTransactionStats instance on the Session
- // with this Client's information.
- _singleTransactionStats.updateLastClientInfo(opCtx->getClient());
+ const auto curTime = curTimeMicros64();
+ _transactionMetricsObserver.onCommit(
+ ServerTransactionsMetrics::get(opCtx), curTime, &Top::get(getGlobalServiceContext()));
+ _transactionMetricsObserver.onTransactionOperation(opCtx->getClient(),
+ CurOp::get(opCtx)->debug().additiveMetrics);
// Log the transaction if its duration is longer than the slowMS command threshold.
_logSlowTransaction(lk,
@@ -834,14 +791,8 @@ void TransactionParticipant::_abortActiveTransaction(WithLock lock,
invariant(!_txnResourceStash);
if (!_txnState.isNone(lock)) {
- // Add the latest operation stats to the aggregate OpDebug object stored in the
- // SingleTransactionStats instance on the Session.
- _singleTransactionStats.getOpDebug()->additiveMetrics.add(
- CurOp::get(opCtx)->debug().additiveMetrics);
-
- // Update the LastClientInfo object stored in the SingleTransactionStats instance on the
- // Session with this Client's information.
- _singleTransactionStats.updateLastClientInfo(opCtx->getClient());
+ _transactionMetricsObserver.onTransactionOperation(
+ opCtx->getClient(), CurOp::get(opCtx)->debug().additiveMetrics);
}
// Only abort the transaction in session if it's in expected states.
@@ -881,28 +832,24 @@ void TransactionParticipant::_abortActiveTransaction(WithLock lock,
}
void TransactionParticipant::_abortTransactionOnSession(WithLock wl) {
- const auto now = curTimeMicros64();
- if (!_txnState.isNone(wl)) {
- _singleTransactionStats.setEndTime(now);
- // The transaction has aborted, so we mark it as inactive.
- if (_singleTransactionStats.isActive()) {
- _singleTransactionStats.setInactive(now);
- }
- }
-
+ const auto curTime = curTimeMicros64();
// If the transaction is stashed, then we have aborted an inactive transaction.
if (_txnResourceStash) {
// The transaction is stashed, so we abort the inactive transaction on session.
+ _transactionMetricsObserver.onAbortInactive(
+ ServerTransactionsMetrics::get(getGlobalServiceContext()),
+ curTime,
+ &Top::get(getGlobalServiceContext()));
_logSlowTransaction(wl,
&(_txnResourceStash->locker()->getLockerInfo())->stats,
TransactionState::kAborted,
_txnResourceStash->getReadConcernArgs());
_txnResourceStash = boost::none;
- ServerTransactionsMetrics::get(getGlobalServiceContext())->decrementCurrentInactive();
} else {
- // Transaction resource has been unstashed and transferred into an active opCtx, which will
- // clean it up.
- ServerTransactionsMetrics::get(getGlobalServiceContext())->decrementCurrentActive();
+ _transactionMetricsObserver.onAbortActive(
+ ServerTransactionsMetrics::get(getGlobalServiceContext()),
+ curTime,
+ &Top::get(getGlobalServiceContext()));
}
_transactionOperationBytes = 0;
@@ -911,12 +858,6 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) {
_speculativeTransactionReadOpTime = repl::OpTime();
_getSession()->unlockTxnNumber();
-
- ServerTransactionsMetrics::get(getGlobalServiceContext())->incrementTotalAborted();
- ServerTransactionsMetrics::get(getGlobalServiceContext())->decrementCurrentOpen();
-
- Top::get(getGlobalServiceContext())
- .incrementGlobalTransactionLatencyStats(_singleTransactionStats.getDuration(now));
}
void TransactionParticipant::_cleanUpTxnResourceOnOpCtx(WithLock wl, OperationContext* opCtx) {
@@ -1000,7 +941,8 @@ void TransactionParticipant::reportStashedState(BSONObjBuilder* builder) const {
builder->append("host", getHostNameCachedAndPort());
builder->append("desc", "inactive transaction");
- auto lastClientInfo = _singleTransactionStats.getLastClientInfo();
+ auto lastClientInfo =
+ _transactionMetricsObserver.getSingleTransactionStats().getLastClientInfo();
builder->append("client", lastClientInfo.clientHostAndPort);
builder->append("connectionId", lastClientInfo.connectionId);
builder->append("appName", lastClientInfo.appName);
@@ -1150,19 +1092,21 @@ void TransactionParticipant::_reportTransactionStats(WithLock wl,
parametersBuilder.done();
builder->append("readTimestamp", _speculativeTransactionReadOpTime.getTimestamp());
+
+ auto singleTransactionStats = _transactionMetricsObserver.getSingleTransactionStats();
builder->append("startWallClockTime",
dateToISOStringLocal(Date_t::fromMillisSinceEpoch(
- _singleTransactionStats.getStartTime() / 1000)));
+ singleTransactionStats.getStartTime() / 1000)));
// We use the same "now" time so that the following time metrics are consistent with each other.
auto curTime = curTimeMicros64();
builder->append("timeOpenMicros",
- static_cast<long long>(_singleTransactionStats.getDuration(curTime)));
+ static_cast<long long>(singleTransactionStats.getDuration(curTime)));
auto timeActive =
- durationCount<Microseconds>(_singleTransactionStats.getTimeActiveMicros(curTime));
+ durationCount<Microseconds>(singleTransactionStats.getTimeActiveMicros(curTime));
auto timeInactive =
- durationCount<Microseconds>(_singleTransactionStats.getTimeInactiveMicros(curTime));
+ durationCount<Microseconds>(singleTransactionStats.getTimeInactiveMicros(curTime));
builder->append("timeActiveMicros", timeActive);
builder->append("timeInactiveMicros", timeInactive);
@@ -1278,7 +1222,9 @@ std::string TransactionParticipant::_transactionInfoForLog(
s << " readTimestamp:" << _speculativeTransactionReadOpTime.getTimestamp().toString() << ",";
- s << _singleTransactionStats.getOpDebug()->additiveMetrics.report();
+ auto singleTransactionStats = _transactionMetricsObserver.getSingleTransactionStats();
+
+ s << singleTransactionStats.getOpDebug()->additiveMetrics.report();
std::string terminationCauseString =
terminationCause == TransactionState::kCommitted ? "committed" : "aborted";
@@ -1286,9 +1232,9 @@ std::string TransactionParticipant::_transactionInfoForLog(
auto curTime = curTimeMicros64();
s << " timeActiveMicros:"
- << durationCount<Microseconds>(_singleTransactionStats.getTimeActiveMicros(curTime));
+ << durationCount<Microseconds>(singleTransactionStats.getTimeActiveMicros(curTime));
s << " timeInactiveMicros:"
- << durationCount<Microseconds>(_singleTransactionStats.getTimeInactiveMicros(curTime));
+ << durationCount<Microseconds>(singleTransactionStats.getTimeInactiveMicros(curTime));
// Number of yields is always 0 in multi-document transactions, but it is included mainly to
// match the format with other slow operation logging messages.
@@ -1301,7 +1247,7 @@ std::string TransactionParticipant::_transactionInfoForLog(
// Total duration of the transaction.
s << " "
- << Milliseconds{static_cast<long long>(_singleTransactionStats.getDuration(curTime)) / 1000};
+ << Milliseconds{static_cast<long long>(singleTransactionStats.getDuration(curTime)) / 1000};
return s.str();
}
@@ -1313,7 +1259,7 @@ void TransactionParticipant::_logSlowTransaction(WithLock wl,
// Only log multi-document transactions.
if (!_txnState.isNone(wl)) {
// Log the transaction if its duration is longer than the slowMS command threshold.
- if (_singleTransactionStats.getDuration(curTimeMicros64()) >
+ if (_transactionMetricsObserver.getSingleTransactionStats().getDuration(curTimeMicros64()) >
serverGlobalParams.slowMS * 1000ULL) {
log(logger::LogComponent::kTransaction)
<< "transaction "
@@ -1341,7 +1287,7 @@ void TransactionParticipant::_setNewTxnNumber(WithLock wl, const TxnNumber& txnN
_activeTxnNumber = txnNumber;
_txnState.transitionTo(wl, TransactionState::kNone);
- _singleTransactionStats = SingleTransactionStats();
+ _transactionMetricsObserver.resetSingleTransactionStats();
_speculativeTransactionReadOpTime = repl::OpTime();
_multikeyPathInfo.clear();
_autoCommit = boost::none;