diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2021-10-11 18:54:09 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-10-11 19:38:20 +0000 |
commit | dccba0661624787d8f398058928b7a29a31a2a86 (patch) | |
tree | 24b66cefce9a3db8bd94441b291235ad83bc789c /src/mongo/db/s/transaction_coordinator.cpp | |
parent | 13f0ae71f634409f2e219616ac489b45057d56bb (diff) | |
download | mongo-dccba0661624787d8f398058928b7a29a31a2a86.tar.gz |
SERVER-60323 Make TransactionCoordinator support txnRetryCounter
Diffstat (limited to 'src/mongo/db/s/transaction_coordinator.cpp')
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.cpp | 73 |
1 files changed, 43 insertions, 30 deletions
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index 35d7f5f00f8..beaeadac455 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -88,19 +88,21 @@ ExecutorFuture<void> waitForMajorityWithHangFailpoint(ServiceContext* service, } // namespace -TransactionCoordinator::TransactionCoordinator(OperationContext* operationContext, - const LogicalSessionId& lsid, - TxnNumber txnNumber, - std::unique_ptr<txn::AsyncWorkScheduler> scheduler, - Date_t deadline) +TransactionCoordinator::TransactionCoordinator( + OperationContext* operationContext, + const LogicalSessionId& lsid, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, + std::unique_ptr<txn::AsyncWorkScheduler> scheduler, + Date_t deadline) : _serviceContext(operationContext->getServiceContext()), _lsid(lsid), - _txnNumber(txnNumber), + _txnNumberAndRetryCounter(txnNumberAndRetryCounter), _scheduler(std::move(scheduler)), _sendPrepareScheduler(_scheduler->makeChildScheduler()), _transactionCoordinatorMetricsObserver( std::make_unique<TransactionCoordinatorMetricsObserver>()), _deadline(deadline) { + invariant(_txnNumberAndRetryCounter.getTxnRetryCounter()); auto apiParams = APIParameters::get(operationContext); auto kickOffCommitPF = makePromiseFuture<void>(); @@ -116,7 +118,8 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex 1, "TransactionCoordinator deadline reached", "sessionId"_attr = _lsid.getId(), - "txnNumber"_attr = _txnNumber); + "txnNumberAndRetryCounter"_attr = + _txnNumberAndRetryCounter); cancelIfCommitNotYetStarted(); // See the comments for sendPrepare about the purpose of this @@ -167,7 +170,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex } return txn::persistParticipantsList( - *_sendPrepareScheduler, _lsid, _txnNumber, *_participants); + *_sendPrepareScheduler, _lsid, _txnNumberAndRetryCounter, *_participants); }) .then([this](repl::OpTime opTime) { return waitForMajorityWithHangFailpoint( @@ -207,7 +210,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex return txn::sendPrepare(_serviceContext, *_sendPrepareScheduler, _lsid, - _txnNumber, + _txnNumberAndRetryCounter, apiParams, *_participants) .then([this](PrepareVoteConsensus consensus) mutable { @@ -217,14 +220,15 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex } if (_decision->getDecision() == CommitDecision::kCommit) { - LOGV2_DEBUG(22446, - 3, - "{sessionId}:{txnNumber} Advancing cluster time to " - "the commit timestamp {commitTimestamp}", - "Advancing cluster time to the commit timestamp", - "sessionId"_attr = _lsid.getId(), - "txnNumber"_attr = _txnNumber, - "commitTimestamp"_attr = *_decision->getCommitTimestamp()); + LOGV2_DEBUG( + 22446, + 3, + "{sessionId}:{_txnNumberAndRetryCounter} Advancing cluster time to " + "the commit timestamp {commitTimestamp}", + "Advancing cluster time to the commit timestamp", + "sessionId"_attr = _lsid.getId(), + "txnNumberAndRetryCounter"_attr = _txnNumberAndRetryCounter, + "commitTimestamp"_attr = *_decision->getCommitTimestamp()); VectorClockMutable::get(_serviceContext) ->tickClusterTimeTo(LogicalTime(*_decision->getCommitTimestamp())); @@ -232,13 +236,13 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex }); }) .onError<ErrorCodes::TransactionCoordinatorReachedAbortDecision>( - [this, lsid, txnNumber](const Status& status) { + [this, lsid, txnNumberAndRetryCounter](const Status& status) { // Timeout happened, propagate the decision to abort the transaction to replicas // and convert the internal error code to the public one. LOGV2(5047001, "Transaction coordinator made abort decision", "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber, + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter, "status"_attr = redact(status)); stdx::lock_guard<Latch> lg(_mutex); _decision = txn::PrepareVote::kAbort; @@ -266,7 +270,8 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex return Future<repl::OpTime>::makeReady(repl::OpTime()); } - return txn::persistDecision(*_scheduler, _lsid, _txnNumber, *_participants, *_decision); + return txn::persistDecision( + *_scheduler, _lsid, _txnNumberAndRetryCounter, *_participants, *_decision); }) .then([this](repl::OpTime opTime) { switch (_decision->getDecision()) { @@ -313,14 +318,18 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex return txn::sendCommit(_serviceContext, *_scheduler, _lsid, - _txnNumber, + _txnNumberAndRetryCounter, apiParams, *_participants, *_decision->getCommitTimestamp()); } case CommitDecision::kAbort: { - return txn::sendAbort( - _serviceContext, *_scheduler, _lsid, _txnNumber, apiParams, *_participants); + return txn::sendAbort(_serviceContext, + *_scheduler, + _lsid, + _txnNumberAndRetryCounter, + apiParams, + *_participants); } default: MONGO_UNREACHABLE; @@ -340,7 +349,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex _serviceContext->getPreciseClockSource()->now()); } - return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumber); + return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumberAndRetryCounter); }) .getAsync([this, deadlineFuture = std::move(deadlineFuture)](Status s) mutable { // Interrupt this coordinator's scheduler hierarchy and join the deadline task's future @@ -418,15 +427,16 @@ void TransactionCoordinator::_done(Status status) { // *receiving* node was stepping down. if (status == ErrorCodes::TransactionCoordinatorSteppingDown) status = Status(ErrorCodes::InterruptedDueToReplStateChange, - str::stream() << "Coordinator " << _lsid.getId() << ':' << _txnNumber + str::stream() << "Coordinator " << _lsid.getId() << ':' + << _txnNumberAndRetryCounter.toBSON() << " stopped due to: " << status.reason()); LOGV2_DEBUG(22447, 3, - "{sessionId}:{txnNumber} Two-phase commit completed with {status}", + "{sessionId}:{_txnNumberAndRetryCounter} Two-phase commit completed with {status}", "Two-phase commit completed", "sessionId"_attr = _lsid.getId(), - "txnNumber"_attr = _txnNumber, + "txnNumberAndRetryCounter"_attr = _txnNumberAndRetryCounter, "status"_attr = redact(status)); stdx::unique_lock<Latch> ul(_mutex); @@ -472,7 +482,8 @@ void TransactionCoordinator::_logSlowTwoPhaseCommit( _lsid.serialize(&lsidBuilder); lsidBuilder.doneFast(); - parametersBuilder.append("txnNumber", _txnNumber); + parametersBuilder.append("txnNumber", _txnNumberAndRetryCounter.getTxnNumber()); + parametersBuilder.append("txnRetryCounter", *_txnNumberAndRetryCounter.getTxnRetryCounter()); attrs.add("parameters", parametersBuilder.obj()); @@ -543,7 +554,8 @@ std::string TransactionCoordinator::_twoPhaseCommitInfoForLog( _lsid.serialize(&lsidBuilder); lsidBuilder.doneFast(); - parametersBuilder.append("txnNumber", _txnNumber); + parametersBuilder.append("txnNumber", _txnNumberAndRetryCounter.getTxnNumber()); + parametersBuilder.append("txnRetryCounter", *_txnNumberAndRetryCounter.getTxnRetryCounter()); s << " parameters:" << parametersBuilder.obj().toString(); @@ -616,7 +628,8 @@ void TransactionCoordinator::reportState(BSONObjBuilder& parent) const { BSONObjBuilder lsidBuilder(doc.subobjStart("lsid")); _lsid.serialize(&lsidBuilder); lsidBuilder.doneFast(); - doc.append("txnNumber", _txnNumber); + doc.append("txnNumber", _txnNumberAndRetryCounter.getTxnNumber()); + doc.append("txnRetryCounter", *_txnNumberAndRetryCounter.getTxnRetryCounter()); if (_participants) { doc.append("numParticipants", static_cast<long long>(_participants->size())); |