path: root/src/mongo/db/s/transaction_coordinator_util.cpp
author    Jack Mulrow <jack.mulrow@mongodb.com>    2019-08-19 17:08:55 -0400
committer Jack Mulrow <jack.mulrow@mongodb.com>    2019-08-22 13:57:22 -0400
commit    f92b912452b540fdcbb1b3b959391fb31e64d408
tree      9c8deb150690193e50415792bfaa8e324219e357 /src/mongo/db/s/transaction_coordinator_util.cpp
parent    4db2f7c5f0fa6a19be5cb5f36ad29b670f162813
download  mongo-f92b912452b540fdcbb1b3b959391fb31e64d408.tar.gz
SERVER-39573 Prefix TransactionCoordinator logs with transaction id
Diffstat (limited to 'src/mongo/db/s/transaction_coordinator_util.cpp')
-rw-r--r--  src/mongo/db/s/transaction_coordinator_util.cpp | 78
1 file changed, 50 insertions(+), 28 deletions(-)
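
As context for the diff below: the commit's pattern is to build a "<sessionId>:<txnNumber>" string once, via a new txnIdToString() helper, and prepend it to every coordinator log line. A minimal standalone sketch of that pattern follows; it is illustrative only, with std::ostringstream and a plain string session id standing in for MongoDB's str::stream and LogicalSessionId types, and the session id value is made up.

// Sketch only, not MongoDB source: approximates the txnIdToString() helper
// added at the bottom of this diff.
#include <cstdint>
#include <iostream>
#include <sstream>
#include <string>

using TxnNumber = std::int64_t;

// Builds the "<sessionId>:<txnNumber>" prefix, mirroring txnIdToString().
std::string txnIdToString(const std::string& sessionId, TxnNumber txnNumber) {
    std::ostringstream os;
    os << sessionId << ':' << txnNumber;
    return os.str();
}

int main() {
    const std::string sessionId = "6e7bd4d6-0dd3-4d5d-9b9e-1f4e0c1b2a3d";  // made-up example
    const TxnNumber txnNumber = 12;

    // The old style embedded the id mid-sentence ("... for <sessionId>:<txnNumber>");
    // the new style puts the same id at the front of every line:
    std::cout << txnIdToString(sessionId, txnNumber)
              << " Going to write participant list" << std::endl;
}
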
diff --git a/src/mongo/db/s/transaction_coordinator_util.cpp b/src/mongo/db/s/transaction_coordinator_util.cpp
index 237ec82525e..9e6b78c445a 100644
--- a/src/mongo/db/s/transaction_coordinator_util.cpp
+++ b/src/mongo/db/s/transaction_coordinator_util.cpp
@@ -105,7 +105,7 @@ repl::OpTime persistParticipantListBlocking(OperationContext* opCtx,
const LogicalSessionId& lsid,
TxnNumber txnNumber,
const std::vector<ShardId>& participantList) {
- LOG(3) << "Going to write participant list for " << lsid.getId() << ':' << txnNumber;
+ LOG(3) << txnIdToString(lsid, txnNumber) << " Going to write participant list";
if (MONGO_FAIL_POINT(hangBeforeWritingParticipantList)) {
LOG(0) << "Hit hangBeforeWritingParticipantList failpoint";
@@ -167,7 +167,7 @@ repl::OpTime persistParticipantListBlocking(OperationContext* opCtx,
// Throw any other error.
uassertStatusOK(upsertStatus);
- LOG(3) << "Wrote participant list for " << lsid.getId() << ':' << txnNumber;
+ LOG(3) << txnIdToString(lsid, txnNumber) << " Wrote participant list";
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
}
@@ -246,8 +246,13 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service,
};
for (const auto& participant : participants) {
- responses.emplace_back(sendPrepareToShard(
- service, *prepareScheduler, participant, prepareObj, operationContextFn));
+ responses.emplace_back(sendPrepareToShard(service,
+ *prepareScheduler,
+ lsid,
+ txnNumber,
+ participant,
+ prepareObj,
+ operationContextFn));
}
// Asynchronously aggregate all prepare responses to find the decision and max prepare timestamp
@@ -284,8 +289,8 @@ repl::OpTime persistDecisionBlocking(OperationContext* opCtx,
const std::vector<ShardId>& participantList,
const txn::CoordinatorCommitDecision& decision) {
const bool isCommit = decision.getDecision() == txn::CommitDecision::kCommit;
- LOG(3) << "Going to write decision " << (isCommit ? "commit" : "abort") << " for "
- << lsid.getId() << ':' << txnNumber;
+ LOG(3) << txnIdToString(lsid, txnNumber) << " Going to write decision "
+ << (isCommit ? "commit" : "abort");
if (MONGO_FAIL_POINT(hangBeforeWritingDecision)) {
LOG(0) << "Hit hangBeforeWritingDecision failpoint";
@@ -352,8 +357,8 @@ repl::OpTime persistDecisionBlocking(OperationContext* opCtx,
<< doc);
}
- LOG(3) << "Wrote decision " << (isCommit ? "commit" : "abort") << " for " << lsid.getId() << ':'
- << txnNumber;
+ LOG(3) << txnIdToString(lsid, txnNumber) << " Wrote decision "
+ << (isCommit ? "commit" : "abort");
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
}
@@ -404,8 +409,8 @@ Future<void> sendCommit(ServiceContext* service,
std::vector<Future<void>> responses;
for (const auto& participant : participants) {
- responses.push_back(
- sendDecisionToShard(service, scheduler, participant, commitObj, operationContextFn));
+ responses.push_back(sendDecisionToShard(
+ service, scheduler, lsid, txnNumber, participant, commitObj, operationContextFn));
}
return txn::whenAll(responses);
}
@@ -434,8 +439,8 @@ Future<void> sendAbort(ServiceContext* service,
std::vector<Future<void>> responses;
for (const auto& participant : participants) {
- responses.push_back(
- sendDecisionToShard(service, scheduler, participant, abortObj, operationContextFn));
+ responses.push_back(sendDecisionToShard(
+ service, scheduler, lsid, txnNumber, participant, abortObj, operationContextFn));
}
return txn::whenAll(responses);
}
@@ -444,7 +449,7 @@ namespace {
void deleteCoordinatorDocBlocking(OperationContext* opCtx,
const LogicalSessionId& lsid,
TxnNumber txnNumber) {
- LOG(3) << "Going to delete coordinator doc for " << lsid.getId() << ':' << txnNumber;
+ LOG(3) << txnIdToString(lsid, txnNumber) << " Going to delete coordinator doc";
if (MONGO_FAIL_POINT(hangBeforeDeletingCoordinatorDoc)) {
LOG(0) << "Hit hangBeforeDeletingCoordinatorDoc failpoint";
@@ -499,7 +504,7 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx,
<< doc);
}
- LOG(3) << "Deleted coordinator doc for " << lsid.getId() << ':' << txnNumber;
+ LOG(3) << txnIdToString(lsid, txnNumber) << " Deleted coordinator doc";
MONGO_FAIL_POINT_BLOCK(hangAfterDeletingCoordinatorDoc, fp) {
LOG(0) << "Hit hangAfterDeletingCoordinatorDoc failpoint";
@@ -549,6 +554,8 @@ std::vector<TransactionCoordinatorDocument> readAllCoordinatorDocs(OperationCont
Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
const ShardId& shardId,
const BSONObj& commandObj,
OperationContextFn operationContextFn) {
@@ -564,17 +571,20 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
swPrepareResponse != ErrorCodes::TransactionCoordinatorReachedAbortDecision;
},
[&scheduler,
+ lsid,
+ txnNumber,
shardId,
isLocalShard,
commandObj = commandObj.getOwned(),
operationContextFn] {
- LOG(3) << "Coordinator going to send command " << commandObj << " to "
- << (isLocalShard ? " local " : "") << " shard " << shardId;
+ LOG(3) << txnIdToString(lsid, txnNumber) << " Coordinator going to send command "
+ << commandObj << " to " << (isLocalShard ? "local " : "") << "shard " << shardId;
return scheduler
.scheduleRemoteCommand(
shardId, kPrimaryReadPreference, commandObj, operationContextFn)
- .then([shardId, commandObj = commandObj.getOwned()](ResponseStatus response) {
+ .then([lsid, txnNumber, shardId, commandObj = commandObj.getOwned()](
+ ResponseStatus response) {
auto status = getStatusFromCommandResult(response.data);
auto wcStatus = getWriteConcernStatusFromCommandResult(response.data);
@@ -596,13 +606,14 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
<< shardId
<< ", which is not an expected behavior. "
"Interpreting the response as vote to abort");
- LOG(0) << redact(abortStatus);
+ LOG(0) << txnIdToString(lsid, txnNumber) << " " << redact(abortStatus);
return PrepareResponse{
shardId, PrepareVote::kAbort, boost::none, abortStatus};
}
- LOG(3) << "Coordinator shard received a vote to commit from shard "
+ LOG(3) << txnIdToString(lsid, txnNumber)
+ << " Coordinator shard received a vote to commit from shard "
<< shardId
<< " with prepareTimestamp: " << prepareTimestampField.timestamp();
@@ -612,8 +623,8 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
boost::none};
}
- LOG(3) << "Coordinator shard received " << status << " from shard " << shardId
- << " for " << commandObj;
+ LOG(3) << txnIdToString(lsid, txnNumber) << " Coordinator shard received "
+ << status << " from shard " << shardId << " for " << commandObj;
if (ErrorCodes::isVoteAbortError(status.code())) {
return PrepareResponse{
@@ -641,14 +652,17 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service,
});
return std::move(f).onError<ErrorCodes::TransactionCoordinatorReachedAbortDecision>(
- [shardId](const Status& status) {
- LOG(3) << "Prepare stopped retrying due to retrying being cancelled";
+ [lsid, txnNumber, shardId](const Status& status) {
+ LOG(3) << txnIdToString(lsid, txnNumber)
+ << " Prepare stopped retrying due to retrying being cancelled";
return PrepareResponse{shardId, boost::none, boost::none, status};
});
}
Future<void> sendDecisionToShard(ServiceContext* service,
txn::AsyncWorkScheduler& scheduler,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
const ShardId& shardId,
const BSONObj& commandObj,
OperationContextFn operationContextFn) {
@@ -663,17 +677,20 @@ Future<void> sendDecisionToShard(ServiceContext* service,
return !s.isOK() && s != ErrorCodes::TransactionCoordinatorSteppingDown;
},
[&scheduler,
+ lsid,
+ txnNumber,
shardId,
isLocalShard,
operationContextFn,
commandObj = commandObj.getOwned()] {
- LOG(3) << "Coordinator going to send command " << commandObj << " to "
- << (isLocalShard ? "local" : "") << " shard " << shardId;
+ LOG(3) << txnIdToString(lsid, txnNumber) << " Coordinator going to send command "
+ << commandObj << " to " << (isLocalShard ? "local " : "") << "shard " << shardId;
return scheduler
.scheduleRemoteCommand(
shardId, kPrimaryReadPreference, commandObj, operationContextFn)
- .then([shardId, commandObj = commandObj.getOwned()](ResponseStatus response) {
+ .then([lsid, txnNumber, shardId, commandObj = commandObj.getOwned()](
+ ResponseStatus response) {
auto status = getStatusFromCommandResult(response.data);
auto wcStatus = getWriteConcernStatusFromCommandResult(response.data);
@@ -683,8 +700,9 @@ Future<void> sendDecisionToShard(ServiceContext* service,
status = wcStatus;
}
- LOG(3) << "Coordinator shard received " << status << " in response to "
- << commandObj << " from shard " << shardId;
+ LOG(3) << txnIdToString(lsid, txnNumber) << " Coordinator shard received "
+ << status << " in response to " << commandObj << " from shard "
+ << shardId;
if (ErrorCodes::isVoteAbortError(status.code())) {
// Interpret voteAbort errors as an ack.
@@ -703,5 +721,9 @@ Future<void> sendDecisionToShard(ServiceContext* service,
});
}
+std::string txnIdToString(const LogicalSessionId& lsid, TxnNumber txnNumber) {
+ return str::stream() << lsid.getId() << ':' << txnNumber;
+}
+
} // namespace txn
} // namespace mongo
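
A note on the design as it reads from the diff: rather than threading a logging context object through, the change passes lsid and txnNumber explicitly into sendPrepareToShard and sendDecisionToShard and captures them by value in the retry and continuation lambdas, so that log lines emitted from asynchronous callbacks carry the same "<sessionId>:<txnNumber>" prefix and a transaction's full coordination history can be recovered with a single grep.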