diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2019-05-07 15:19:18 -0400 |
---|---|---|
committer | Jack Mulrow <jack.mulrow@mongodb.com> | 2019-05-29 15:50:08 -0400 |
commit | ffd64883d70c9139d7b56d076e249f3fef77e54e (patch) | |
tree | 5d9fda9e4c9bf1b704516a63fa55f6b223303242 /src/mongo/s/transaction_router.cpp | |
parent | d0194cc5133c4e71868156320fee86f0166e0f7e (diff) | |
download | mongo-ffd64883d70c9139d7b56d076e249f3fef77e54e.tar.gz |
SERVER-40980 SERVER-40984 Basic transactions serverStatus on mongos
Diffstat (limited to 'src/mongo/s/transaction_router.cpp')
-rw-r--r-- | src/mongo/s/transaction_router.cpp | 57 |
1 files changed, 45 insertions, 12 deletions
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index a5b8a864bbc..d2659e22662 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -49,6 +49,7 @@ #include "mongo/s/cluster_commands_helpers.h" #include "mongo/s/grid.h" #include "mongo/s/multi_statement_transaction_requests_sender.h" +#include "mongo/s/router_transactions_metrics.h" #include "mongo/util/assert_util.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" @@ -68,7 +69,7 @@ const auto getTransactionRouter = Session::declareDecoration<TransactionRouter>( bool isTransactionCommand(const BSONObj& cmd) { auto cmdName = cmd.firstElement().fieldNameStringData(); return cmdName == "abortTransaction" || cmdName == "commitTransaction" || - cmdName == "prepareTransaction"; + cmdName == "prepareTransaction" || cmdName == CoordinateCommitTransaction::kCommandName; } /** @@ -402,7 +403,10 @@ const boost::optional<ShardId>& TransactionRouter::getRecoveryShardId() const { return _recoveryShardId; } -BSONObj TransactionRouter::attachTxnFieldsIfNeeded(const ShardId& shardId, const BSONObj& cmdObj) { +BSONObj TransactionRouter::attachTxnFieldsIfNeeded(OperationContext* opCtx, + const ShardId& shardId, + const BSONObj& cmdObj) { + RouterTransactionsMetrics::get(opCtx)->incrementTotalRequestsTargeted(); if (auto txnPart = getParticipant(shardId)) { LOG(4) << txnIdToString() << " Sending transaction fields to existing participant: " << shardId; @@ -411,6 +415,10 @@ BSONObj TransactionRouter::attachTxnFieldsIfNeeded(const ShardId& shardId, const auto txnPart = _createParticipant(shardId); LOG(4) << txnIdToString() << " Sending transaction fields to new participant: " << shardId; + if (!_isRecoveringCommit) { + // Don't update participant stats during recovery since the participant list isn't known. + RouterTransactionsMetrics::get(opCtx)->incrementTotalContactedParticipants(); + } return txnPart.attachTxnFieldsIfNeeded(cmdObj, true); } @@ -731,8 +739,6 @@ BSONObj TransactionRouter::_handOffCommitToCoordinator(OperationContext* opCtx) const auto coordinateCommitCmdObj = coordinateCommitCmd.toBSON( BSON(WriteConcernOptions::kWriteConcernField << opCtx->getWriteConcern().toBSON())); - _commitType = CommitType::kTwoPhaseCommit; - LOG(3) << txnIdToString() << " Committing using two-phase commit, coordinator: " << *_coordinatorId; @@ -753,8 +759,6 @@ BSONObj TransactionRouter::_handOffCommitToCoordinator(OperationContext* opCtx) BSONObj TransactionRouter::commitTransaction( OperationContext* opCtx, const boost::optional<TxnRecoveryToken>& recoveryToken) { - _onStartCommit(opCtx); - auto commitRes = _commitTransaction(opCtx, recoveryToken); auto commitStatus = getStatusFromCommandResult(commitRes); @@ -787,6 +791,7 @@ BSONObj TransactionRouter::_commitTransaction( "Cannot recover the transaction decision without a recoveryToken", recoveryToken); _commitType = CommitType::kRecoverWithToken; + _onStartCommit(opCtx); return _commitWithRecoveryToken(opCtx, *recoveryToken); } @@ -798,6 +803,7 @@ BSONObj TransactionRouter::_commitTransaction( "Cannot commit without participants", _txnNumber != kUninitializedTxnNumber); _commitType = CommitType::kNoShards; + _onStartCommit(opCtx); return BSON("ok" << 1); } @@ -826,6 +832,7 @@ BSONObj TransactionRouter::_commitTransaction( LOG(3) << txnIdToString() << " Committing single-shard transaction, single participant: " << shardId; _commitType = CommitType::kSingleShard; + _onStartCommit(opCtx); return sendCommitDirectlyToShards(opCtx, {shardId}); } @@ -833,6 +840,7 @@ BSONObj TransactionRouter::_commitTransaction( LOG(3) << txnIdToString() << " Committing read-only transaction on " << readOnlyShards.size() << " shards"; _commitType = CommitType::kReadOnly; + _onStartCommit(opCtx); return sendCommitDirectlyToShards(opCtx, readOnlyShards); } @@ -841,6 +849,7 @@ BSONObj TransactionRouter::_commitTransaction( << readOnlyShards.size() << " read-only shards, write shard: " << writeShards.front(); _commitType = CommitType::kSingleWriteShard; + _onStartCommit(opCtx); const auto readOnlyShardsResponse = sendCommitDirectlyToShards(opCtx, readOnlyShards); if (!getStatusFromCommandResult(readOnlyShardsResponse).isOK() || @@ -850,6 +859,8 @@ BSONObj TransactionRouter::_commitTransaction( return sendCommitDirectlyToShards(opCtx, writeShards); } + _commitType = CommitType::kTwoPhaseCommit; + _onStartCommit(opCtx); return _handOffCommitToCoordinator(opCtx); } @@ -995,10 +1006,7 @@ BSONObj TransactionRouter::_commitWithRecoveryToken(OperationContext* opCtx, auto rawCoordinateCommit = coordinateCommitCmd.toBSON( BSON(WriteConcernOptions::kWriteConcernField << opCtx->getWriteConcern().toBSON())); - auto existingParticipant = getParticipant(recoveryShardId); - auto recoveryParticipant = - existingParticipant ? existingParticipant : &_createParticipant(recoveryShardId); - return recoveryParticipant->attachTxnFieldsIfNeeded(rawCoordinateCommit, false); + return attachTxnFieldsIfNeeded(opCtx, recoveryShardId, rawCoordinateCommit); }(); auto recoveryShard = uassertStatusOK(shardRegistry->getShard(opCtx, recoveryShardId)); @@ -1085,11 +1093,17 @@ std::string TransactionRouter::_transactionInfoForLog(OperationContext* opCtx, void TransactionRouter::_onNewTransaction(OperationContext* opCtx) { auto tickSource = opCtx->getServiceContext()->getTickSource(); _timingStats.startTime = tickSource->getTicks(); + + auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx); + routerTxnMetrics->incrementTotalStarted(); } void TransactionRouter::_onBeginRecoveringDecision(OperationContext* opCtx) { auto tickSource = opCtx->getServiceContext()->getTickSource(); _timingStats.startTime = tickSource->getTicks(); + + auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx); + routerTxnMetrics->incrementTotalStarted(); } void TransactionRouter::_onImplicitAbort(OperationContext* opCtx, const Status& errorStatus) { @@ -1122,9 +1136,20 @@ void TransactionRouter::_onExplicitAbort(OperationContext* opCtx) { } void TransactionRouter::_onStartCommit(OperationContext* opCtx) { + invariant(_commitType != CommitType::kNotInitiated); + + if (_timingStats.commitStartTime != 0) { + return; + } + auto tickSource = opCtx->getServiceContext()->getTickSource(); - if (_timingStats.commitStartTime == 0) { - _timingStats.commitStartTime = tickSource->getTicks(); + _timingStats.commitStartTime = tickSource->getTicks(); + + auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx); + routerTxnMetrics->incrementCommitInitiated(_commitType); + if (_commitType != CommitType::kRecoverWithToken) { + // We only know the participant list if we're not recovering a decision. + routerTxnMetrics->addToTotalParticipantsAtCommit(_participants.size()); } } @@ -1157,6 +1182,14 @@ void TransactionRouter::_endTransactionTrackingIfNecessary(OperationContext* opC _timingStats.getDuration(tickSource, curTicks) > Milliseconds(serverGlobalParams.slowMS)) { _logSlowTransaction(opCtx, terminationCause); } + + auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx); + if (terminationCause == TerminationCause::kAborted) { + routerTxnMetrics->incrementTotalAborted(); + } else { + routerTxnMetrics->incrementTotalCommitted(); + routerTxnMetrics->incrementCommitSuccessful(_commitType); + } } Microseconds TransactionRouter::TimingStats::getDuration(TickSource* tickSource, |