From dccba0661624787d8f398058928b7a29a31a2a86 Mon Sep 17 00:00:00 2001 From: Cheahuychou Mao Date: Mon, 11 Oct 2021 18:54:09 +0000 Subject: SERVER-60323 Make TransactionCoordinator support txnRetryCounter --- src/mongo/db/s/transaction_coordinator.cpp | 73 ++++++++++++++++++------------ 1 file changed, 43 insertions(+), 30 deletions(-) (limited to 'src/mongo/db/s/transaction_coordinator.cpp') diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index 35d7f5f00f8..beaeadac455 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -88,19 +88,21 @@ ExecutorFuture waitForMajorityWithHangFailpoint(ServiceContext* service, } // namespace -TransactionCoordinator::TransactionCoordinator(OperationContext* operationContext, - const LogicalSessionId& lsid, - TxnNumber txnNumber, - std::unique_ptr scheduler, - Date_t deadline) +TransactionCoordinator::TransactionCoordinator( + OperationContext* operationContext, + const LogicalSessionId& lsid, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, + std::unique_ptr scheduler, + Date_t deadline) : _serviceContext(operationContext->getServiceContext()), _lsid(lsid), - _txnNumber(txnNumber), + _txnNumberAndRetryCounter(txnNumberAndRetryCounter), _scheduler(std::move(scheduler)), _sendPrepareScheduler(_scheduler->makeChildScheduler()), _transactionCoordinatorMetricsObserver( std::make_unique()), _deadline(deadline) { + invariant(_txnNumberAndRetryCounter.getTxnRetryCounter()); auto apiParams = APIParameters::get(operationContext); auto kickOffCommitPF = makePromiseFuture(); @@ -116,7 +118,8 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex 1, "TransactionCoordinator deadline reached", "sessionId"_attr = _lsid.getId(), - "txnNumber"_attr = _txnNumber); + "txnNumberAndRetryCounter"_attr = + _txnNumberAndRetryCounter); cancelIfCommitNotYetStarted(); // See the comments for sendPrepare about the purpose of this @@ -167,7 +170,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex } return txn::persistParticipantsList( - *_sendPrepareScheduler, _lsid, _txnNumber, *_participants); + *_sendPrepareScheduler, _lsid, _txnNumberAndRetryCounter, *_participants); }) .then([this](repl::OpTime opTime) { return waitForMajorityWithHangFailpoint( @@ -207,7 +210,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex return txn::sendPrepare(_serviceContext, *_sendPrepareScheduler, _lsid, - _txnNumber, + _txnNumberAndRetryCounter, apiParams, *_participants) .then([this](PrepareVoteConsensus consensus) mutable { @@ -217,14 +220,15 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex } if (_decision->getDecision() == CommitDecision::kCommit) { - LOGV2_DEBUG(22446, - 3, - "{sessionId}:{txnNumber} Advancing cluster time to " - "the commit timestamp {commitTimestamp}", - "Advancing cluster time to the commit timestamp", - "sessionId"_attr = _lsid.getId(), - "txnNumber"_attr = _txnNumber, - "commitTimestamp"_attr = *_decision->getCommitTimestamp()); + LOGV2_DEBUG( + 22446, + 3, + "{sessionId}:{_txnNumberAndRetryCounter} Advancing cluster time to " + "the commit timestamp {commitTimestamp}", + "Advancing cluster time to the commit timestamp", + "sessionId"_attr = _lsid.getId(), + "txnNumberAndRetryCounter"_attr = _txnNumberAndRetryCounter, + "commitTimestamp"_attr = *_decision->getCommitTimestamp()); VectorClockMutable::get(_serviceContext) ->tickClusterTimeTo(LogicalTime(*_decision->getCommitTimestamp())); @@ -232,13 +236,13 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex }); }) .onError( - [this, lsid, txnNumber](const Status& status) { + [this, lsid, txnNumberAndRetryCounter](const Status& status) { // Timeout happened, propagate the decision to abort the transaction to replicas // and convert the internal error code to the public one. LOGV2(5047001, "Transaction coordinator made abort decision", "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber, + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter, "status"_attr = redact(status)); stdx::lock_guard lg(_mutex); _decision = txn::PrepareVote::kAbort; @@ -266,7 +270,8 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex return Future::makeReady(repl::OpTime()); } - return txn::persistDecision(*_scheduler, _lsid, _txnNumber, *_participants, *_decision); + return txn::persistDecision( + *_scheduler, _lsid, _txnNumberAndRetryCounter, *_participants, *_decision); }) .then([this](repl::OpTime opTime) { switch (_decision->getDecision()) { @@ -313,14 +318,18 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex return txn::sendCommit(_serviceContext, *_scheduler, _lsid, - _txnNumber, + _txnNumberAndRetryCounter, apiParams, *_participants, *_decision->getCommitTimestamp()); } case CommitDecision::kAbort: { - return txn::sendAbort( - _serviceContext, *_scheduler, _lsid, _txnNumber, apiParams, *_participants); + return txn::sendAbort(_serviceContext, + *_scheduler, + _lsid, + _txnNumberAndRetryCounter, + apiParams, + *_participants); } default: MONGO_UNREACHABLE; @@ -340,7 +349,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex _serviceContext->getPreciseClockSource()->now()); } - return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumber); + return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumberAndRetryCounter); }) .getAsync([this, deadlineFuture = std::move(deadlineFuture)](Status s) mutable { // Interrupt this coordinator's scheduler hierarchy and join the deadline task's future @@ -418,15 +427,16 @@ void TransactionCoordinator::_done(Status status) { // *receiving* node was stepping down. if (status == ErrorCodes::TransactionCoordinatorSteppingDown) status = Status(ErrorCodes::InterruptedDueToReplStateChange, - str::stream() << "Coordinator " << _lsid.getId() << ':' << _txnNumber + str::stream() << "Coordinator " << _lsid.getId() << ':' + << _txnNumberAndRetryCounter.toBSON() << " stopped due to: " << status.reason()); LOGV2_DEBUG(22447, 3, - "{sessionId}:{txnNumber} Two-phase commit completed with {status}", + "{sessionId}:{_txnNumberAndRetryCounter} Two-phase commit completed with {status}", "Two-phase commit completed", "sessionId"_attr = _lsid.getId(), - "txnNumber"_attr = _txnNumber, + "txnNumberAndRetryCounter"_attr = _txnNumberAndRetryCounter, "status"_attr = redact(status)); stdx::unique_lock ul(_mutex); @@ -472,7 +482,8 @@ void TransactionCoordinator::_logSlowTwoPhaseCommit( _lsid.serialize(&lsidBuilder); lsidBuilder.doneFast(); - parametersBuilder.append("txnNumber", _txnNumber); + parametersBuilder.append("txnNumber", _txnNumberAndRetryCounter.getTxnNumber()); + parametersBuilder.append("txnRetryCounter", *_txnNumberAndRetryCounter.getTxnRetryCounter()); attrs.add("parameters", parametersBuilder.obj()); @@ -543,7 +554,8 @@ std::string TransactionCoordinator::_twoPhaseCommitInfoForLog( _lsid.serialize(&lsidBuilder); lsidBuilder.doneFast(); - parametersBuilder.append("txnNumber", _txnNumber); + parametersBuilder.append("txnNumber", _txnNumberAndRetryCounter.getTxnNumber()); + parametersBuilder.append("txnRetryCounter", *_txnNumberAndRetryCounter.getTxnRetryCounter()); s << " parameters:" << parametersBuilder.obj().toString(); @@ -616,7 +628,8 @@ void TransactionCoordinator::reportState(BSONObjBuilder& parent) const { BSONObjBuilder lsidBuilder(doc.subobjStart("lsid")); _lsid.serialize(&lsidBuilder); lsidBuilder.doneFast(); - doc.append("txnNumber", _txnNumber); + doc.append("txnNumber", _txnNumberAndRetryCounter.getTxnNumber()); + doc.append("txnRetryCounter", *_txnNumberAndRetryCounter.getTxnRetryCounter()); if (_participants) { doc.append("numParticipants", static_cast(_participants->size())); -- cgit v1.2.1