summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/transaction_coordinator.cpp
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2021-10-11 18:54:09 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-10-11 19:38:20 +0000
commitdccba0661624787d8f398058928b7a29a31a2a86 (patch)
tree24b66cefce9a3db8bd94441b291235ad83bc789c /src/mongo/db/s/transaction_coordinator.cpp
parent13f0ae71f634409f2e219616ac489b45057d56bb (diff)
downloadmongo-dccba0661624787d8f398058928b7a29a31a2a86.tar.gz
SERVER-60323 Make TransactionCoordinator support txnRetryCounter
Diffstat (limited to 'src/mongo/db/s/transaction_coordinator.cpp')
-rw-r--r--src/mongo/db/s/transaction_coordinator.cpp73
1 files changed, 43 insertions, 30 deletions
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp
index 35d7f5f00f8..beaeadac455 100644
--- a/src/mongo/db/s/transaction_coordinator.cpp
+++ b/src/mongo/db/s/transaction_coordinator.cpp
@@ -88,19 +88,21 @@ ExecutorFuture<void> waitForMajorityWithHangFailpoint(ServiceContext* service,
} // namespace
-TransactionCoordinator::TransactionCoordinator(OperationContext* operationContext,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber,
- std::unique_ptr<txn::AsyncWorkScheduler> scheduler,
- Date_t deadline)
+TransactionCoordinator::TransactionCoordinator(
+ OperationContext* operationContext,
+ const LogicalSessionId& lsid,
+ const TxnNumberAndRetryCounter& txnNumberAndRetryCounter,
+ std::unique_ptr<txn::AsyncWorkScheduler> scheduler,
+ Date_t deadline)
: _serviceContext(operationContext->getServiceContext()),
_lsid(lsid),
- _txnNumber(txnNumber),
+ _txnNumberAndRetryCounter(txnNumberAndRetryCounter),
_scheduler(std::move(scheduler)),
_sendPrepareScheduler(_scheduler->makeChildScheduler()),
_transactionCoordinatorMetricsObserver(
std::make_unique<TransactionCoordinatorMetricsObserver>()),
_deadline(deadline) {
+ invariant(_txnNumberAndRetryCounter.getTxnRetryCounter());
auto apiParams = APIParameters::get(operationContext);
auto kickOffCommitPF = makePromiseFuture<void>();
@@ -116,7 +118,8 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex
1,
"TransactionCoordinator deadline reached",
"sessionId"_attr = _lsid.getId(),
- "txnNumber"_attr = _txnNumber);
+ "txnNumberAndRetryCounter"_attr =
+ _txnNumberAndRetryCounter);
cancelIfCommitNotYetStarted();
// See the comments for sendPrepare about the purpose of this
@@ -167,7 +170,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex
}
return txn::persistParticipantsList(
- *_sendPrepareScheduler, _lsid, _txnNumber, *_participants);
+ *_sendPrepareScheduler, _lsid, _txnNumberAndRetryCounter, *_participants);
})
.then([this](repl::OpTime opTime) {
return waitForMajorityWithHangFailpoint(
@@ -207,7 +210,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex
return txn::sendPrepare(_serviceContext,
*_sendPrepareScheduler,
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
apiParams,
*_participants)
.then([this](PrepareVoteConsensus consensus) mutable {
@@ -217,14 +220,15 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex
}
if (_decision->getDecision() == CommitDecision::kCommit) {
- LOGV2_DEBUG(22446,
- 3,
- "{sessionId}:{txnNumber} Advancing cluster time to "
- "the commit timestamp {commitTimestamp}",
- "Advancing cluster time to the commit timestamp",
- "sessionId"_attr = _lsid.getId(),
- "txnNumber"_attr = _txnNumber,
- "commitTimestamp"_attr = *_decision->getCommitTimestamp());
+ LOGV2_DEBUG(
+ 22446,
+ 3,
+ "{sessionId}:{_txnNumberAndRetryCounter} Advancing cluster time to "
+ "the commit timestamp {commitTimestamp}",
+ "Advancing cluster time to the commit timestamp",
+ "sessionId"_attr = _lsid.getId(),
+ "txnNumberAndRetryCounter"_attr = _txnNumberAndRetryCounter,
+ "commitTimestamp"_attr = *_decision->getCommitTimestamp());
VectorClockMutable::get(_serviceContext)
->tickClusterTimeTo(LogicalTime(*_decision->getCommitTimestamp()));
@@ -232,13 +236,13 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex
});
})
.onError<ErrorCodes::TransactionCoordinatorReachedAbortDecision>(
- [this, lsid, txnNumber](const Status& status) {
+ [this, lsid, txnNumberAndRetryCounter](const Status& status) {
// Timeout happened, propagate the decision to abort the transaction to replicas
// and convert the internal error code to the public one.
LOGV2(5047001,
"Transaction coordinator made abort decision",
"sessionId"_attr = lsid.getId(),
- "txnNumber"_attr = txnNumber,
+ "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter,
"status"_attr = redact(status));
stdx::lock_guard<Latch> lg(_mutex);
_decision = txn::PrepareVote::kAbort;
@@ -266,7 +270,8 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex
return Future<repl::OpTime>::makeReady(repl::OpTime());
}
- return txn::persistDecision(*_scheduler, _lsid, _txnNumber, *_participants, *_decision);
+ return txn::persistDecision(
+ *_scheduler, _lsid, _txnNumberAndRetryCounter, *_participants, *_decision);
})
.then([this](repl::OpTime opTime) {
switch (_decision->getDecision()) {
@@ -313,14 +318,18 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex
return txn::sendCommit(_serviceContext,
*_scheduler,
_lsid,
- _txnNumber,
+ _txnNumberAndRetryCounter,
apiParams,
*_participants,
*_decision->getCommitTimestamp());
}
case CommitDecision::kAbort: {
- return txn::sendAbort(
- _serviceContext, *_scheduler, _lsid, _txnNumber, apiParams, *_participants);
+ return txn::sendAbort(_serviceContext,
+ *_scheduler,
+ _lsid,
+ _txnNumberAndRetryCounter,
+ apiParams,
+ *_participants);
}
default:
MONGO_UNREACHABLE;
@@ -340,7 +349,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex
_serviceContext->getPreciseClockSource()->now());
}
- return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumber);
+ return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumberAndRetryCounter);
})
.getAsync([this, deadlineFuture = std::move(deadlineFuture)](Status s) mutable {
// Interrupt this coordinator's scheduler hierarchy and join the deadline task's future
@@ -418,15 +427,16 @@ void TransactionCoordinator::_done(Status status) {
// *receiving* node was stepping down.
if (status == ErrorCodes::TransactionCoordinatorSteppingDown)
status = Status(ErrorCodes::InterruptedDueToReplStateChange,
- str::stream() << "Coordinator " << _lsid.getId() << ':' << _txnNumber
+ str::stream() << "Coordinator " << _lsid.getId() << ':'
+ << _txnNumberAndRetryCounter.toBSON()
<< " stopped due to: " << status.reason());
LOGV2_DEBUG(22447,
3,
- "{sessionId}:{txnNumber} Two-phase commit completed with {status}",
+ "{sessionId}:{_txnNumberAndRetryCounter} Two-phase commit completed with {status}",
"Two-phase commit completed",
"sessionId"_attr = _lsid.getId(),
- "txnNumber"_attr = _txnNumber,
+ "txnNumberAndRetryCounter"_attr = _txnNumberAndRetryCounter,
"status"_attr = redact(status));
stdx::unique_lock<Latch> ul(_mutex);
@@ -472,7 +482,8 @@ void TransactionCoordinator::_logSlowTwoPhaseCommit(
_lsid.serialize(&lsidBuilder);
lsidBuilder.doneFast();
- parametersBuilder.append("txnNumber", _txnNumber);
+ parametersBuilder.append("txnNumber", _txnNumberAndRetryCounter.getTxnNumber());
+ parametersBuilder.append("txnRetryCounter", *_txnNumberAndRetryCounter.getTxnRetryCounter());
attrs.add("parameters", parametersBuilder.obj());
@@ -543,7 +554,8 @@ std::string TransactionCoordinator::_twoPhaseCommitInfoForLog(
_lsid.serialize(&lsidBuilder);
lsidBuilder.doneFast();
- parametersBuilder.append("txnNumber", _txnNumber);
+ parametersBuilder.append("txnNumber", _txnNumberAndRetryCounter.getTxnNumber());
+ parametersBuilder.append("txnRetryCounter", *_txnNumberAndRetryCounter.getTxnRetryCounter());
s << " parameters:" << parametersBuilder.obj().toString();
@@ -616,7 +628,8 @@ void TransactionCoordinator::reportState(BSONObjBuilder& parent) const {
BSONObjBuilder lsidBuilder(doc.subobjStart("lsid"));
_lsid.serialize(&lsidBuilder);
lsidBuilder.doneFast();
- doc.append("txnNumber", _txnNumber);
+ doc.append("txnNumber", _txnNumberAndRetryCounter.getTxnNumber());
+ doc.append("txnRetryCounter", *_txnNumberAndRetryCounter.getTxnRetryCounter());
if (_participants) {
doc.append("numParticipants", static_cast<long long>(_participants->size()));