summary | refs | log | tree | commit | diff
path: root/src/mongo/s
diff options
context:
space:
mode:
author: Jack Mulrow <jack.mulrow@mongodb.com> 2019-09-12 14:49:10 +0000
committer: evergreen <evergreen@mongodb.com> 2019-09-12 14:49:10 +0000
commit: 523d4a46f4cdc9cc928f85eed2c63bc898c8ca63 (patch)
tree: 629763fde75f71d00920e00a802d4c6f8280f5a5 /src/mongo/s
parent: f57dacda0dedbe206e4215d7fe0a7685d587cfe0 (diff)
download: mongo-523d4a46f4cdc9cc928f85eed2c63bc898c8ca63.tar.gz
SERVER-41376 Track time transactions on mongos are active and inactive and include in slow txn logging
(cherry picked from commit 4d59f45f85919ddaffa260fb76d1e7dbd8950edf)
SERVER-42907 Add timeActiveMicros and timeInactiveMicros for transactions in mongos currentOp output
(cherry picked from commit 387b570d0ddf0dc87c888d030651357152484191)
SERVER-39573 Prefix TransactionCoordinator logs with transaction id
(cherry picked from commit f92b912452b540fdcbb1b3b959391fb31e64d408)
SERVER-42963 For active sessions only set transaction stats in TransactionRouter::Observer::_reportState()
(cherry picked from commit 4b526b31c6560ec4c632c8062b057d4e346fb1d0)
SERVER-42963 Add uses_atclustertime tag to router_transaction_current_op.js
(cherry picked from commit 936595473bd8423b79e477a442d3093aa11d80c8)
SERVER-41374 Track current open, active, and inactive transactions on mongos in serverStatus
(cherry picked from commit 953e1692d51c6742f1bb1c61dcfece75338afeae)
Diffstat (limited to 'src/mongo/s')
-rw-r--r-- src/mongo/s/SConscript 1
-rw-r--r-- src/mongo/s/router_transactions_metrics.cpp 54
-rw-r--r-- src/mongo/s/router_transactions_metrics.h 35
-rw-r--r-- src/mongo/s/router_transactions_stats.idl 9
-rw-r--r-- src/mongo/s/server.cpp 4
-rw-r--r-- src/mongo/s/session_catalog_router.cpp 7
-rw-r--r-- src/mongo/s/session_catalog_router.h 1
-rw-r--r-- src/mongo/s/transaction_router.cpp 328
-rw-r--r-- src/mongo/s/transaction_router.h 148
-rw-r--r-- src/mongo/s/transaction_router_test.cpp 648
10 files changed, 1099 insertions, 136 deletions
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 00a42bcc683..9c5072d1e2e 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -542,6 +542,7 @@ env.CppUnitTest(
'transaction_router_test.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/db/session_catalog',
'sharding_router_api',
'sharding_router_test_fixture',
]
diff --git a/src/mongo/s/router_transactions_metrics.cpp b/src/mongo/s/router_transactions_metrics.cpp
index e493d71892b..ddc8406354f 100644
--- a/src/mongo/s/router_transactions_metrics.cpp
+++ b/src/mongo/s/router_transactions_metrics.cpp
@@ -52,7 +52,43 @@ RouterTransactionsMetrics* RouterTransactionsMetrics::get(OperationContext* opCt
return get(opCtx->getServiceContext());
}
-std::int64_t RouterTransactionsMetrics::getTotalStarted() {
+std::int64_t RouterTransactionsMetrics::getCurrentOpen() const {
+ return _currentOpen.load();
+}
+
+void RouterTransactionsMetrics::incrementCurrentOpen() {
+ _currentOpen.fetchAndAdd(1);
+}
+
+void RouterTransactionsMetrics::decrementCurrentOpen() {
+ _currentOpen.fetchAndSubtract(1);
+}
+
+std::int64_t RouterTransactionsMetrics::getCurrentActive() const {
+ return _currentActive.load();
+}
+
+void RouterTransactionsMetrics::incrementCurrentActive() {
+ _currentActive.fetchAndAdd(1);
+}
+
+void RouterTransactionsMetrics::decrementCurrentActive() {
+ _currentActive.fetchAndSubtract(1);
+}
+
+std::int64_t RouterTransactionsMetrics::getCurrentInactive() const {
+ return _currentInactive.load();
+}
+
+void RouterTransactionsMetrics::incrementCurrentInactive() {
+ _currentInactive.fetchAndAdd(1);
+}
+
+void RouterTransactionsMetrics::decrementCurrentInactive() {
+ _currentInactive.fetchAndSubtract(1);
+}
+
+std::int64_t RouterTransactionsMetrics::getTotalStarted() const {
return _totalStarted.load();
}
@@ -60,7 +96,7 @@ void RouterTransactionsMetrics::incrementTotalStarted() {
_totalStarted.fetchAndAdd(1);
}
-std::int64_t RouterTransactionsMetrics::getTotalAborted() {
+std::int64_t RouterTransactionsMetrics::getTotalAborted() const {
return _totalAborted.load();
}
@@ -68,7 +104,7 @@ void RouterTransactionsMetrics::incrementTotalAborted() {
_totalAborted.fetchAndAdd(1);
}
-std::int64_t RouterTransactionsMetrics::getTotalCommitted() {
+std::int64_t RouterTransactionsMetrics::getTotalCommitted() const {
return _totalCommitted.load();
}
@@ -76,7 +112,7 @@ void RouterTransactionsMetrics::incrementTotalCommitted() {
_totalCommitted.fetchAndAdd(1);
}
-std::int64_t RouterTransactionsMetrics::getTotalContactedParticipants() {
+std::int64_t RouterTransactionsMetrics::getTotalContactedParticipants() const {
return _totalContactedParticipants.load();
}
@@ -84,7 +120,7 @@ void RouterTransactionsMetrics::incrementTotalContactedParticipants() {
_totalContactedParticipants.fetchAndAdd(1);
}
-std::int64_t RouterTransactionsMetrics::getTotalParticipantsAtCommit() {
+std::int64_t RouterTransactionsMetrics::getTotalParticipantsAtCommit() const {
return _totalParticipantsAtCommit.load();
}
@@ -92,7 +128,7 @@ void RouterTransactionsMetrics::addToTotalParticipantsAtCommit(std::int64_t inc)
_totalParticipantsAtCommit.fetchAndAdd(inc);
}
-std::int64_t RouterTransactionsMetrics::getTotalRequestsTargeted() {
+std::int64_t RouterTransactionsMetrics::getTotalRequestsTargeted() const {
return _totalRequestsTargeted.load();
}
@@ -101,7 +137,7 @@ void RouterTransactionsMetrics::incrementTotalRequestsTargeted() {
}
const RouterTransactionsMetrics::CommitStats& RouterTransactionsMetrics::getCommitTypeStats_forTest(
- TransactionRouter::CommitType commitType) {
+ TransactionRouter::CommitType commitType) const {
switch (commitType) {
case TransactionRouter::CommitType::kNotInitiated:
break;
@@ -205,6 +241,10 @@ CommitTypeStats RouterTransactionsMetrics::_constructCommitTypeStats(const Commi
}
void RouterTransactionsMetrics::updateStats(RouterTransactionsStats* stats) {
+ stats->setCurrentOpen(_currentOpen.load());
+ stats->setCurrentActive(_currentActive.load());
+ stats->setCurrentInactive(_currentInactive.load());
+
stats->setTotalStarted(_totalStarted.load());
stats->setTotalCommitted(_totalCommitted.load());
stats->setTotalAborted(_totalAborted.load());
diff --git a/src/mongo/s/router_transactions_metrics.h b/src/mongo/s/router_transactions_metrics.h
index a4e6710c457..ed496fe394c 100644
--- a/src/mongo/s/router_transactions_metrics.h
+++ b/src/mongo/s/router_transactions_metrics.h
@@ -61,25 +61,37 @@ public:
static RouterTransactionsMetrics* get(ServiceContext* service);
static RouterTransactionsMetrics* get(OperationContext* opCtx);
- std::int64_t getTotalStarted();
+ std::int64_t getCurrentOpen() const;
+ void incrementCurrentOpen();
+ void decrementCurrentOpen();
+
+ std::int64_t getCurrentActive() const;
+ void incrementCurrentActive();
+ void decrementCurrentActive();
+
+ std::int64_t getCurrentInactive() const;
+ void incrementCurrentInactive();
+ void decrementCurrentInactive();
+
+ std::int64_t getTotalStarted() const;
void incrementTotalStarted();
- std::int64_t getTotalAborted();
+ std::int64_t getTotalAborted() const;
void incrementTotalAborted();
- std::int64_t getTotalCommitted();
+ std::int64_t getTotalCommitted() const;
void incrementTotalCommitted();
- std::int64_t getTotalContactedParticipants();
+ std::int64_t getTotalContactedParticipants() const;
void incrementTotalContactedParticipants();
- std::int64_t getTotalParticipantsAtCommit();
+ std::int64_t getTotalParticipantsAtCommit() const;
void addToTotalParticipantsAtCommit(std::int64_t inc);
- std::int64_t getTotalRequestsTargeted();
+ std::int64_t getTotalRequestsTargeted() const;
void incrementTotalRequestsTargeted();
- const CommitStats& getCommitTypeStats_forTest(TransactionRouter::CommitType commitType);
+ const CommitStats& getCommitTypeStats_forTest(TransactionRouter::CommitType commitType) const;
void incrementCommitInitiated(TransactionRouter::CommitType commitType);
void incrementCommitSuccessful(TransactionRouter::CommitType commitType,
Microseconds durationMicros);
@@ -98,6 +110,15 @@ private:
*/
CommitTypeStats _constructCommitTypeStats(const CommitStats& stats);
+ // Total number of currently open transactions.
+ AtomicWord<std::int64_t> _currentOpen{0};
+
+ // Total number of currently active transactions.
+ AtomicWord<std::int64_t> _currentActive{0};
+
+ // Total number of currently inactive transactions.
+ AtomicWord<std::int64_t> _currentInactive{0};
+
// The total number of multi-document transactions started since the last server startup.
AtomicWord<std::int64_t> _totalStarted{0};
diff --git a/src/mongo/s/router_transactions_stats.idl b/src/mongo/s/router_transactions_stats.idl
index d61d7fa268f..0829d66fe92 100644
--- a/src/mongo/s/router_transactions_stats.idl
+++ b/src/mongo/s/router_transactions_stats.idl
@@ -72,6 +72,15 @@ structs:
command on mongos with information about sharded transactions"
strict: true
fields:
+ currentOpen:
+ type: long
+ default: 0
+ currentActive:
+ type: long
+ default: 0
+ currentInactive:
+ type: long
+ default: 0
totalStarted:
type: long
default: 0
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 36e7290866d..970137187d3 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -232,7 +232,9 @@ void implicitlyAbortAllTransactions(OperationContext* opCtx) {
newOpCtx->setLogicalSessionId(session->getSessionId());
auto txnRouter = TransactionRouter::get(newOpCtx);
- txnRouter.implicitlyAbortTransaction(newOpCtx, shutDownStatus);
+ if (txnRouter.isInitialized()) {
+ txnRouter.implicitlyAbortTransaction(newOpCtx, shutDownStatus);
+ }
}
}
diff --git a/src/mongo/s/session_catalog_router.cpp b/src/mongo/s/session_catalog_router.cpp
index aa4f770bec7..5c35434fe25 100644
--- a/src/mongo/s/session_catalog_router.cpp
+++ b/src/mongo/s/session_catalog_router.cpp
@@ -34,6 +34,7 @@
#include "mongo/s/session_catalog_router.h"
#include "mongo/db/sessions_collection.h"
+#include "mongo/s/transaction_router.h"
namespace mongo {
@@ -68,8 +69,10 @@ int RouterSessionCatalog::reapSessionsOlderThan(OperationContext* opCtx,
}
RouterOperationContextSession::RouterOperationContextSession(OperationContext* opCtx)
- : _operationContextSession(opCtx) {}
+ : _opCtx(opCtx), _operationContextSession(opCtx) {}
-RouterOperationContextSession::~RouterOperationContextSession() = default;
+RouterOperationContextSession::~RouterOperationContextSession() {
+ TransactionRouter::get(_opCtx).stash(_opCtx);
+};
} // namespace mongo
diff --git a/src/mongo/s/session_catalog_router.h b/src/mongo/s/session_catalog_router.h
index 057f9632593..ff54747aeed 100644
--- a/src/mongo/s/session_catalog_router.h
+++ b/src/mongo/s/session_catalog_router.h
@@ -62,6 +62,7 @@ public:
~RouterOperationContextSession();
private:
+ OperationContext* _opCtx;
OperationContextSession _operationContextSession;
};
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 9de327e0d45..5b791ab0270 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -303,29 +303,39 @@ void TransactionRouter::Observer::_reportState(OperationContext* opCtx,
return;
}
- // Append relevant client metadata.
+ // Append relevant client metadata for transactions with inactive sessions. For those with
+ // active sessions, these fields will already be in the output.
- builder->append("type", sessionIsActive ? "activeSession" : "idleSession");
- builder->append("host", getHostNameCachedAndPort());
- builder->append("desc", sessionIsActive ? "active transaction" : "inactive transaction");
+ if (!sessionIsActive) {
+ builder->append("type", "idleSession");
+ builder->append("host", getHostNameCachedAndPort());
+ builder->append("desc", "inactive transaction");
- const auto& lastClientInfo = o().lastClientInfo;
- builder->append("client", lastClientInfo.clientHostAndPort);
- builder->append("connectionId", lastClientInfo.connectionId);
- builder->append("appName", lastClientInfo.appName);
- builder->append("clientMetadata", lastClientInfo.clientMetadata);
+ const auto& lastClientInfo = o().lastClientInfo;
+ builder->append("client", lastClientInfo.clientHostAndPort);
+ builder->append("connectionId", lastClientInfo.connectionId);
+ builder->append("appName", lastClientInfo.appName);
+ builder->append("clientMetadata", lastClientInfo.clientMetadata);
- // Append session and transaction metadata.
+ {
+ BSONObjBuilder lsid(builder->subobjStart("lsid"));
+ _sessionId().serialize(&lsid);
+ }
- {
- BSONObjBuilder lsid(builder->subobjStart("lsid"));
- _sessionId().serialize(&lsid);
+ builder->append("active", sessionIsActive);
}
- BSONObjBuilder transactionBuilder(builder->subobjStart("transaction"));
+ // Append current transaction info.
+
+ BSONObjBuilder transactionBuilder;
+ _reportTransactionState(opCtx, &transactionBuilder);
+ builder->append("transaction", transactionBuilder.obj());
+}
+void TransactionRouter::Observer::_reportTransactionState(OperationContext* opCtx,
+ BSONObjBuilder* builder) const {
{
- BSONObjBuilder parametersBuilder(transactionBuilder.subobjStart("parameters"));
+ BSONObjBuilder parametersBuilder(builder->subobjStart("parameters"));
parametersBuilder.append("txnNumber", o().txnNumber);
parametersBuilder.append("autocommit", false);
if (!o().readConcernArgs.isEmpty()) {
@@ -333,13 +343,11 @@ void TransactionRouter::Observer::_reportState(OperationContext* opCtx,
}
}
- // Append current transaction info.
-
if (_atClusterTimeHasBeenSet()) {
builder->append("globalReadTimestamp", o().atClusterTime->getTime().asTimestamp());
}
- const auto& timingStats = o().timingStats;
+ const auto& timingStats = o().metricsTracker->getTimingStats();
builder->append("startWallClockTime", dateToISOStringLocal(timingStats.startWallClockTime));
@@ -349,9 +357,13 @@ void TransactionRouter::Observer::_reportState(OperationContext* opCtx,
builder->append("timeOpenMicros",
durationCount<Microseconds>(timingStats.getDuration(tickSource, curTicks)));
- // TODO SERVER-41376: Log timeActiveMicros
+ builder->append(
+ "timeActiveMicros",
+ durationCount<Microseconds>(timingStats.getTimeActiveMicros(tickSource, curTicks)));
- // TODO SERVER-41376: Log timeInactiveMicros
+ builder->append(
+ "timeInactiveMicros",
+ durationCount<Microseconds>(timingStats.getTimeInactiveMicros(tickSource, curTicks)));
int numReadOnlyParticipants = 0;
int numNonReadOnlyParticipants = 0;
@@ -376,21 +388,17 @@ void TransactionRouter::Observer::_reportState(OperationContext* opCtx,
participantsArrayBuilder.append(participantBuilder.obj());
}
- transactionBuilder.appendArray("participants", participantsArrayBuilder.obj());
+ builder->appendArray("participants", participantsArrayBuilder.obj());
}
- if (o().commitType != CommitType::kNotInitiated) {
- transactionBuilder.append("commitStartWallClockTime",
- dateToISOStringLocal(timingStats.commitStartWallClockTime));
- transactionBuilder.append("commitType", commitTypeToString(o().commitType));
+ if (o().metricsTracker->commitHasStarted()) {
+ builder->append("commitStartWallClockTime",
+ dateToISOStringLocal(timingStats.commitStartWallClockTime));
+ builder->append("commitType", commitTypeToString(o().commitType));
}
- transactionBuilder.append("numReadOnlyParticipants", numReadOnlyParticipants);
- transactionBuilder.append("numNonReadOnlyParticipants", numNonReadOnlyParticipants);
-
- transactionBuilder.done();
-
- builder->append("active", sessionIsActive);
+ builder->append("numReadOnlyParticipants", numReadOnlyParticipants);
+ builder->append("numNonReadOnlyParticipants", numNonReadOnlyParticipants);
}
bool TransactionRouter::Observer::_atClusterTimeHasBeenSet() const {
@@ -851,10 +859,12 @@ void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx,
repl::ReadConcernArgs::get(opCtx) = o().readConcernArgs;
++p().latestStmtId;
+ _onContinue(opCtx);
break;
}
case TransactionActions::kCommit:
++p().latestStmtId;
+ _onContinue(opCtx);
break;
}
} else if (txnNumber > o().txnNumber) {
@@ -882,7 +892,6 @@ void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx,
o(lk).atClusterTime.emplace();
}
- _onNewTransaction(opCtx);
LOG(3) << txnIdToString() << " New transaction started";
break;
}
@@ -898,7 +907,6 @@ void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx,
// means that the client is attempting to recover a commit decision.
p().isRecoveringCommit = true;
- _onBeginRecoveringDecision(opCtx);
LOG(3) << txnIdToString() << " Commit recovery started";
break;
}
@@ -908,6 +916,16 @@ void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx,
_updateLastClientInfo(opCtx->getClient());
}
+void TransactionRouter::Router::stash(OperationContext* opCtx) {
+ if (!isInitialized()) {
+ return;
+ }
+
+ auto tickSource = opCtx->getServiceContext()->getTickSource();
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).metricsTracker->trySetInactive(tickSource, tickSource->getTicks());
+}
+
BSONObj TransactionRouter::Router::_handOffCommitToCoordinator(OperationContext* opCtx) {
invariant(o().coordinatorId);
auto coordinatorIter = o().participants.find(*o().coordinatorId);
@@ -946,6 +964,8 @@ BSONObj TransactionRouter::Router::_handOffCommitToCoordinator(OperationContext*
BSONObj TransactionRouter::Router::commitTransaction(
OperationContext* opCtx, const boost::optional<TxnRecoveryToken>& recoveryToken) {
+ invariant(isInitialized());
+
p().terminationInitiated = true;
auto commitRes = _commitTransaction(opCtx, recoveryToken);
@@ -1080,7 +1100,11 @@ BSONObj TransactionRouter::Router::_commitTransaction(
}
BSONObj TransactionRouter::Router::abortTransaction(OperationContext* opCtx) {
- _onExplicitAbort(opCtx);
+ invariant(isInitialized());
+
+ // Update stats on scope exit so the transaction is considered "active" while waiting on abort
+ // responses.
+ auto updateStatsGuard = makeGuard([&] { _onExplicitAbort(opCtx); });
// The router has yet to send any commands to a remote shard for this transaction.
// Return the same error that would have been returned by a shard.
@@ -1132,6 +1156,8 @@ BSONObj TransactionRouter::Router::abortTransaction(OperationContext* opCtx) {
void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opCtx,
const Status& errorStatus) {
+ invariant(isInitialized());
+
if (o().commitType == CommitType::kTwoPhaseCommit ||
o().commitType == CommitType::kRecoverWithToken) {
LOG(3) << txnIdToString()
@@ -1140,7 +1166,9 @@ void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opC
return;
}
- _onImplicitAbort(opCtx, errorStatus);
+ // Update stats on scope exit so the transaction is considered "active" while waiting on abort
+ // responses.
+ auto updateStatsGuard = makeGuard([&] { _onImplicitAbort(opCtx, errorStatus); });
if (o().participants.empty()) {
return;
@@ -1204,13 +1232,11 @@ void TransactionRouter::Router::_resetRouterState(OperationContext* opCtx,
o(lk).readConcernArgs = {};
o(lk).atClusterTime.reset();
o(lk).abortCause = std::string();
- o(lk).timingStats = TimingStats();
+ o(lk).metricsTracker.emplace(opCtx->getServiceContext());
p().terminationInitiated = false;
auto tickSource = opCtx->getServiceContext()->getTickSource();
- o(lk).timingStats.startTime = tickSource->getTicks();
- o(lk).timingStats.startWallClockTime =
- opCtx->getServiceContext()->getPreciseClockSource()->now();
+ o(lk).metricsTracker->trySetActive(tickSource, tickSource->getTicks());
// TODO SERVER-37115: Parse statement ids from the client and remember the statement id
// of the command that started the transaction, if one was included.
@@ -1283,7 +1309,7 @@ std::string TransactionRouter::Router::_transactionInfoForLog(
}
if (o().commitType == CommitType::kTwoPhaseCommit) {
- invariant(o().coordinatorId);
+ dassert(o().coordinatorId);
sb << " coordinator:" << *o().coordinatorId << ",";
}
@@ -1293,49 +1319,44 @@ std::string TransactionRouter::Router::_transactionInfoForLog(
if (terminationCause == TerminationCause::kCommitted) {
sb << " terminationCause:committed,";
- invariant(o().commitType != CommitType::kNotInitiated);
- invariant(o().abortCause.empty());
+ dassert(o().metricsTracker->commitHasStarted());
+ dassert(o().commitType != CommitType::kNotInitiated);
+ dassert(o().abortCause.empty());
} else {
sb << " terminationCause:aborted,";
- invariant(!o().abortCause.empty());
+ dassert(!o().abortCause.empty());
sb << " abortCause:" << o().abortCause << ",";
-
- // TODO SERVER-40985: Log abortSource
}
- if (o().commitType != CommitType::kNotInitiated) {
+ const auto& timingStats = o().metricsTracker->getTimingStats();
+
+ if (o().metricsTracker->commitHasStarted()) {
+ dassert(o().commitType != CommitType::kNotInitiated);
sb << " commitType:" << commitTypeToString(o().commitType) << ",";
sb << " commitDurationMicros:"
- << durationCount<Microseconds>(o().timingStats.getCommitDuration(tickSource, curTicks))
+ << durationCount<Microseconds>(timingStats.getCommitDuration(tickSource, curTicks))
<< ",";
}
- // TODO SERVER-41376: Log timeActiveMicros
+ sb << " timeActiveMicros:"
+ << durationCount<Microseconds>(timingStats.getTimeActiveMicros(tickSource, curTicks)) << ",";
- // TODO SERVER-41376: Log timeInactiveMicros
+ sb << " timeInactiveMicros:"
+ << durationCount<Microseconds>(timingStats.getTimeInactiveMicros(tickSource, curTicks))
+ << ",";
// Total duration of the transaction. Logged at the end of the line for consistency with slow
// command logging.
- sb << " " << duration_cast<Milliseconds>(o().timingStats.getDuration(tickSource, curTicks));
+ sb << " " << duration_cast<Milliseconds>(timingStats.getDuration(tickSource, curTicks));
return sb.str();
}
-void TransactionRouter::Router::_onNewTransaction(OperationContext* opCtx) {
- auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx);
- routerTxnMetrics->incrementTotalStarted();
-}
-
-void TransactionRouter::Router::_onBeginRecoveringDecision(OperationContext* opCtx) {
- auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx);
- routerTxnMetrics->incrementTotalStarted();
-}
-
void TransactionRouter::Router::_onImplicitAbort(OperationContext* opCtx,
const Status& errorStatus) {
- if (o().commitType != CommitType::kNotInitiated && o().timingStats.endTime == 0) {
+ if (o().metricsTracker->commitHasStarted() && !o().metricsTracker->isTrackingOver()) {
// If commit was started but an end time wasn't set, then we don't know the commit result
// and can't consider the transaction over until the client retries commit and definitively
// learns the result. Note that this behavior may lead to no logging in some cases, but
@@ -1368,23 +1389,13 @@ void TransactionRouter::Router::_onExplicitAbort(OperationContext* opCtx) {
void TransactionRouter::Router::_onStartCommit(WithLock wl, OperationContext* opCtx) {
invariant(o().commitType != CommitType::kNotInitiated);
- if (o().timingStats.commitStartTime != 0) {
+ if (o().metricsTracker->commitHasStarted() || o().metricsTracker->isTrackingOver()) {
return;
}
auto tickSource = opCtx->getServiceContext()->getTickSource();
- {
- o(wl).timingStats.commitStartTime = tickSource->getTicks();
- o(wl).timingStats.commitStartWallClockTime =
- opCtx->getServiceContext()->getPreciseClockSource()->now();
- }
-
- auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx);
- routerTxnMetrics->incrementCommitInitiated(o().commitType);
- if (o().commitType != CommitType::kRecoverWithToken) {
- // We only know the participant list if we're not recovering a decision.
- routerTxnMetrics->addToTotalParticipantsAtCommit(o().participants.size());
- }
+ o(wl).metricsTracker->startCommit(
+ tickSource, tickSource->getTicks(), o().commitType, o().participants.size());
}
void TransactionRouter::Router::_onNonRetryableCommitError(OperationContext* opCtx,
@@ -1398,13 +1409,20 @@ void TransactionRouter::Router::_onNonRetryableCommitError(OperationContext* opC
_endTransactionTrackingIfNecessary(opCtx, TerminationCause::kAborted);
}
+void TransactionRouter::Router::_onContinue(OperationContext* opCtx) {
+ auto tickSource = opCtx->getServiceContext()->getTickSource();
+
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).metricsTracker->trySetActive(tickSource, tickSource->getTicks());
+}
+
void TransactionRouter::Router::_onSuccessfulCommit(OperationContext* opCtx) {
_endTransactionTrackingIfNecessary(opCtx, TerminationCause::kCommitted);
}
void TransactionRouter::Router::_endTransactionTrackingIfNecessary(
OperationContext* opCtx, TerminationCause terminationCause) {
- if (o().timingStats.endTime != 0) {
+ if (o().metricsTracker->isTrackingOver()) {
// If the transaction was already ended, don't end it again.
return;
}
@@ -1414,30 +1432,21 @@ void TransactionRouter::Router::_endTransactionTrackingIfNecessary(
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
- o(lk).timingStats.endTime = curTicks;
- // If startTime hasn't been set yet, that probably means it run into an error and is
- // getting aborted.
- if (o().timingStats.startTime == 0) {
- o(lk).timingStats.startTime = curTicks;
- }
+ // In some error contexts, the transaction may not have been started yet, so try setting the
+ // transaction's timing stats to active before ending it below. This is a no-op for already
+ // active transactions.
+ o(lk).metricsTracker->trySetActive(tickSource, curTicks);
+
+ o(lk).metricsTracker->endTransaction(
+ tickSource, curTicks, terminationCause, o().commitType, o().abortCause);
}
+ const auto& timingStats = o().metricsTracker->getTimingStats();
if (shouldLog(logger::LogComponent::kTransaction, logger::LogSeverity::Debug(1)) ||
- o().timingStats.getDuration(tickSource, curTicks) >
- Milliseconds(serverGlobalParams.slowMS)) {
+ timingStats.getDuration(tickSource, curTicks) > Milliseconds(serverGlobalParams.slowMS)) {
_logSlowTransaction(opCtx, terminationCause);
}
-
- auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx);
- if (terminationCause == TerminationCause::kAborted) {
- routerTxnMetrics->incrementTotalAborted();
- routerTxnMetrics->incrementAbortCauseMap(o().abortCause);
- } else {
- routerTxnMetrics->incrementTotalCommitted();
- routerTxnMetrics->incrementCommitSuccessful(
- o().commitType, o().timingStats.getCommitDuration(tickSource, curTicks));
- }
}
void TransactionRouter::Router::_updateLastClientInfo(Client* client) {
@@ -1447,7 +1456,7 @@ void TransactionRouter::Router::_updateLastClientInfo(Client* client) {
Microseconds TransactionRouter::TimingStats::getDuration(TickSource* tickSource,
TickSource::Tick curTicks) const {
- invariant(startTime > 0);
+ dassert(startTime > 0);
// If the transaction hasn't ended, return how long it has been running for.
if (endTime == 0) {
@@ -1458,7 +1467,7 @@ Microseconds TransactionRouter::TimingStats::getDuration(TickSource* tickSource,
Microseconds TransactionRouter::TimingStats::getCommitDuration(TickSource* tickSource,
TickSource::Tick curTicks) const {
- invariant(commitStartTime > 0);
+ dassert(commitStartTime > 0);
// If the transaction hasn't ended, return how long commit has been running for.
if (endTime == 0) {
@@ -1467,4 +1476,133 @@ Microseconds TransactionRouter::TimingStats::getCommitDuration(TickSource* tickS
return tickSource->ticksTo<Microseconds>(endTime - commitStartTime);
}
+Microseconds TransactionRouter::TimingStats::getTimeActiveMicros(TickSource* tickSource,
+ TickSource::Tick curTicks) const {
+ dassert(startTime > 0);
+
+ if (lastTimeActiveStart != 0) {
+ // The transaction is currently active, so return the active time so far plus the time since
+ // the transaction became active.
+ return timeActiveMicros + tickSource->ticksTo<Microseconds>(curTicks - lastTimeActiveStart);
+ }
+ return timeActiveMicros;
+}
+
+Microseconds TransactionRouter::TimingStats::getTimeInactiveMicros(
+ TickSource* tickSource, TickSource::Tick curTicks) const {
+ dassert(startTime > 0);
+
+ auto micros = getDuration(tickSource, curTicks) - getTimeActiveMicros(tickSource, curTicks);
+ dassert(micros >= Microseconds(0),
+ str::stream() << "timeInactiveMicros should never be negative, was: " << micros);
+ return micros;
+}
+
+TransactionRouter::MetricsTracker::~MetricsTracker() {
+ // If there was an in-progress transaction, clean up its stats. This may happen if a transaction
+ // is overridden by a higher txnNumber or its session is reaped.
+
+ if (hasStarted() && !isTrackingOver()) {
+ // A transaction was started but not ended, so clean up the appropriate stats for it.
+ auto routerTxnMetrics = RouterTransactionsMetrics::get(_service);
+ routerTxnMetrics->decrementCurrentOpen();
+
+ if (!isActive()) {
+ routerTxnMetrics->decrementCurrentInactive();
+ } else {
+ routerTxnMetrics->decrementCurrentActive();
+ }
+ }
+}
+
+void TransactionRouter::MetricsTracker::trySetActive(TickSource* tickSource,
+ TickSource::Tick curTicks) {
+ if (isTrackingOver() || isActive()) {
+ // A transaction can't become active if it has already ended or is already active.
+ return;
+ }
+
+ auto routerTxnMetrics = RouterTransactionsMetrics::get(_service);
+ if (!hasStarted()) {
+ // If the transaction is becoming active for the first time, also set the transaction's
+ // start time.
+ timingStats.startTime = curTicks;
+ timingStats.startWallClockTime = _service->getPreciseClockSource()->now();
+
+ routerTxnMetrics->incrementCurrentOpen();
+ routerTxnMetrics->incrementTotalStarted();
+ } else {
+ // The transaction was already open, so it must have been inactive.
+ routerTxnMetrics->decrementCurrentInactive();
+ }
+
+ timingStats.lastTimeActiveStart = curTicks;
+ routerTxnMetrics->incrementCurrentActive();
+}
+
+void TransactionRouter::MetricsTracker::trySetInactive(TickSource* tickSource,
+ TickSource::Tick curTicks) {
+ if (isTrackingOver() || !isActive()) {
+ // If the transaction is already over or the router has already been stashed, the relevant
+ // stats should have been updated earlier. In certain error scenarios, it's possible for a
+ // transaction to be stashed twice in a row.
+ return;
+ }
+
+ timingStats.timeActiveMicros +=
+ tickSource->ticksTo<Microseconds>(curTicks - timingStats.lastTimeActiveStart);
+ timingStats.lastTimeActiveStart = 0;
+
+ auto routerTxnMetrics = RouterTransactionsMetrics::get(_service);
+ routerTxnMetrics->decrementCurrentActive();
+ routerTxnMetrics->incrementCurrentInactive();
+}
+
+void TransactionRouter::MetricsTracker::startCommit(TickSource* tickSource,
+ TickSource::Tick curTicks,
+ TransactionRouter::CommitType commitType,
+ std::size_t numParticipantsAtCommit) {
+ dassert(isActive());
+
+ timingStats.commitStartTime = tickSource->getTicks();
+ timingStats.commitStartWallClockTime = _service->getPreciseClockSource()->now();
+
+ auto routerTxnMetrics = RouterTransactionsMetrics::get(_service);
+ routerTxnMetrics->incrementCommitInitiated(commitType);
+ if (commitType != CommitType::kRecoverWithToken) {
+ // We only know the participant list if we're not recovering a decision.
+ routerTxnMetrics->addToTotalParticipantsAtCommit(numParticipantsAtCommit);
+ }
+}
+
+void TransactionRouter::MetricsTracker::endTransaction(
+ TickSource* tickSource,
+ TickSource::Tick curTicks,
+ TransactionRouter::TerminationCause terminationCause,
+ TransactionRouter::CommitType commitType,
+ StringData abortCause) {
+ dassert(isActive());
+
+ timingStats.timeActiveMicros +=
+ tickSource->ticksTo<Microseconds>(curTicks - timingStats.lastTimeActiveStart);
+ timingStats.lastTimeActiveStart = 0;
+
+ timingStats.endTime = curTicks;
+
+ auto routerTxnMetrics = RouterTransactionsMetrics::get(_service);
+ routerTxnMetrics->decrementCurrentOpen();
+ routerTxnMetrics->decrementCurrentActive();
+
+ if (terminationCause == TerminationCause::kAborted) {
+ dassert(!abortCause.empty());
+ routerTxnMetrics->incrementTotalAborted();
+ routerTxnMetrics->incrementAbortCauseMap(abortCause.toString());
+ } else {
+ dassert(commitType != CommitType::kNotInitiated);
+ routerTxnMetrics->incrementTotalCommitted();
+ routerTxnMetrics->incrementCommitSuccessful(
+ commitType, timingStats.getCommitDuration(tickSource, curTicks));
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h
index 3f1e45d5fa3..16c0c15d67e 100644
--- a/src/mongo/s/transaction_router.h
+++ b/src/mongo/s/transaction_router.h
@@ -134,23 +134,43 @@ public:
*/
Microseconds getCommitDuration(TickSource* tickSource, TickSource::Tick curTicks) const;
+ /**
+ * Returns the total active time of the transaction, given the current time value. A
+ * transaction is active when there is a running operation that is part of the transaction.
+ */
+ Microseconds getTimeActiveMicros(TickSource* tickSource, TickSource::Tick curTicks) const;
+
+ /**
+ * Returns the total inactive time of the transaction, given the current time value. A
+ * transaction is inactive when it is idly waiting for a new operation to occur.
+ */
+ Microseconds getTimeInactiveMicros(TickSource* tickSource, TickSource::Tick curTicks) const;
+
// The start time of the transaction in millisecond resolution. Used only for diagnostics
// reporting.
Date_t startWallClockTime;
// The start time of the transaction. Note that tick values should only ever be used to
- // measure distance from other tick values, not for reporting absolute wall clock time.
+ // measure distance from other tick values, not for reporting absolute wall clock time. A
+ // value of zero means the transaction hasn't started yet.
TickSource::Tick startTime{0};
// The start time of the transaction commit in millisecond resolution. Used only for
// diagnostics reporting.
Date_t commitStartWallClockTime;
- // When commit was started.
+ // When commit was started. A value of zero means the commit hasn't started yet.
TickSource::Tick commitStartTime{0};
- // The end time of the transaction.
+ // The end time of the transaction. A value of zero means the transaction hasn't ended yet.
TickSource::Tick endTime{0};
+
+ // The total amount of active time spent by the transaction.
+ Microseconds timeActiveMicros = Microseconds{0};
+
+ // The time at which the transaction was last marked as active. The transaction is
+ // considered active if this value is not equal to 0.
+ TickSource::Tick lastTimeActiveStart{0};
};
enum class TransactionActions { kStart, kContinue, kCommit };
@@ -162,6 +182,83 @@ public:
};
/**
+ * Helper class responsible for updating per transaction and router wide transaction metrics on
+ * certain transaction events.
+ */
+    class MetricsTracker {
+    public:
+        // Explicit so that a ServiceContext* is never implicitly converted into a tracker.
+        explicit MetricsTracker(ServiceContext* service) : _service(service) {}
+
+        // A tracker can be neither copied nor moved.
+        MetricsTracker(const MetricsTracker&) = delete;
+        MetricsTracker& operator=(const MetricsTracker&) = delete;
+        MetricsTracker(MetricsTracker&&) = delete;
+        MetricsTracker& operator=(MetricsTracker&&) = delete;
+        ~MetricsTracker();
+
+        /**
+         * Returns true once an end time has been recorded for the transaction.
+         */
+        bool isTrackingOver() const {
+            return timingStats.endTime != 0;
+        }
+
+        /**
+         * Returns true once a start time has been recorded for the transaction.
+         */
+        bool hasStarted() const {
+            return timingStats.startTime != 0;
+        }
+
+        /**
+         * Returns true while the transaction is marked active, i.e. the start of its current
+         * active period is set.
+         */
+        bool isActive() const {
+            return timingStats.lastTimeActiveStart != 0;
+        }
+
+        /**
+         * Returns true once a commit start time has been recorded for the transaction.
+         */
+        bool commitHasStarted() const {
+            return timingStats.commitStartTime != 0;
+        }
+
+        /**
+         * Returns a reference to the timing stats tracked for the transaction.
+         */
+        const auto& getTimingStats() const {
+            return timingStats;
+        }
+
+        /**
+         * Marks the transaction as active and sets the start of the transaction's active time and
+         * overall start time the first time it is called.
+         *
+         * This method is a no-op if the transaction is not currently inactive or has already ended.
+         */
+        void trySetActive(TickSource* tickSource, TickSource::Tick curTicks);
+
+        /**
+         * Marks the transaction as inactive, sets the total active time of the transaction, and
+         * updates relevant server status counters.
+         *
+         * This method is a no-op if the transaction is not currently active or has already ended.
+         */
+        void trySetInactive(TickSource* tickSource, TickSource::Tick curTicks);
+
+        /**
+         * Marks the transaction as having begun commit, updating relevant stats. Assumes the
+         * transaction is currently active.
+         */
+        void startCommit(TickSource* tickSource,
+                         TickSource::Tick curTicks,
+                         TransactionRouter::CommitType commitType,
+                         std::size_t numParticipantsAtCommit);
+
+        /**
+         * Marks the transaction as over, updating stats based on the termination cause, which is
+         * either commit or abort.
+         */
+        void endTransaction(TickSource* tickSource,
+                            TickSource::Tick curTicks,
+                            TransactionRouter::TerminationCause terminationCause,
+                            TransactionRouter::CommitType commitType,
+                            StringData abortCause);
+
+    private:
+        // Pointer to the service context used to get the tick source and router wide transaction
+        // metrics decorations.
+        ServiceContext* const _service;
+
+        // Stats used for calculating durations for the active transaction.
+        TransactionRouter::TimingStats timingStats;
+    };
+
+ /**
* Encapsulates the logic around selecting a global read timestamp for a sharded transaction at
* snapshot level read concern.
*
@@ -218,6 +315,13 @@ public:
BSONObjBuilder* builder,
bool sessionIsActive) const;
+        /**
+         * Returns true if the router has received at least one request for a transaction, i.e. its
+         * transaction number is no longer kUninitializedTxnNumber. Does not modify any state.
+         */
+        bool isInitialized() const {
+            return o().txnNumber != kUninitializedTxnNumber;
+        }
+
protected:
explicit Observer(TransactionRouter* tr) : _tr(tr) {}
@@ -230,6 +334,10 @@ public:
BSONObjBuilder* builder,
bool sessionIsActive) const;
+ // Reports the 'transaction' state of this transaction for currentOp using the provided
+ // builder.
+ void _reportTransactionState(OperationContext* opCtx, BSONObjBuilder* builder) const;
+
// Returns true if the atClusterTime has been changed from the default uninitialized value.
bool _atClusterTimeHasBeenSet() const;
@@ -260,6 +368,11 @@ public:
TransactionActions action);
/**
+ * Updates transaction diagnostics when the transaction's session is checked in.
+ */
+ void stash(OperationContext* opCtx);
+
+ /**
* Attaches the required transaction related fields for a request to be sent to the given
* shard.
*
@@ -408,8 +521,9 @@ public:
/**
* Returns a copy of the timing stats of the transaction router's active transaction.
*/
- const TimingStats& getTimingStats() const {
- return o().timingStats;
+ const auto& getTimingStats_forTest() const {
+ invariant(o().metricsTracker);
+ return o().metricsTracker->getTimingStats();
}
private:
@@ -480,17 +594,6 @@ public:
const Participant::ReadOnly readOnly);
/**
- * Updates relevant metrics when a new transaction is begun.
- */
- void _onNewTransaction(OperationContext* opCtx);
-
- /**
- * Updates relevant metrics when a router receives commit for a higher txnNumber than it has
- * seen so far.
- */
- void _onBeginRecoveringDecision(OperationContext* opCtx);
-
- /**
* Updates relevant metrics when the router receives an explicit abort from the client.
*/
void _onExplicitAbort(OperationContext* opCtx);
@@ -517,6 +620,11 @@ public:
void _onNonRetryableCommitError(OperationContext* opCtx, Status commitStatus);
/**
+ * Updates relevant metrics when a transaction is continued.
+ */
+ void _onContinue(OperationContext* opCtx);
+
+ /**
* The first time this method is called it marks the transaction as over in the router's
* diagnostics and will log transaction information if its duration is over the global
* slowMS threshold or the transaction log componenet verbosity >= 1. Only meant to be
@@ -609,12 +717,14 @@ private:
// code that led to an implicit abort or "abort" if the client sent abortTransaction.
std::string abortCause;
- // Stats used for calculating durations for the active transaction.
- TimingStats timingStats;
-
// Information about the last client to run a transaction operation on this transaction
// router.
SingleTransactionStats::LastClientInfo lastClientInfo;
+
+ // Class responsible for updating per transaction and router wide transaction metrics on
+ // certain transaction events. Unset until the transaction router has processed at least one
+ // transaction command.
+ boost::optional<MetricsTracker> metricsTracker;
} _o;
/**
diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp
index 810887f7f34..a02b6126c5c 100644
--- a/src/mongo/s/transaction_router_test.cpp
+++ b/src/mongo/s/transaction_router_test.cpp
@@ -39,6 +39,7 @@
#include "mongo/db/logical_clock.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/db/session_catalog.h"
#include "mongo/logger/logger.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog/type_shard.h"
@@ -2794,6 +2795,9 @@ protected:
const TxnNumber kTxnNumber = 10;
const TxnRecoveryToken kDummyRecoveryToken;
+ static constexpr auto kDefaultTimeActive = Microseconds(50);
+ static constexpr auto kDefaultTimeInactive = Microseconds(100);
+
void setUp() override {
TransactionRouterTestWithDefaultSession::setUp();
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
@@ -2842,12 +2846,12 @@ protected:
}
void assertDurationIs(Microseconds micros) {
- auto stats = txnRouter().getTimingStats();
+ auto stats = txnRouter().getTimingStats_forTest();
ASSERT_EQ(stats.getDuration(tickSource(), tickSource()->getTicks()), micros);
}
void assertCommitDurationIs(Microseconds micros) {
- auto stats = txnRouter().getTimingStats();
+ auto stats = txnRouter().getTimingStats_forTest();
ASSERT_EQ(stats.getCommitDuration(tickSource(), tickSource()->getTicks()), micros);
}
@@ -2856,6 +2860,16 @@ protected:
return guard->hasReadyRequests();
}
+    // Asserts that the transaction's total active time, as of the current tick, equals 'micros'.
+    void assertTimeActiveIs(Microseconds micros) {
+        auto timingStats = txnRouter().getTimingStats_forTest();
+        const auto now = tickSource()->getTicks();
+        ASSERT_EQ(timingStats.getTimeActiveMicros(tickSource(), now), micros);
+    }
+
+    // Asserts that the transaction's total inactive time, as of the current tick, equals 'micros'.
+    void assertTimeInactiveIs(Microseconds micros) {
+        auto timingStats = txnRouter().getTimingStats_forTest();
+        const auto now = tickSource()->getTicks();
+        ASSERT_EQ(timingStats.getTimeInactiveMicros(tickSource(), now), micros);
+    }
+
//
// Helpers for each way a router's transaction may terminate. Meant to be used where the
// particular commit type is not being tested.
@@ -3051,6 +3065,31 @@ protected:
auto routerTxnMetrics() {
return RouterTransactionsMetrics::get(operationContext());
}
+
+    // Verifies that the active and inactive times stay at the given values when the clock
+    // advances, both before and after the transaction is stashed.
+    void assertTimeActiveAndInactiveCannotAdvance(Microseconds timeActive,
+                                                  Microseconds timeInactive) {
+        // Moves the clock forward and checks that neither duration changed.
+        const auto advanceAndExpectUnchanged = [&] {
+            tickSource()->advance(Microseconds(150));
+            assertTimeActiveIs(timeActive);
+            assertTimeInactiveIs(timeInactive);
+        };
+
+        advanceAndExpectUnchanged();
+
+        txnRouter().stash(operationContext());
+
+        advanceAndExpectUnchanged();
+    }
+
+    // Advances the clock by kDefaultTimeActive, stashes the transaction, then advances the clock
+    // by kDefaultTimeInactive, asserting the active and inactive times after each step.
+    void setUpDefaultTimeActiveAndInactive() {
+        const auto noTime = Microseconds(0);
+
+        // Before stash() only the active time accumulates.
+        tickSource()->advance(kDefaultTimeActive);
+        assertTimeActiveIs(kDefaultTimeActive);
+        assertTimeInactiveIs(noTime);
+
+        txnRouter().stash(operationContext());
+
+        // After stash() only the inactive time accumulates.
+        tickSource()->advance(kDefaultTimeInactive);
+        assertTimeActiveIs(kDefaultTimeActive);
+        assertTimeInactiveIs(kDefaultTimeInactive);
+    }
};
//
@@ -3132,6 +3171,23 @@ TEST_F(TransactionRouterMetricsTest, SlowLoggingPrintsDurationAtEnd) {
ASSERT_EQUALS(1, countLogLinesContaining(" 111ms\n") + countLogLinesContaining(" 111ms\r\n"));
}
+TEST_F(TransactionRouterMetricsTest, SlowLoggingPrintsTimeActiveAndInactive) {
+    const auto activeTime = Microseconds(111);
+    const auto inactiveTime = Microseconds(222);
+
+    // Accumulate active time right after beginning the transaction.
+    beginTxnWithDefaultTxnNumber();
+    tickSource()->advance(activeTime);
+    assertTimeActiveIs(activeTime);
+
+    // Accumulate inactive time after stashing the transaction.
+    txnRouter().stash(operationContext());
+    tickSource()->advance(inactiveTime);
+    assertTimeInactiveIs(inactiveTime);
+
+    // Commit the transaction; each duration should appear in exactly one log line.
+    txnRouter().beginOrContinueTxn(
+        operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
+    runCommit(kDummyOkRes);
+
+    ASSERT_EQUALS(1, countLogLinesContaining("timeActiveMicros:111,"));
+    ASSERT_EQUALS(1, countLogLinesContaining("timeInactiveMicros:222,"));
+}
+
//
// Slow transaction logging tests for the parameters that depend on the read concern level.
//
@@ -3386,6 +3442,23 @@ TEST_F(TransactionRouterMetricsTest, NoSlowLoggingOnImplicitAbortAfterUnknownCom
assertDidNotPrintSlowLogLine();
}
+TEST_F(TransactionRouterMetricsTest, NoSlowLoggingCommitAfterAbort_Failed) {
+ beginSlowTxnWithDefaultTxnNumber();
+ implicitAbortInProgress();
+
+ runCommit(kDummyErrorRes);
+ assertDidNotPrintSlowLogLine();
+}
+
+TEST_F(TransactionRouterMetricsTest, NoSlowLoggingCommitAfterAbort_Successful) {
+ beginSlowTxnWithDefaultTxnNumber();
+ implicitAbortInProgress();
+
+ // Note that this shouldn't be possible, but is included as a test case for completeness.
+ runCommit(kDummyOkRes);
+ assertDidNotPrintSlowLogLine();
+}
+
//
// Slow transaction logging tests that retrying after an unknown commit result logs if the result is
// discovered.
@@ -3587,6 +3660,51 @@ TEST_F(TransactionRouterMetricsTest, CommitDurationDoesNotAdvanceAfterFailedComm
assertCommitDurationIs(Microseconds(50));
}
+TEST_F(TransactionRouterMetricsTest, CommitStatsNotInReportStatsForFailedCommitAfterAbort) {
+    // It's a user error to commit a transaction after a failed command or explicit abort, but if it
+    // happens, stats for the commit should not be tracked or included in reporting.
+    beginTxnWithDefaultTxnNumber();
+    txnRouter().implicitlyAbortTransaction(operationContext(), kDummyStatus);
+
+    // A commit in this case should always fail.
+    auto commitFuture = beginAndPauseCommit();
+    tickSource()->advance(Microseconds(50));
+    expectCommitTransaction(kDummyErrorRes);
+    commitFuture.default_timed_get();
+
+    // The reported state must have a 'transaction' section without any commit fields, whether
+    // the session is reported as active or inactive.
+    const auto assertNoCommitStatsReported = [&](bool sessionIsActive) {
+        auto reportedState = txnRouter().reportState(operationContext(), sessionIsActive);
+        ASSERT(reportedState.hasField("transaction"));
+        ASSERT(!reportedState["transaction"].Obj().hasField("commitStartWallClockTime"));
+        ASSERT(!reportedState["transaction"].Obj().hasField("commitType"));
+    };
+
+    assertNoCommitStatsReported(true /* sessionIsActive */);
+    assertNoCommitStatsReported(false /* sessionIsActive */);
+}
+
+TEST_F(TransactionRouterMetricsTest, CommitStatsNotInReportStatsForSuccessfulCommitAfterAbort) {
+    beginTxnWithDefaultTxnNumber();
+    txnRouter().implicitlyAbortTransaction(operationContext(), kDummyStatus);
+
+    // Note that it shouldn't be possible for this commit to succeed, but it is included as a test
+    // case for completeness.
+    auto commitFuture = beginAndPauseCommit();
+    tickSource()->advance(Microseconds(50));
+    expectCommitTransaction(kDummyOkRes);
+    commitFuture.default_timed_get();
+
+    // The reported state must have a 'transaction' section without any commit fields, whether
+    // the session is reported as active or inactive.
+    const auto assertNoCommitStatsReported = [&](bool sessionIsActive) {
+        auto reportedState = txnRouter().reportState(operationContext(), sessionIsActive);
+        ASSERT(reportedState.hasField("transaction"));
+        ASSERT(!reportedState["transaction"].Obj().hasField("commitStartWallClockTime"));
+        ASSERT(!reportedState["transaction"].Obj().hasField("commitType"));
+    };
+
+    assertNoCommitStatsReported(true /* sessionIsActive */);
+    assertNoCommitStatsReported(false /* sessionIsActive */);
+}
+
TEST_F(TransactionRouterMetricsTest, DurationsAdvanceAfterUnknownCommitResult) {
beginTxnWithDefaultTxnNumber();
@@ -3615,6 +3733,520 @@ TEST_F(TransactionRouterMetricsTest, DurationsAdvanceAfterUnknownCommitResult) {
assertCommitDurationIs(Microseconds(200));
}
+TEST_F(TransactionRouterMetricsTest, TimeActiveAndInactiveAdvanceSeparatelyAndSumToDuration) {
+ beginTxnWithDefaultTxnNumber();
+
+ // Both timeActive and timeInactive start at 0.
+ assertTimeActiveIs(Microseconds(0));
+ assertTimeInactiveIs(Microseconds(0));
+ assertDurationIs(Microseconds(0));
+
+ // Only timeActive will advance while a txn is active.
+ tickSource()->advance(Microseconds(50));
+ assertTimeActiveIs(Microseconds(50));
+ assertTimeInactiveIs(Microseconds(0));
+ assertDurationIs(Microseconds(50));
+
+ // Only timeInactive will advance while a txn is stashed.
+ txnRouter().stash(operationContext());
+ tickSource()->advance(Microseconds(100));
+ assertTimeActiveIs(Microseconds(50));
+ assertTimeInactiveIs(Microseconds(100));
+ assertDurationIs(Microseconds(150));
+
+ // Neither timeActive nor timeInactive can advance after a successful commit, because the
+ // transaction is over.
+ txnRouter().beginOrContinueTxn(
+ operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
+ runCommit(kDummyOkRes);
+
+ tickSource()->advance(Microseconds(150));
+ assertTimeActiveIs(Microseconds(50));
+ assertTimeInactiveIs(Microseconds(100));
+ assertDurationIs(Microseconds(150));
+}
+
+TEST_F(TransactionRouterMetricsTest, StashIsIdempotent) {
+ // An error after checking out a session and before continuing a transaction can lead to
+ // back-to-back calls to TransactionRouter::stash(), so a repeated call to stash() shouldn't
+ // toggle the transaction back to the active state.
+
+ beginTxnWithDefaultTxnNumber();
+
+ setUpDefaultTimeActiveAndInactive();
+
+ assertTimeActiveIs(kDefaultTimeActive);
+ assertTimeInactiveIs(kDefaultTimeInactive);
+
+ txnRouter().stash(operationContext());
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive());
+
+ // Only timeInactive can advance.
+ tickSource()->advance(Microseconds(100));
+ assertTimeActiveIs(kDefaultTimeActive);
+ assertTimeInactiveIs(kDefaultTimeInactive + Microseconds(100));
+
+ txnRouter().stash(operationContext());
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive());
+
+ // Still only timeInactive can advance.
+ tickSource()->advance(Microseconds(100));
+ assertTimeActiveIs(kDefaultTimeActive);
+ assertTimeInactiveIs(kDefaultTimeInactive + Microseconds(200));
+}
+
+TEST_F(TransactionRouterMetricsTest, DurationsForImplicitlyAbortedStashedTxn) {
+ beginTxnWithDefaultTxnNumber();
+
+ setUpDefaultTimeActiveAndInactive();
+
+ assertTimeActiveIs(kDefaultTimeActive);
+ assertTimeInactiveIs(kDefaultTimeInactive);
+ assertDurationIs(kDefaultTimeActive + kDefaultTimeInactive);
+
+ txnRouter().stash(operationContext());
+
+ // At shutdown transactions are implicitly aborted without being continued so a transaction may
+ // be stashed when aborting, which should still lead to durations in a consistent state.
+ txnRouter().implicitlyAbortTransaction(operationContext(), kDummyStatus);
+
+ assertTimeActiveIs(kDefaultTimeActive);
+ assertTimeInactiveIs(kDefaultTimeInactive);
+ assertDurationIs(kDefaultTimeActive + kDefaultTimeInactive);
+}
+
+TEST_F(TransactionRouterMetricsTest, DurationsForImplicitlyAbortedActiveTxn) {
+ beginTxnWithDefaultTxnNumber();
+
+ setUpDefaultTimeActiveAndInactive();
+
+ assertTimeActiveIs(kDefaultTimeActive);
+ assertTimeInactiveIs(kDefaultTimeInactive);
+ assertDurationIs(kDefaultTimeActive + kDefaultTimeInactive);
+
+ txnRouter().beginOrContinueTxn(
+ operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue);
+ tickSource()->advance(Microseconds(100));
+
+ assertTimeActiveIs(kDefaultTimeActive + Microseconds(100));
+ assertTimeInactiveIs(kDefaultTimeInactive);
+ assertDurationIs(kDefaultTimeActive + kDefaultTimeInactive + Microseconds(100));
+
+ txnRouter().implicitlyAbortTransaction(operationContext(), kDummyStatus);
+
+ assertTimeActiveIs(kDefaultTimeActive + Microseconds(100));
+ assertTimeInactiveIs(kDefaultTimeInactive);
+ assertDurationIs(kDefaultTimeActive + kDefaultTimeInactive + Microseconds(100));
+}
+
+TEST_F(TransactionRouterMetricsTest, DurationsForImplicitlyAbortedEndedTxn) {
+ beginTxnWithDefaultTxnNumber();
+
+ setUpDefaultTimeActiveAndInactive();
+
+ assertTimeActiveIs(kDefaultTimeActive);
+ assertTimeInactiveIs(kDefaultTimeInactive);
+ assertDurationIs(kDefaultTimeActive + kDefaultTimeInactive);
+
+ txnRouter().beginOrContinueTxn(
+ operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
+ runCommit(kDummyOkRes);
+ txnRouter().stash(operationContext());
+
+ // At shutdown transactions are implicitly aborted without being continued, so an "ended"
+ // transaction (i.e. committed or aborted) may be implicitly aborted again. This shouldn't
+ // affect any transaction durations.
+ auto future = launchAsync(
+ [&] { return txnRouter().implicitlyAbortTransaction(operationContext(), kDummyStatus); });
+ expectAbortTransactions({hostAndPort1}, getSessionId(), kTxnNumber);
+ future.default_timed_get();
+
+ assertTimeActiveIs(kDefaultTimeActive);
+ assertTimeInactiveIs(kDefaultTimeInactive);
+ assertDurationIs(kDefaultTimeActive + kDefaultTimeInactive);
+}
+
+TEST_F(TransactionRouterMetricsTest, NeitherTimeActiveNorInactiveAdvanceAfterSuccessfulCommit) {
+ beginTxnWithDefaultTxnNumber();
+
+ setUpDefaultTimeActiveAndInactive();
+
+ txnRouter().beginOrContinueTxn(
+ operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
+ runCommit(kDummyOkRes);
+
+ // Neither can advance.
+ assertTimeActiveAndInactiveCannotAdvance(kDefaultTimeActive /*timeActive*/,
+ kDefaultTimeInactive /*timeInactive*/);
+}
+
+TEST_F(TransactionRouterMetricsTest, NeitherTimeActiveNorInactiveAdvanceAfterFailedCommit) {
+ beginTxnWithDefaultTxnNumber();
+
+ setUpDefaultTimeActiveAndInactive();
+
+ txnRouter().beginOrContinueTxn(
+ operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue);
+ runCommit(kDummyErrorRes);
+
+ // Neither can advance.
+ assertTimeActiveAndInactiveCannotAdvance(kDefaultTimeActive /*timeActive*/,
+ kDefaultTimeInactive /*timeInactive*/);
+}
+
+TEST_F(TransactionRouterMetricsTest, TimeActiveAndInactiveAdvanceAfterUnknownCommitResult) {
+ beginTxnWithDefaultTxnNumber();
+
+ setUpDefaultTimeActiveAndInactive();
+
+ txnRouter().beginOrContinueTxn(
+ operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
+ runCommit(Status(ErrorCodes::HostUnreachable, "dummy"), true /* expectRetries */);
+
+ // timeActive can advance.
+ tickSource()->advance(Microseconds(100));
+ assertTimeActiveIs(kDefaultTimeActive + Microseconds(100));
+ assertTimeInactiveIs(kDefaultTimeInactive);
+
+ // timeInactive can advance.
+ txnRouter().stash(operationContext());
+ tickSource()->advance(Microseconds(100));
+ assertTimeActiveIs(kDefaultTimeActive + Microseconds(100));
+ assertTimeInactiveIs(kDefaultTimeInactive + Microseconds(100));
+
+ txnRouter().beginOrContinueTxn(
+ operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
+ runCommit(kDummyRetryableErrorRes, true /* expectRetries */);
+
+ // timeActive can advance.
+ tickSource()->advance(Microseconds(100));
+ assertTimeActiveIs(kDefaultTimeActive + Microseconds(200));
+ assertTimeInactiveIs(kDefaultTimeInactive + Microseconds(100));
+
+ // timeInactive can advance.
+ txnRouter().stash(operationContext());
+ tickSource()->advance(Microseconds(100));
+ assertTimeActiveIs(kDefaultTimeActive + Microseconds(200));
+ assertTimeInactiveIs(kDefaultTimeInactive + Microseconds(200));
+
+ txnRouter().beginOrContinueTxn(
+ operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
+ runCommit(kDummyOkRes);
+
+ // The result is known, so neither can advance.
+ assertTimeActiveAndInactiveCannotAdvance(kDefaultTimeActive + Microseconds(200) /*timeActive*/,
+ kDefaultTimeInactive +
+ Microseconds(200) /*timeInactive*/);
+}
+
+TEST_F(TransactionRouterMetricsTest, NeitherTimeActiveNorInactiveAdvanceAfterAbort) {
+ beginTxnWithDefaultTxnNumber();
+
+ setUpDefaultTimeActiveAndInactive();
+
+ txnRouter().beginOrContinueTxn(
+ operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue);
+ ASSERT_THROWS_CODE(txnRouter().abortTransaction(operationContext()),
+ AssertionException,
+ ErrorCodes::NoSuchTransaction);
+
+ // Neither can advance.
+ assertTimeActiveAndInactiveCannotAdvance(kDefaultTimeActive /*timeActive*/,
+ kDefaultTimeInactive /*timeInactive*/);
+}
+
+TEST_F(TransactionRouterMetricsTest, NeitherTimeActiveNorInactiveAdvanceAfterImplicitAbort) {
+ beginTxnWithDefaultTxnNumber();
+
+ setUpDefaultTimeActiveAndInactive();
+
+ txnRouter().beginOrContinueTxn(
+ operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue);
+ txnRouter().implicitlyAbortTransaction(operationContext(), kDummyStatus);
+
+ // Neither can advance.
+ assertTimeActiveAndInactiveCannotAdvance(kDefaultTimeActive /*timeActive*/,
+ kDefaultTimeInactive /*timeInactive*/);
+}
+
+TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_DefaultTo0) {
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+}
+
+TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_BeginTxn) {
+ beginTxnWithDefaultTxnNumber();
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+}
+
+TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_BeginRecover) {
+ beginRecoverCommitWithDefaultTxnNumber();
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+}
+
+TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_Stash) {
+ beginRecoverCommitWithDefaultTxnNumber();
+ txnRouter().stash(operationContext());
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive());
+}
+
+TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_BeginAfterStash) {
+ beginRecoverCommitWithDefaultTxnNumber();
+ txnRouter().stash(operationContext());
+ txnRouter().beginOrContinueTxn(
+ operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue);
+
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+}
+
+TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_AreNotCumulative) {
+ // Test active.
+ beginTxnWithDefaultTxnNumber();
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+
+ txnRouter().beginOrContinueTxn(
+ operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart);
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+
+ // Test inactive.
+ txnRouter().beginOrContinueTxn(
+ operationContext(), kTxnNumber + 2, TransactionRouter::TransactionActions::kStart);
+ txnRouter().stash(operationContext());
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive());
+
+ txnRouter().beginOrContinueTxn(
+ operationContext(), kTxnNumber + 3, TransactionRouter::TransactionActions::kStart);
+ txnRouter().stash(operationContext());
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive());
+}
+
+TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_TxnEnds) {
+ beginTxnWithDefaultTxnNumber();
+ runCommit(kDummyOkRes);
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+}
+
+TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_UnknownCommit) {
+ beginTxnWithDefaultTxnNumber();
+ runCommit(kDummyRetryableErrorRes, true /* expectRetries */);
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+}
+
+TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_ImplicitAbortForStashedTxn) {
+ beginTxnWithDefaultTxnNumber();
+ txnRouter().stash(operationContext());
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive());
+
+ txnRouter().implicitlyAbortTransaction(operationContext(), kDummyStatus);
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+}
+
+TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_ImplicitAbortForActiveTxn) {
+ beginTxnWithDefaultTxnNumber();
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+
+ txnRouter().implicitlyAbortTransaction(operationContext(), kDummyStatus);
+
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+}
+
+TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_BeginAndStashForEndedTxn) {
+ beginTxnWithDefaultTxnNumber();
+ runCommit(kDummyOkRes);
+
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+
+ txnRouter().beginOrContinueTxn(
+ operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue);
+
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+
+ txnRouter().stash(operationContext());
+
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+}
+
+TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_CommitEndedTxn) {
+ beginTxnWithDefaultTxnNumber();
+ runCommit(kDummyOkRes);
+
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+
+ runCommit(kDummyOkRes);
+
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+}
+
+TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_ExplicitAbortEndedTxn) {
+ beginTxnWithDefaultTxnNumber();
+ runCommit(kDummyOkRes);
+
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+
+ auto future = launchAsync([&] { txnRouter().abortTransaction(operationContext()); });
+ expectAbortTransactions({hostAndPort1}, getSessionId(), kTxnNumber);
+ future.default_timed_get();
+
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+}
+
+TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_ImplicitAbortEndedTxn) {
+ beginTxnWithDefaultTxnNumber();
+ runCommit(kDummyOkRes);
+
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+
+ auto future = launchAsync(
+ [&] { txnRouter().implicitlyAbortTransaction(operationContext(), kDummyStatus); });
+ expectAbortTransactions({hostAndPort1}, getSessionId(), kTxnNumber);
+ future.default_timed_get();
+
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
+}
+
+TEST_F(TransactionRouterTest, RouterMetricsCurrent_ReapForInactiveTxn) {
+ auto routerTxnMetrics = RouterTransactionsMetrics::get(operationContext());
+ operationContext()->setLogicalSessionId(makeLogicalSessionIdForTest());
+
+ {
+ // Check out a session to create one in the session catalog.
+ RouterOperationContextSession routerOpCtxSession(operationContext());
+
+ // Start a transaction on the session.
+ auto txnRouter = TransactionRouter::get(operationContext());
+ txnRouter.beginOrContinueTxn(
+ operationContext(), 5, TransactionRouter::TransactionActions::kStart);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+
+ ASSERT_EQUALS(1L, routerTxnMetrics->getCurrentOpen());
+ ASSERT_EQUALS(1L, routerTxnMetrics->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics->getCurrentInactive());
+ }
+
+ // The router session is out of scope, so the transaction is stashed.
+ ASSERT_EQUALS(1L, routerTxnMetrics->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics->getCurrentActive());
+ ASSERT_EQUALS(1L, routerTxnMetrics->getCurrentInactive());
+
+ // Mark the session for reap which will also erase it from the catalog.
+ auto catalog = SessionCatalog::get(operationContext()->getServiceContext());
+ catalog->scanSession(*operationContext()->getLogicalSessionId(),
+ [](ObservableSession& session) { session.markForReap(); });
+
+ // Verify the session was reaped.
+ catalog->scanSession(*operationContext()->getLogicalSessionId(), [](const ObservableSession&) {
+ FAIL("The callback was called for non-existent session");
+ });
+
+ // Verify serverStatus was updated correctly and the reaped transactions were not considered
+ // committed or aborted.
+ ASSERT_EQUALS(0L, routerTxnMetrics->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics->getCurrentInactive());
+ ASSERT_EQUALS(0L, routerTxnMetrics->getTotalCommitted());
+ ASSERT_EQUALS(0L, routerTxnMetrics->getTotalAborted());
+}
+
+TEST_F(TransactionRouterTest, RouterMetricsCurrent_ReapForUnstartedTxn) {
+ auto routerTxnMetrics = RouterTransactionsMetrics::get(operationContext());
+ operationContext()->setLogicalSessionId(makeLogicalSessionIdForTest());
+
+ {
+ // Check out a session to create one in the session catalog, but don't start a txn on it.
+ RouterOperationContextSession routerOpCtxSession(operationContext());
+ }
+
+ ASSERT_EQUALS(0L, routerTxnMetrics->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics->getCurrentInactive());
+
+ // Mark the session for reap which will also erase it from the catalog.
+ auto catalog = SessionCatalog::get(operationContext()->getServiceContext());
+ catalog->scanSession(*operationContext()->getLogicalSessionId(),
+ [](ObservableSession& session) { session.markForReap(); });
+
+ // Verify the session was reaped.
+ catalog->scanSession(*operationContext()->getLogicalSessionId(), [](const ObservableSession&) {
+ FAIL("The callback was called for non-existent session");
+ });
+
+ // Verify serverStatus was not modified and the reaped transactions were not considered
+ // committed or aborted.
+ ASSERT_EQUALS(0L, routerTxnMetrics->getCurrentOpen());
+ ASSERT_EQUALS(0L, routerTxnMetrics->getCurrentActive());
+ ASSERT_EQUALS(0L, routerTxnMetrics->getCurrentInactive());
+ ASSERT_EQUALS(0L, routerTxnMetrics->getTotalCommitted());
+ ASSERT_EQUALS(0L, routerTxnMetrics->getTotalAborted());
+}
+
+// The following three tests verify that the methods that end metrics tracking for a transaction
+// can't be called for an unstarted one.
+
+DEATH_TEST_F(TransactionRouterMetricsTest,
+ ImplicitlyAbortingUnstartedTxnCrashes,
+ "Invariant failure isInitialized()") {
+ txnRouter().implicitlyAbortTransaction(operationContext(), kDummyStatus);
+}
+
+DEATH_TEST_F(TransactionRouterMetricsTest,
+ AbortingUnstartedTxnCrashes,
+ "Invariant failure isInitialized()") {
+ txnRouter().abortTransaction(operationContext());
+}
+
+DEATH_TEST_F(TransactionRouterMetricsTest,
+ CommittingUnstartedTxnCrashes,
+ "Invariant failure isInitialized()") {
+ txnRouter().commitTransaction(operationContext(), boost::none);
+}
+
TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalStarted_DefaultsTo0) {
ASSERT_EQUALS(0L, routerTxnMetrics()->getTotalStarted());
}
@@ -4023,6 +4655,8 @@ TEST_F(TransactionRouterMetricsTest, ReportResources) {
.getValue(),
startTime);
ASSERT_GTE(transactionDocument.getField("timeOpenMicros").numberLong(), 0);
+ ASSERT_GTE(transactionDocument.getField("timeActiveMicros").numberLong(), 0);
+ ASSERT_GTE(transactionDocument.getField("timeInactiveMicros").numberLong(), 0);
ASSERT_EQ(transactionDocument.getField("numNonReadOnlyParticipants").numberInt(), 0);
ASSERT_EQ(transactionDocument.getField("numReadOnlyParticipants").numberInt(), 0);
@@ -4050,8 +4684,6 @@ TEST_F(TransactionRouterMetricsTest, ReportResourcesWithParticipantList) {
auto transactionDocument = state.getObjectField("transaction");
auto parametersDocument = transactionDocument.getObjectField("parameters");
- ASSERT_EQ(state.getField("desc").valueStringData().toString(), "active transaction");
- ASSERT_EQ(state.getField("type").valueStringData().toString(), "activeSession");
ASSERT_GTE(transactionDocument.getField("readTimestamp").timestamp(), Timestamp(0, 0));
ASSERT_EQ(dateFromISOString(transactionDocument.getField("startWallClockTime").String()),
startTime);
@@ -4102,8 +4734,9 @@ TEST_F(TransactionRouterMetricsTest, ReportResourcesWithParticipantList) {
ASSERT_EQ(transactionDocument.getField("numNonReadOnlyParticipants").numberInt(), 1);
ASSERT_EQ(transactionDocument.getField("numReadOnlyParticipants").numberInt(), 1);
- ASSERT_EQ(state.getField("active").boolean(), true);
ASSERT_GTE(transactionDocument.getField("timeOpenMicros").numberLong(), 0);
+ ASSERT_GTE(transactionDocument.getField("timeActiveMicros").numberLong(), 0);
+ ASSERT_GTE(transactionDocument.getField("timeInactiveMicros").numberLong(), 0);
}
TEST_F(TransactionRouterMetricsTest, ReportResourcesCommit) {
@@ -4135,5 +4768,10 @@ TEST_F(TransactionRouterMetricsTest, ReportResourcesRecoveryCommit) {
ASSERT_EQ(transactionDocument.hasField("participants"), false);
}
+// Checks that reportState() yields an empty BSON object when this test body has not started a
+// transaction on the router, even though the session is reported as active.
+TEST_F(TransactionRouterMetricsTest, ReportResourcesUnstartedTxn) {
+ auto state = txnRouter().reportState(operationContext(), true /* sessionIsActive */);
+ // A default-constructed BSONObj is the expected (empty) report.
+ ASSERT_BSONOBJ_EQ(state, BSONObj());
+}
+
} // unnamed namespace
} // namespace mongo