summaryrefslogtreecommitdiff
path: root/src/mongo/s/transaction_router.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/s/transaction_router.cpp')
-rw-r--r--src/mongo/s/transaction_router.cpp148
1 files changed, 111 insertions, 37 deletions
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 44796119e1a..a7e4325e833 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -44,6 +44,7 @@
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/transaction_validation.h"
#include "mongo/executor/task_executor_pool.h"
+#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/async_requests_sender.h"
#include "mongo/s/cluster_commands_helpers.h"
@@ -483,7 +484,11 @@ void TransactionRouter::Router::processParticipantResponse(OperationContext* opC
if (txnResponseMetadata.getReadOnly()) {
if (participant->readOnly == Participant::ReadOnly::kUnset) {
- LOG(3) << txnIdToString() << " Marking " << shardId << " as read-only";
+ LOGV2_DEBUG(22880,
+ 3,
+ "{txnIdToString} Marking {shardId} as read-only",
+ "txnIdToString"_attr = txnIdToString(),
+ "shardId"_attr = shardId);
_setReadOnlyForParticipant(opCtx, shardId, Participant::ReadOnly::kReadOnly);
return;
}
@@ -499,12 +504,20 @@ void TransactionRouter::Router::processParticipantResponse(OperationContext* opC
// The shard reported readOnly:false on this statement.
if (participant->readOnly != Participant::ReadOnly::kNotReadOnly) {
- LOG(3) << txnIdToString() << " Marking " << shardId << " as having done a write";
+ LOGV2_DEBUG(22881,
+ 3,
+ "{txnIdToString} Marking {shardId} as having done a write",
+ "txnIdToString"_attr = txnIdToString(),
+ "shardId"_attr = shardId);
_setReadOnlyForParticipant(opCtx, shardId, Participant::ReadOnly::kNotReadOnly);
if (!p().recoveryShardId) {
- LOG(3) << txnIdToString() << " Choosing " << shardId << " as recovery shard";
+ LOGV2_DEBUG(22882,
+ 3,
+ "{txnIdToString} Choosing {shardId} as recovery shard",
+ "txnIdToString"_attr = txnIdToString(),
+ "shardId"_attr = shardId);
p().recoveryShardId = shardId;
}
}
@@ -552,13 +565,20 @@ BSONObj TransactionRouter::Router::attachTxnFieldsIfNeeded(OperationContext* opC
const BSONObj& cmdObj) {
RouterTransactionsMetrics::get(opCtx)->incrementTotalRequestsTargeted();
if (auto txnPart = getParticipant(shardId)) {
- LOG(4) << txnIdToString()
- << " Sending transaction fields to existing participant: " << shardId;
+ LOGV2_DEBUG(22883,
+ 4,
+ "{txnIdToString} Sending transaction fields to existing participant: {shardId}",
+ "txnIdToString"_attr = txnIdToString(),
+ "shardId"_attr = shardId);
return txnPart->attachTxnFieldsIfNeeded(cmdObj, false);
}
auto txnPart = _createParticipant(opCtx, shardId);
- LOG(4) << txnIdToString() << " Sending transaction fields to new participant: " << shardId;
+ LOGV2_DEBUG(22884,
+ 4,
+ "{txnIdToString} Sending transaction fields to new participant: {shardId}",
+ "txnIdToString"_attr = txnIdToString(),
+ "shardId"_attr = shardId);
if (!p().isRecoveringCommit) {
// Don't update participant stats during recovery since the participant list isn't known.
RouterTransactionsMetrics::get(opCtx)->incrementTotalContactedParticipants();
@@ -740,8 +760,12 @@ void TransactionRouter::Router::onStaleShardOrDbError(OperationContext* opCtx,
const Status& errorStatus) {
invariant(canContinueOnStaleShardOrDbError(cmdName));
- LOG(3) << txnIdToString()
- << " Clearing pending participants after stale version error: " << errorStatus;
+ LOGV2_DEBUG(
+ 22885,
+ 3,
+ "{txnIdToString} Clearing pending participants after stale version error: {errorStatus}",
+ "txnIdToString"_attr = txnIdToString(),
+ "errorStatus"_attr = errorStatus);
// Remove participants created during the current statement so they are sent the correct options
// if they are targeted again by the retry.
@@ -752,8 +776,12 @@ void TransactionRouter::Router::onViewResolutionError(OperationContext* opCtx,
const NamespaceString& nss) {
// The router can always retry on a view resolution error.
- LOG(3) << txnIdToString()
- << " Clearing pending participants after view resolution error on namespace: " << nss;
+ LOGV2_DEBUG(22886,
+ 3,
+ "{txnIdToString} Clearing pending participants after view resolution error on "
+ "namespace: {nss}",
+ "txnIdToString"_attr = txnIdToString(),
+ "nss"_attr = nss);
// Requests against views are always routed to the primary shard for its database, but the retry
// on the resolved namespace does not have to re-target the primary, so pending participants
@@ -773,10 +801,14 @@ void TransactionRouter::Router::onSnapshotError(OperationContext* opCtx,
const Status& errorStatus) {
invariant(canContinueOnSnapshotError());
- LOG(3) << txnIdToString()
- << " Clearing pending participants and resetting global snapshot "
- "timestamp after snapshot error: "
- << errorStatus << ", previous timestamp: " << o().atClusterTime->getTime();
+ LOGV2_DEBUG(22887,
+ 3,
+ "{txnIdToString} Clearing pending participants and resetting global snapshot "
+ "timestamp after snapshot error: {errorStatus}, previous timestamp: "
+ "{o_atClusterTime_getTime}",
+ "txnIdToString"_attr = txnIdToString(),
+ "errorStatus"_attr = errorStatus,
+ "o_atClusterTime_getTime"_attr = o().atClusterTime->getTime());
// The transaction must be restarted on all participants because a new read timestamp will be
// selected, so clear all pending participants. Snapshot errors are only retryable on the first
@@ -814,8 +846,13 @@ void TransactionRouter::Router::_setAtClusterTime(
return;
}
- LOG(2) << txnIdToString() << " Setting global snapshot timestamp to " << candidateTime
- << " on statement " << p().latestStmtId;
+ LOGV2_DEBUG(22888,
+ 2,
+ "{txnIdToString} Setting global snapshot timestamp to {candidateTime} on statement "
+ "{p_latestStmtId}",
+ "txnIdToString"_attr = txnIdToString(),
+ "candidateTime"_attr = candidateTime,
+ "p_latestStmtId"_attr = p().latestStmtId);
o(lk).atClusterTime->setTime(candidateTime, p().latestStmtId);
}
@@ -877,7 +914,10 @@ void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx,
o(lk).atClusterTime.emplace();
}
- LOG(3) << txnIdToString() << " New transaction started";
+ LOGV2_DEBUG(22889,
+ 3,
+ "{txnIdToString} New transaction started",
+ "txnIdToString"_attr = txnIdToString());
break;
}
case TransactionActions::kContinue: {
@@ -892,7 +932,10 @@ void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx,
// means that the client is attempting to recover a commit decision.
p().isRecoveringCommit = true;
- LOG(3) << txnIdToString() << " Commit recovery started";
+ LOGV2_DEBUG(22890,
+ 3,
+ "{txnIdToString} Commit recovery started",
+ "txnIdToString"_attr = txnIdToString());
break;
}
};
@@ -929,8 +972,11 @@ BSONObj TransactionRouter::Router::_handOffCommitToCoordinator(OperationContext*
const auto coordinateCommitCmdObj = coordinateCommitCmd.toBSON(
BSON(WriteConcernOptions::kWriteConcernField << opCtx->getWriteConcern().toBSON()));
- LOG(3) << txnIdToString()
- << " Committing using two-phase commit, coordinator: " << *o().coordinatorId;
+ LOGV2_DEBUG(22891,
+ 3,
+ "{txnIdToString} Committing using two-phase commit, coordinator: {o_coordinatorId}",
+ "txnIdToString"_attr = txnIdToString(),
+ "o_coordinatorId"_attr = *o().coordinatorId);
MultiStatementTransactionRequestsSender ars(
opCtx,
@@ -1031,8 +1077,12 @@ BSONObj TransactionRouter::Router::_commitTransaction(
if (o().participants.size() == 1) {
ShardId shardId = o().participants.cbegin()->first;
- LOG(3) << txnIdToString()
- << " Committing single-shard transaction, single participant: " << shardId;
+ LOGV2_DEBUG(
+ 22892,
+ 3,
+ "{txnIdToString} Committing single-shard transaction, single participant: {shardId}",
+ "txnIdToString"_attr = txnIdToString(),
+ "shardId"_attr = shardId);
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
@@ -1045,8 +1095,12 @@ BSONObj TransactionRouter::Router::_commitTransaction(
}
if (writeShards.size() == 0) {
- LOG(3) << txnIdToString() << " Committing read-only transaction on "
- << readOnlyShards.size() << " shards";
+ LOGV2_DEBUG(
+ 22893,
+ 3,
+ "{txnIdToString} Committing read-only transaction on {readOnlyShards_size} shards",
+ "txnIdToString"_attr = txnIdToString(),
+ "readOnlyShards_size"_attr = readOnlyShards.size());
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
o(lk).commitType = CommitType::kReadOnly;
@@ -1057,9 +1111,13 @@ BSONObj TransactionRouter::Router::_commitTransaction(
}
if (writeShards.size() == 1) {
- LOG(3) << txnIdToString() << " Committing single-write-shard transaction with "
- << readOnlyShards.size()
- << " read-only shards, write shard: " << writeShards.front();
+ LOGV2_DEBUG(22894,
+ 3,
+ "{txnIdToString} Committing single-write-shard transaction with "
+ "{readOnlyShards_size} read-only shards, write shard: {writeShards_front}",
+ "txnIdToString"_attr = txnIdToString(),
+ "readOnlyShards_size"_attr = readOnlyShards.size(),
+ "writeShards_front"_attr = writeShards.front());
{
stdx::lock_guard<Client> lk(*opCtx->getClient());
o(lk).commitType = CommitType::kSingleWriteShard;
@@ -1106,8 +1164,11 @@ BSONObj TransactionRouter::Router::abortTransaction(OperationContext* opCtx) {
abortRequests.emplace_back(ShardId(participantEntry.first), abortCmd);
}
- LOG(3) << txnIdToString() << " Aborting transaction on " << o().participants.size()
- << " shard(s)";
+ LOGV2_DEBUG(22895,
+ 3,
+ "{txnIdToString} Aborting transaction on {o_participants_size} shard(s)",
+ "txnIdToString"_attr = txnIdToString(),
+ "o_participants_size"_attr = o().participants.size());
const auto responses = gatherResponses(opCtx,
NamespaceString::kAdminDb,
@@ -1145,9 +1206,11 @@ void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opC
if (o().commitType == CommitType::kTwoPhaseCommit ||
o().commitType == CommitType::kRecoverWithToken) {
- LOG(3) << txnIdToString()
- << " Router not sending implicit abortTransaction because commit "
- "may have been handed off to the coordinator";
+ LOGV2_DEBUG(22896,
+ 3,
+ "{txnIdToString} Router not sending implicit abortTransaction because commit "
+ "may have been handed off to the coordinator",
+ "txnIdToString"_attr = txnIdToString());
return;
}
@@ -1167,8 +1230,13 @@ void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opC
abortRequests.emplace_back(ShardId(participantEntry.first), abortCmd);
}
- LOG(3) << txnIdToString() << " Implicitly aborting transaction on " << o().participants.size()
- << " shard(s) due to error: " << errorStatus;
+ LOGV2_DEBUG(22897,
+ 3,
+ "{txnIdToString} Implicitly aborting transaction on {o_participants_size} shard(s) "
+ "due to error: {errorStatus}",
+ "txnIdToString"_attr = txnIdToString(),
+ "o_participants_size"_attr = o().participants.size(),
+ "errorStatus"_attr = errorStatus);
try {
// Ignore the responses.
@@ -1178,8 +1246,11 @@ void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opC
Shard::RetryPolicy::kIdempotent,
abortRequests);
} catch (const DBException& ex) {
- LOG(3) << txnIdToString() << " Implicitly aborting transaction failed "
- << causedBy(ex.toStatus());
+ LOGV2_DEBUG(22898,
+ 3,
+ "{txnIdToString} Implicitly aborting transaction failed {causedBy_ex_toStatus}",
+ "txnIdToString"_attr = txnIdToString(),
+ "causedBy_ex_toStatus"_attr = causedBy(ex.toStatus()));
// Ignore any exceptions.
}
}
@@ -1262,7 +1333,10 @@ BSONObj TransactionRouter::Router::_commitWithRecoveryToken(OperationContext* op
void TransactionRouter::Router::_logSlowTransaction(OperationContext* opCtx,
TerminationCause terminationCause) const {
- log() << "transaction " << _transactionInfoForLog(opCtx, terminationCause);
+ LOGV2(22899,
+ "transaction {transactionInfoForLog_opCtx_terminationCause}",
+ "transactionInfoForLog_opCtx_terminationCause"_attr =
+ _transactionInfoForLog(opCtx, terminationCause));
}
std::string TransactionRouter::Router::_transactionInfoForLog(