summaryrefslogtreecommitdiff
path: root/src/mongo/s/transaction_router.cpp
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2019-05-07 15:19:18 -0400
committerJack Mulrow <jack.mulrow@mongodb.com>2019-05-29 15:50:08 -0400
commitffd64883d70c9139d7b56d076e249f3fef77e54e (patch)
tree5d9fda9e4c9bf1b704516a63fa55f6b223303242 /src/mongo/s/transaction_router.cpp
parentd0194cc5133c4e71868156320fee86f0166e0f7e (diff)
downloadmongo-ffd64883d70c9139d7b56d076e249f3fef77e54e.tar.gz
SERVER-40980 SERVER-40984 Basic transactions serverStatus on mongos
Diffstat (limited to 'src/mongo/s/transaction_router.cpp')
-rw-r--r--src/mongo/s/transaction_router.cpp57
1 files changed, 45 insertions, 12 deletions
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index a5b8a864bbc..d2659e22662 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -49,6 +49,7 @@
#include "mongo/s/cluster_commands_helpers.h"
#include "mongo/s/grid.h"
#include "mongo/s/multi_statement_transaction_requests_sender.h"
+#include "mongo/s/router_transactions_metrics.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
@@ -68,7 +69,7 @@ const auto getTransactionRouter = Session::declareDecoration<TransactionRouter>(
bool isTransactionCommand(const BSONObj& cmd) {
auto cmdName = cmd.firstElement().fieldNameStringData();
return cmdName == "abortTransaction" || cmdName == "commitTransaction" ||
- cmdName == "prepareTransaction";
+ cmdName == "prepareTransaction" || cmdName == CoordinateCommitTransaction::kCommandName;
}
/**
@@ -402,7 +403,10 @@ const boost::optional<ShardId>& TransactionRouter::getRecoveryShardId() const {
return _recoveryShardId;
}
-BSONObj TransactionRouter::attachTxnFieldsIfNeeded(const ShardId& shardId, const BSONObj& cmdObj) {
+BSONObj TransactionRouter::attachTxnFieldsIfNeeded(OperationContext* opCtx,
+ const ShardId& shardId,
+ const BSONObj& cmdObj) {
+ RouterTransactionsMetrics::get(opCtx)->incrementTotalRequestsTargeted();
if (auto txnPart = getParticipant(shardId)) {
LOG(4) << txnIdToString()
<< " Sending transaction fields to existing participant: " << shardId;
@@ -411,6 +415,10 @@ BSONObj TransactionRouter::attachTxnFieldsIfNeeded(const ShardId& shardId, const
auto txnPart = _createParticipant(shardId);
LOG(4) << txnIdToString() << " Sending transaction fields to new participant: " << shardId;
+ if (!_isRecoveringCommit) {
+ // Don't update participant stats during recovery since the participant list isn't known.
+ RouterTransactionsMetrics::get(opCtx)->incrementTotalContactedParticipants();
+ }
return txnPart.attachTxnFieldsIfNeeded(cmdObj, true);
}
@@ -731,8 +739,6 @@ BSONObj TransactionRouter::_handOffCommitToCoordinator(OperationContext* opCtx)
const auto coordinateCommitCmdObj = coordinateCommitCmd.toBSON(
BSON(WriteConcernOptions::kWriteConcernField << opCtx->getWriteConcern().toBSON()));
- _commitType = CommitType::kTwoPhaseCommit;
-
LOG(3) << txnIdToString()
<< " Committing using two-phase commit, coordinator: " << *_coordinatorId;
@@ -753,8 +759,6 @@ BSONObj TransactionRouter::_handOffCommitToCoordinator(OperationContext* opCtx)
BSONObj TransactionRouter::commitTransaction(
OperationContext* opCtx, const boost::optional<TxnRecoveryToken>& recoveryToken) {
- _onStartCommit(opCtx);
-
auto commitRes = _commitTransaction(opCtx, recoveryToken);
auto commitStatus = getStatusFromCommandResult(commitRes);
@@ -787,6 +791,7 @@ BSONObj TransactionRouter::_commitTransaction(
"Cannot recover the transaction decision without a recoveryToken",
recoveryToken);
_commitType = CommitType::kRecoverWithToken;
+ _onStartCommit(opCtx);
return _commitWithRecoveryToken(opCtx, *recoveryToken);
}
@@ -798,6 +803,7 @@ BSONObj TransactionRouter::_commitTransaction(
"Cannot commit without participants",
_txnNumber != kUninitializedTxnNumber);
_commitType = CommitType::kNoShards;
+ _onStartCommit(opCtx);
return BSON("ok" << 1);
}
@@ -826,6 +832,7 @@ BSONObj TransactionRouter::_commitTransaction(
LOG(3) << txnIdToString()
<< " Committing single-shard transaction, single participant: " << shardId;
_commitType = CommitType::kSingleShard;
+ _onStartCommit(opCtx);
return sendCommitDirectlyToShards(opCtx, {shardId});
}
@@ -833,6 +840,7 @@ BSONObj TransactionRouter::_commitTransaction(
LOG(3) << txnIdToString() << " Committing read-only transaction on "
<< readOnlyShards.size() << " shards";
_commitType = CommitType::kReadOnly;
+ _onStartCommit(opCtx);
return sendCommitDirectlyToShards(opCtx, readOnlyShards);
}
@@ -841,6 +849,7 @@ BSONObj TransactionRouter::_commitTransaction(
<< readOnlyShards.size()
<< " read-only shards, write shard: " << writeShards.front();
_commitType = CommitType::kSingleWriteShard;
+ _onStartCommit(opCtx);
const auto readOnlyShardsResponse = sendCommitDirectlyToShards(opCtx, readOnlyShards);
if (!getStatusFromCommandResult(readOnlyShardsResponse).isOK() ||
@@ -850,6 +859,8 @@ BSONObj TransactionRouter::_commitTransaction(
return sendCommitDirectlyToShards(opCtx, writeShards);
}
+ _commitType = CommitType::kTwoPhaseCommit;
+ _onStartCommit(opCtx);
return _handOffCommitToCoordinator(opCtx);
}
@@ -995,10 +1006,7 @@ BSONObj TransactionRouter::_commitWithRecoveryToken(OperationContext* opCtx,
auto rawCoordinateCommit = coordinateCommitCmd.toBSON(
BSON(WriteConcernOptions::kWriteConcernField << opCtx->getWriteConcern().toBSON()));
- auto existingParticipant = getParticipant(recoveryShardId);
- auto recoveryParticipant =
- existingParticipant ? existingParticipant : &_createParticipant(recoveryShardId);
- return recoveryParticipant->attachTxnFieldsIfNeeded(rawCoordinateCommit, false);
+ return attachTxnFieldsIfNeeded(opCtx, recoveryShardId, rawCoordinateCommit);
}();
auto recoveryShard = uassertStatusOK(shardRegistry->getShard(opCtx, recoveryShardId));
@@ -1085,11 +1093,17 @@ std::string TransactionRouter::_transactionInfoForLog(OperationContext* opCtx,
void TransactionRouter::_onNewTransaction(OperationContext* opCtx) {
auto tickSource = opCtx->getServiceContext()->getTickSource();
_timingStats.startTime = tickSource->getTicks();
+
+ auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx);
+ routerTxnMetrics->incrementTotalStarted();
}
void TransactionRouter::_onBeginRecoveringDecision(OperationContext* opCtx) {
auto tickSource = opCtx->getServiceContext()->getTickSource();
_timingStats.startTime = tickSource->getTicks();
+
+ auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx);
+ routerTxnMetrics->incrementTotalStarted();
}
void TransactionRouter::_onImplicitAbort(OperationContext* opCtx, const Status& errorStatus) {
@@ -1122,9 +1136,20 @@ void TransactionRouter::_onExplicitAbort(OperationContext* opCtx) {
}
void TransactionRouter::_onStartCommit(OperationContext* opCtx) {
+ invariant(_commitType != CommitType::kNotInitiated);
+
+ if (_timingStats.commitStartTime != 0) {
+ return;
+ }
+
auto tickSource = opCtx->getServiceContext()->getTickSource();
- if (_timingStats.commitStartTime == 0) {
- _timingStats.commitStartTime = tickSource->getTicks();
+ _timingStats.commitStartTime = tickSource->getTicks();
+
+ auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx);
+ routerTxnMetrics->incrementCommitInitiated(_commitType);
+ if (_commitType != CommitType::kRecoverWithToken) {
+ // We only know the participant list if we're not recovering a decision.
+ routerTxnMetrics->addToTotalParticipantsAtCommit(_participants.size());
}
}
@@ -1157,6 +1182,14 @@ void TransactionRouter::_endTransactionTrackingIfNecessary(OperationContext* opC
_timingStats.getDuration(tickSource, curTicks) > Milliseconds(serverGlobalParams.slowMS)) {
_logSlowTransaction(opCtx, terminationCause);
}
+
+ auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx);
+ if (terminationCause == TerminationCause::kAborted) {
+ routerTxnMetrics->incrementTotalAborted();
+ } else {
+ routerTxnMetrics->incrementTotalCommitted();
+ routerTxnMetrics->incrementCommitSuccessful(_commitType);
+ }
}
Microseconds TransactionRouter::TimingStats::getDuration(TickSource* tickSource,