Diffstat (limited to 'src/mongo/s/transaction_router.cpp')
-rw-r--r-- | src/mongo/s/transaction_router.cpp | 148
1 file changed, 111 insertions, 37 deletions
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 44796119e1a..a7e4325e833 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -44,6 +44,7 @@
 #include "mongo/db/repl/read_concern_args.h"
 #include "mongo/db/transaction_validation.h"
 #include "mongo/executor/task_executor_pool.h"
+#include "mongo/logv2/log.h"
 #include "mongo/rpc/get_status_from_command_result.h"
 #include "mongo/s/async_requests_sender.h"
 #include "mongo/s/cluster_commands_helpers.h"
@@ -483,7 +484,11 @@ void TransactionRouter::Router::processParticipantResponse(OperationContext* opC
 
     if (txnResponseMetadata.getReadOnly()) {
         if (participant->readOnly == Participant::ReadOnly::kUnset) {
-            LOG(3) << txnIdToString() << " Marking " << shardId << " as read-only";
+            LOGV2_DEBUG(22880,
+                        3,
+                        "{txnIdToString} Marking {shardId} as read-only",
+                        "txnIdToString"_attr = txnIdToString(),
+                        "shardId"_attr = shardId);
             _setReadOnlyForParticipant(opCtx, shardId, Participant::ReadOnly::kReadOnly);
             return;
         }
@@ -499,12 +504,20 @@ void TransactionRouter::Router::processParticipantResponse(OperationContext* opC
 
     // The shard reported readOnly:false on this statement.
     if (participant->readOnly != Participant::ReadOnly::kNotReadOnly) {
-        LOG(3) << txnIdToString() << " Marking " << shardId << " as having done a write";
+        LOGV2_DEBUG(22881,
+                    3,
+                    "{txnIdToString} Marking {shardId} as having done a write",
+                    "txnIdToString"_attr = txnIdToString(),
+                    "shardId"_attr = shardId);
         _setReadOnlyForParticipant(opCtx, shardId, Participant::ReadOnly::kNotReadOnly);
 
         if (!p().recoveryShardId) {
-            LOG(3) << txnIdToString() << " Choosing " << shardId << " as recovery shard";
+            LOGV2_DEBUG(22882,
+                        3,
+                        "{txnIdToString} Choosing {shardId} as recovery shard",
+                        "txnIdToString"_attr = txnIdToString(),
+                        "shardId"_attr = shardId);
             p().recoveryShardId = shardId;
         }
     }
 
@@ -552,13 +565,20 @@ BSONObj TransactionRouter::Router::attachTxnFieldsIfNeeded(OperationContext* opC
                                                            const BSONObj& cmdObj) {
     RouterTransactionsMetrics::get(opCtx)->incrementTotalRequestsTargeted();
     if (auto txnPart = getParticipant(shardId)) {
-        LOG(4) << txnIdToString()
-               << " Sending transaction fields to existing participant: " << shardId;
+        LOGV2_DEBUG(22883,
+                    4,
+                    "{txnIdToString} Sending transaction fields to existing participant: {shardId}",
+                    "txnIdToString"_attr = txnIdToString(),
+                    "shardId"_attr = shardId);
         return txnPart->attachTxnFieldsIfNeeded(cmdObj, false);
     }
 
     auto txnPart = _createParticipant(opCtx, shardId);
-    LOG(4) << txnIdToString() << " Sending transaction fields to new participant: " << shardId;
+    LOGV2_DEBUG(22884,
+                4,
+                "{txnIdToString} Sending transaction fields to new participant: {shardId}",
+                "txnIdToString"_attr = txnIdToString(),
+                "shardId"_attr = shardId);
     if (!p().isRecoveringCommit) {
         // Don't update participant stats during recovery since the participant list isn't known.
         RouterTransactionsMetrics::get(opCtx)->incrementTotalContactedParticipants();
@@ -740,8 +760,12 @@ void TransactionRouter::Router::onStaleShardOrDbError(OperationContext* opCtx,
                                                       const Status& errorStatus) {
     invariant(canContinueOnStaleShardOrDbError(cmdName));
 
-    LOG(3) << txnIdToString()
-           << " Clearing pending participants after stale version error: " << errorStatus;
+    LOGV2_DEBUG(
+        22885,
+        3,
+        "{txnIdToString} Clearing pending participants after stale version error: {errorStatus}",
+        "txnIdToString"_attr = txnIdToString(),
+        "errorStatus"_attr = errorStatus);
 
     // Remove participants created during the current statement so they are sent the correct options
     // if they are targeted again by the retry.
@@ -752,8 +776,12 @@ void TransactionRouter::Router::onViewResolutionError(OperationContext* opCtx,
                                                       const NamespaceString& nss) {
     // The router can always retry on a view resolution error.
 
-    LOG(3) << txnIdToString()
-           << " Clearing pending participants after view resolution error on namespace: " << nss;
+    LOGV2_DEBUG(22886,
+                3,
+                "{txnIdToString} Clearing pending participants after view resolution error on "
+                "namespace: {nss}",
+                "txnIdToString"_attr = txnIdToString(),
+                "nss"_attr = nss);
 
     // Requests against views are always routed to the primary shard for its database, but the retry
     // on the resolved namespace does not have to re-target the primary, so pending participants
@@ -773,10 +801,14 @@ void TransactionRouter::Router::onSnapshotError(OperationContext* opCtx,
                                                 const Status& errorStatus) {
     invariant(canContinueOnSnapshotError());
 
-    LOG(3) << txnIdToString()
-           << " Clearing pending participants and resetting global snapshot "
-              "timestamp after snapshot error: "
-           << errorStatus << ", previous timestamp: " << o().atClusterTime->getTime();
+    LOGV2_DEBUG(22887,
+                3,
+                "{txnIdToString} Clearing pending participants and resetting global snapshot "
+                "timestamp after snapshot error: {errorStatus}, previous timestamp: "
+                "{o_atClusterTime_getTime}",
+                "txnIdToString"_attr = txnIdToString(),
+                "errorStatus"_attr = errorStatus,
+                "o_atClusterTime_getTime"_attr = o().atClusterTime->getTime());
 
     // The transaction must be restarted on all participants because a new read timestamp will be
     // selected, so clear all pending participants. Snapshot errors are only retryable on the first
@@ -814,8 +846,13 @@ void TransactionRouter::Router::_setAtClusterTime(
         return;
     }
 
-    LOG(2) << txnIdToString() << " Setting global snapshot timestamp to " << candidateTime
-           << " on statement " << p().latestStmtId;
+    LOGV2_DEBUG(22888,
+                2,
+                "{txnIdToString} Setting global snapshot timestamp to {candidateTime} on statement "
+                "{p_latestStmtId}",
+                "txnIdToString"_attr = txnIdToString(),
+                "candidateTime"_attr = candidateTime,
+                "p_latestStmtId"_attr = p().latestStmtId);
 
     o(lk).atClusterTime->setTime(candidateTime, p().latestStmtId);
 }
@@ -877,7 +914,10 @@ void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx,
                 o(lk).atClusterTime.emplace();
             }
 
-            LOG(3) << txnIdToString() << " New transaction started";
+            LOGV2_DEBUG(22889,
+                        3,
+                        "{txnIdToString} New transaction started",
+                        "txnIdToString"_attr = txnIdToString());
             break;
         }
         case TransactionActions::kContinue: {
@@ -892,7 +932,10 @@ void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx,
            // means that the client is attempting to recover a commit decision.
            p().isRecoveringCommit = true;

-            LOG(3) << txnIdToString() << " Commit recovery started";
+            LOGV2_DEBUG(22890,
+                        3,
+                        "{txnIdToString} Commit recovery started",
+                        "txnIdToString"_attr = txnIdToString());
             break;
         }
     };
@@ -929,8 +972,11 @@ BSONObj TransactionRouter::Router::_handOffCommitToCoordinator(OperationContext*
     const auto coordinateCommitCmdObj = coordinateCommitCmd.toBSON(
         BSON(WriteConcernOptions::kWriteConcernField << opCtx->getWriteConcern().toBSON()));
 
-    LOG(3) << txnIdToString()
-           << " Committing using two-phase commit, coordinator: " << *o().coordinatorId;
+    LOGV2_DEBUG(22891,
+                3,
+                "{txnIdToString} Committing using two-phase commit, coordinator: {o_coordinatorId}",
+                "txnIdToString"_attr = txnIdToString(),
+                "o_coordinatorId"_attr = *o().coordinatorId);
 
     MultiStatementTransactionRequestsSender ars(
         opCtx,
@@ -1031,8 +1077,12 @@ BSONObj TransactionRouter::Router::_commitTransaction(
 
     if (o().participants.size() == 1) {
         ShardId shardId = o().participants.cbegin()->first;
-        LOG(3) << txnIdToString()
-               << " Committing single-shard transaction, single participant: " << shardId;
+        LOGV2_DEBUG(
+            22892,
+            3,
+            "{txnIdToString} Committing single-shard transaction, single participant: {shardId}",
+            "txnIdToString"_attr = txnIdToString(),
+            "shardId"_attr = shardId);
 
         {
             stdx::lock_guard<Client> lk(*opCtx->getClient());
@@ -1045,8 +1095,12 @@ BSONObj TransactionRouter::Router::_commitTransaction(
     }
 
     if (writeShards.size() == 0) {
-        LOG(3) << txnIdToString() << " Committing read-only transaction on "
-               << readOnlyShards.size() << " shards";
+        LOGV2_DEBUG(
+            22893,
+            3,
+            "{txnIdToString} Committing read-only transaction on {readOnlyShards_size} shards",
+            "txnIdToString"_attr = txnIdToString(),
+            "readOnlyShards_size"_attr = readOnlyShards.size());
         {
             stdx::lock_guard<Client> lk(*opCtx->getClient());
             o(lk).commitType = CommitType::kReadOnly;
@@ -1057,9 +1111,13 @@ BSONObj TransactionRouter::Router::_commitTransaction(
     }
 
     if (writeShards.size() == 1) {
-        LOG(3) << txnIdToString() << " Committing single-write-shard transaction with "
-               << readOnlyShards.size()
-               << " read-only shards, write shard: " << writeShards.front();
+        LOGV2_DEBUG(22894,
+                    3,
+                    "{txnIdToString} Committing single-write-shard transaction with "
+                    "{readOnlyShards_size} read-only shards, write shard: {writeShards_front}",
+                    "txnIdToString"_attr = txnIdToString(),
+                    "readOnlyShards_size"_attr = readOnlyShards.size(),
+                    "writeShards_front"_attr = writeShards.front());
         {
             stdx::lock_guard<Client> lk(*opCtx->getClient());
             o(lk).commitType = CommitType::kSingleWriteShard;
@@ -1106,8 +1164,11 @@ BSONObj TransactionRouter::Router::abortTransaction(OperationContext* opCtx) {
         abortRequests.emplace_back(ShardId(participantEntry.first), abortCmd);
     }
 
-    LOG(3) << txnIdToString() << " Aborting transaction on " << o().participants.size()
-           << " shard(s)";
+    LOGV2_DEBUG(22895,
+                3,
+                "{txnIdToString} Aborting transaction on {o_participants_size} shard(s)",
+                "txnIdToString"_attr = txnIdToString(),
+                "o_participants_size"_attr = o().participants.size());
 
     const auto responses = gatherResponses(opCtx,
                                            NamespaceString::kAdminDb,
@@ -1145,9 +1206,11 @@ void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opC
 
     if (o().commitType == CommitType::kTwoPhaseCommit ||
         o().commitType == CommitType::kRecoverWithToken) {
-        LOG(3) << txnIdToString()
-               << " Router not sending implicit abortTransaction because commit "
-                  "may have been handed off to the coordinator";
+        LOGV2_DEBUG(22896,
+                    3,
+                    "{txnIdToString} Router not sending implicit abortTransaction because commit "
+                    "may have been handed off to the coordinator",
+                    "txnIdToString"_attr = txnIdToString());
         return;
     }
 
@@ -1167,8 +1230,13 @@ void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opC
         abortRequests.emplace_back(ShardId(participantEntry.first), abortCmd);
     }
 
-    LOG(3) << txnIdToString() << " Implicitly aborting transaction on " << o().participants.size()
-           << " shard(s) due to error: " << errorStatus;
+    LOGV2_DEBUG(22897,
+                3,
+                "{txnIdToString} Implicitly aborting transaction on {o_participants_size} shard(s) "
+                "due to error: {errorStatus}",
+                "txnIdToString"_attr = txnIdToString(),
+                "o_participants_size"_attr = o().participants.size(),
+                "errorStatus"_attr = errorStatus);
 
     try {
         // Ignore the responses.
@@ -1178,8 +1246,11 @@ void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opC
                         Shard::RetryPolicy::kIdempotent,
                         abortRequests);
     } catch (const DBException& ex) {
-        LOG(3) << txnIdToString() << " Implicitly aborting transaction failed "
-               << causedBy(ex.toStatus());
+        LOGV2_DEBUG(22898,
+                    3,
+                    "{txnIdToString} Implicitly aborting transaction failed {causedBy_ex_toStatus}",
+                    "txnIdToString"_attr = txnIdToString(),
+                    "causedBy_ex_toStatus"_attr = causedBy(ex.toStatus()));
         // Ignore any exceptions.
     }
 }
@@ -1262,7 +1333,10 @@ BSONObj TransactionRouter::Router::_commitWithRecoveryToken(OperationContext* op
 
 void TransactionRouter::Router::_logSlowTransaction(OperationContext* opCtx,
                                                     TerminationCause terminationCause) const {
-    log() << "transaction " << _transactionInfoForLog(opCtx, terminationCause);
+    LOGV2(22899,
+          "transaction {transactionInfoForLog_opCtx_terminationCause}",
+          "transactionInfoForLog_opCtx_terminationCause"_attr =
+              _transactionInfoForLog(opCtx, terminationCause));
 }
 
 std::string TransactionRouter::Router::_transactionInfoForLog(
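
Every hunk above applies the same mechanical conversion: a legacy LOG(n) stream statement becomes LOGV2_DEBUG(id, n, message, attributes...), where the id is a number unique across the whole tree, n is the unchanged debug verbosity, and each {placeholder} in the message template is bound by a matching "name"_attr = value pair; the one unconditional log() call, in _logSlowTransaction, becomes plain LOGV2. Below is a minimal out-of-context sketch of the pattern. It assumes the MongoDB source tree; the wrapper function logMarkedReadOnly and the kTransaction component choice are illustrative only, while the LOGV2_DEBUG call itself is taken from hunk @@ -483,7 +484,11 @@ above.

// Sketch only: each .cpp in the tree defines its default log component
// before including the logv2 header (kTransaction here is an assumption).
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTransaction

#include <string>

#include "mongo/logv2/log.h"
#include "mongo/s/shard_id.h"

namespace mongo {

// Hypothetical wrapper around one call site from the diff. The legacy form was:
//     LOG(3) << txnIdToString() << " Marking " << shardId << " as read-only";
// The logv2 form carries a unique numeric ID (22880 in the hunk above; IDs must
// not be reused elsewhere in the tree) plus named, structured attributes.
void logMarkedReadOnly(const std::string& txnId, const ShardId& shardId) {
    LOGV2_DEBUG(22880,
                3,  // same verbosity as the legacy LOG(3)
                "{txnIdToString} Marking {shardId} as read-only",
                "txnIdToString"_attr = txnId,
                "shardId"_attr = shardId);
}

}  // namespace mongo

The practical gain of the structured form is that machine-readable log output can emit txnIdToString and shardId as separate fields instead of fragments of a pre-formatted string, while the message template keeps the human-readable rendering close to the old one.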