diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2019-08-19 17:08:55 -0400 |
---|---|---|
committer | Jack Mulrow <jack.mulrow@mongodb.com> | 2019-08-22 13:57:22 -0400 |
commit | f92b912452b540fdcbb1b3b959391fb31e64d408 (patch) | |
tree | 9c8deb150690193e50415792bfaa8e324219e357 /src/mongo/db/s/transaction_coordinator_util.cpp | |
parent | 4db2f7c5f0fa6a19be5cb5f36ad29b670f162813 (diff) | |
download | mongo-f92b912452b540fdcbb1b3b959391fb31e64d408.tar.gz |
SERVER-39573 Prefix TransactionCoordinator logs with transaction id
Diffstat (limited to 'src/mongo/db/s/transaction_coordinator_util.cpp')
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_util.cpp | 78 |
1 files changed, 50 insertions, 28 deletions
diff --git a/src/mongo/db/s/transaction_coordinator_util.cpp b/src/mongo/db/s/transaction_coordinator_util.cpp index 237ec82525e..9e6b78c445a 100644 --- a/src/mongo/db/s/transaction_coordinator_util.cpp +++ b/src/mongo/db/s/transaction_coordinator_util.cpp @@ -105,7 +105,7 @@ repl::OpTime persistParticipantListBlocking(OperationContext* opCtx, const LogicalSessionId& lsid, TxnNumber txnNumber, const std::vector<ShardId>& participantList) { - LOG(3) << "Going to write participant list for " << lsid.getId() << ':' << txnNumber; + LOG(3) << txnIdToString(lsid, txnNumber) << " Going to write participant list"; if (MONGO_FAIL_POINT(hangBeforeWritingParticipantList)) { LOG(0) << "Hit hangBeforeWritingParticipantList failpoint"; @@ -167,7 +167,7 @@ repl::OpTime persistParticipantListBlocking(OperationContext* opCtx, // Throw any other error. uassertStatusOK(upsertStatus); - LOG(3) << "Wrote participant list for " << lsid.getId() << ':' << txnNumber; + LOG(3) << txnIdToString(lsid, txnNumber) << " Wrote participant list"; return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); } @@ -246,8 +246,13 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, }; for (const auto& participant : participants) { - responses.emplace_back(sendPrepareToShard( - service, *prepareScheduler, participant, prepareObj, operationContextFn)); + responses.emplace_back(sendPrepareToShard(service, + *prepareScheduler, + lsid, + txnNumber, + participant, + prepareObj, + operationContextFn)); } // Asynchronously aggregate all prepare responses to find the decision and max prepare timestamp @@ -284,8 +289,8 @@ repl::OpTime persistDecisionBlocking(OperationContext* opCtx, const std::vector<ShardId>& participantList, const txn::CoordinatorCommitDecision& decision) { const bool isCommit = decision.getDecision() == txn::CommitDecision::kCommit; - LOG(3) << "Going to write decision " << (isCommit ? "commit" : "abort") << " for " - << lsid.getId() << ':' << txnNumber; + LOG(3) << txnIdToString(lsid, txnNumber) << " Going to write decision " + << (isCommit ? "commit" : "abort"); if (MONGO_FAIL_POINT(hangBeforeWritingDecision)) { LOG(0) << "Hit hangBeforeWritingDecision failpoint"; @@ -352,8 +357,8 @@ repl::OpTime persistDecisionBlocking(OperationContext* opCtx, << doc); } - LOG(3) << "Wrote decision " << (isCommit ? "commit" : "abort") << " for " << lsid.getId() << ':' - << txnNumber; + LOG(3) << txnIdToString(lsid, txnNumber) << " Wrote decision " + << (isCommit ? "commit" : "abort"); return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); } @@ -404,8 +409,8 @@ Future<void> sendCommit(ServiceContext* service, std::vector<Future<void>> responses; for (const auto& participant : participants) { - responses.push_back( - sendDecisionToShard(service, scheduler, participant, commitObj, operationContextFn)); + responses.push_back(sendDecisionToShard( + service, scheduler, lsid, txnNumber, participant, commitObj, operationContextFn)); } return txn::whenAll(responses); } @@ -434,8 +439,8 @@ Future<void> sendAbort(ServiceContext* service, std::vector<Future<void>> responses; for (const auto& participant : participants) { - responses.push_back( - sendDecisionToShard(service, scheduler, participant, abortObj, operationContextFn)); + responses.push_back(sendDecisionToShard( + service, scheduler, lsid, txnNumber, participant, abortObj, operationContextFn)); } return txn::whenAll(responses); } @@ -444,7 +449,7 @@ namespace { void deleteCoordinatorDocBlocking(OperationContext* opCtx, const LogicalSessionId& lsid, TxnNumber txnNumber) { - LOG(3) << "Going to delete coordinator doc for " << lsid.getId() << ':' << txnNumber; + LOG(3) << txnIdToString(lsid, txnNumber) << " Going to delete coordinator doc"; if (MONGO_FAIL_POINT(hangBeforeDeletingCoordinatorDoc)) { LOG(0) << "Hit hangBeforeDeletingCoordinatorDoc failpoint"; @@ -499,7 +504,7 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx, << doc); } - LOG(3) << "Deleted coordinator doc for " << lsid.getId() << ':' << txnNumber; + LOG(3) << txnIdToString(lsid, txnNumber) << " Deleted coordinator doc"; MONGO_FAIL_POINT_BLOCK(hangAfterDeletingCoordinatorDoc, fp) { LOG(0) << "Hit hangAfterDeletingCoordinatorDoc failpoint"; @@ -549,6 +554,8 @@ std::vector<TransactionCoordinatorDocument> readAllCoordinatorDocs(OperationCont Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber, const ShardId& shardId, const BSONObj& commandObj, OperationContextFn operationContextFn) { @@ -564,17 +571,20 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, swPrepareResponse != ErrorCodes::TransactionCoordinatorReachedAbortDecision; }, [&scheduler, + lsid, + txnNumber, shardId, isLocalShard, commandObj = commandObj.getOwned(), operationContextFn] { - LOG(3) << "Coordinator going to send command " << commandObj << " to " - << (isLocalShard ? " local " : "") << " shard " << shardId; + LOG(3) << txnIdToString(lsid, txnNumber) << " Coordinator going to send command " + << commandObj << " to " << (isLocalShard ? "local " : "") << "shard " << shardId; return scheduler .scheduleRemoteCommand( shardId, kPrimaryReadPreference, commandObj, operationContextFn) - .then([shardId, commandObj = commandObj.getOwned()](ResponseStatus response) { + .then([lsid, txnNumber, shardId, commandObj = commandObj.getOwned()]( + ResponseStatus response) { auto status = getStatusFromCommandResult(response.data); auto wcStatus = getWriteConcernStatusFromCommandResult(response.data); @@ -596,13 +606,14 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, << shardId << ", which is not an expected behavior. " "Interpreting the response as vote to abort"); - LOG(0) << redact(abortStatus); + LOG(0) << txnIdToString(lsid, txnNumber) << " " << redact(abortStatus); return PrepareResponse{ shardId, PrepareVote::kAbort, boost::none, abortStatus}; } - LOG(3) << "Coordinator shard received a vote to commit from shard " + LOG(3) << txnIdToString(lsid, txnNumber) + << " Coordinator shard received a vote to commit from shard " << shardId << " with prepareTimestamp: " << prepareTimestampField.timestamp(); @@ -612,8 +623,8 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, boost::none}; } - LOG(3) << "Coordinator shard received " << status << " from shard " << shardId - << " for " << commandObj; + LOG(3) << txnIdToString(lsid, txnNumber) << " Coordinator shard received " + << status << " from shard " << shardId << " for " << commandObj; if (ErrorCodes::isVoteAbortError(status.code())) { return PrepareResponse{ @@ -641,14 +652,17 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, }); return std::move(f).onError<ErrorCodes::TransactionCoordinatorReachedAbortDecision>( - [shardId](const Status& status) { - LOG(3) << "Prepare stopped retrying due to retrying being cancelled"; + [lsid, txnNumber, shardId](const Status& status) { + LOG(3) << txnIdToString(lsid, txnNumber) + << " Prepare stopped retrying due to retrying being cancelled"; return PrepareResponse{shardId, boost::none, boost::none, status}; }); } Future<void> sendDecisionToShard(ServiceContext* service, txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + TxnNumber txnNumber, const ShardId& shardId, const BSONObj& commandObj, OperationContextFn operationContextFn) { @@ -663,17 +677,20 @@ Future<void> sendDecisionToShard(ServiceContext* service, return !s.isOK() && s != ErrorCodes::TransactionCoordinatorSteppingDown; }, [&scheduler, + lsid, + txnNumber, shardId, isLocalShard, operationContextFn, commandObj = commandObj.getOwned()] { - LOG(3) << "Coordinator going to send command " << commandObj << " to " - << (isLocalShard ? "local" : "") << " shard " << shardId; + LOG(3) << txnIdToString(lsid, txnNumber) << " Coordinator going to send command " + << commandObj << " to " << (isLocalShard ? "local " : "") << "shard " << shardId; return scheduler .scheduleRemoteCommand( shardId, kPrimaryReadPreference, commandObj, operationContextFn) - .then([shardId, commandObj = commandObj.getOwned()](ResponseStatus response) { + .then([lsid, txnNumber, shardId, commandObj = commandObj.getOwned()]( + ResponseStatus response) { auto status = getStatusFromCommandResult(response.data); auto wcStatus = getWriteConcernStatusFromCommandResult(response.data); @@ -683,8 +700,9 @@ Future<void> sendDecisionToShard(ServiceContext* service, status = wcStatus; } - LOG(3) << "Coordinator shard received " << status << " in response to " - << commandObj << " from shard " << shardId; + LOG(3) << txnIdToString(lsid, txnNumber) << " Coordinator shard received " + << status << " in response to " << commandObj << " from shard " + << shardId; if (ErrorCodes::isVoteAbortError(status.code())) { // Interpret voteAbort errors as an ack. @@ -703,5 +721,9 @@ Future<void> sendDecisionToShard(ServiceContext* service, }); } +std::string txnIdToString(const LogicalSessionId& lsid, TxnNumber txnNumber) { + return str::stream() << lsid.getId() << ':' << txnNumber; +} + } // namespace txn } // namespace mongo |