diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/logical_session_id.h | 30 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.cpp | 73 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator.h | 12 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_catalog_test.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_service.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_test.cpp | 568 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_test_fixture.h | 1 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_util.cpp | 290 | ||||
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_util.h | 44 |
10 files changed, 709 insertions, 333 deletions
diff --git a/src/mongo/db/logical_session_id.h b/src/mongo/db/logical_session_id.h index 3270eece38e..8e04e774a40 100644 --- a/src/mongo/db/logical_session_id.h +++ b/src/mongo/db/logical_session_id.h @@ -132,4 +132,34 @@ using LogicalSessionRecordSet = stdx::unordered_set<LogicalSessionRecord, Logica template <typename T> using LogicalSessionIdMap = stdx::unordered_map<LogicalSessionId, T, LogicalSessionIdHash>; +class TxnNumberAndRetryCounter { +public: + TxnNumberAndRetryCounter(TxnNumber txnNumber, TxnRetryCounter txnRetryCounter) + : _txnNumber(txnNumber), _txnRetryCounter(txnRetryCounter) {} + + TxnNumberAndRetryCounter(TxnNumber txnNumber) + : _txnNumber(txnNumber), _txnRetryCounter(boost::none) {} + + BSONObj toBSON() const { + BSONObjBuilder bob; + bob.append(OperationSessionInfo::kTxnNumberFieldName, _txnNumber); + if (_txnRetryCounter) { + bob.append(OperationSessionInfo::kTxnRetryCounterFieldName, *_txnRetryCounter); + } + return bob.obj(); + } + + const TxnNumber getTxnNumber() const { + return _txnNumber; + } + + const boost::optional<TxnRetryCounter> getTxnRetryCounter() const { + return _txnRetryCounter; + } + +private: + const TxnNumber _txnNumber; + const boost::optional<TxnRetryCounter> _txnRetryCounter; +}; + } // namespace mongo diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 39d418721d5..3a51bcbf981 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -178,6 +178,7 @@ env.Library( '$BUILD_DIR/mongo/db/commands/server_status', '$BUILD_DIR/mongo/db/commands/txn_cmd_request', '$BUILD_DIR/mongo/db/dbdirectclient', + '$BUILD_DIR/mongo/db/internal_transactions_feature_flag', '$BUILD_DIR/mongo/db/not_primary_error_tracker', '$BUILD_DIR/mongo/db/repl/wait_for_majority_service', '$BUILD_DIR/mongo/db/rw_concern_d', diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp index 35d7f5f00f8..beaeadac455 100644 --- a/src/mongo/db/s/transaction_coordinator.cpp +++ b/src/mongo/db/s/transaction_coordinator.cpp @@ -88,19 +88,21 @@ ExecutorFuture<void> waitForMajorityWithHangFailpoint(ServiceContext* service, } // namespace -TransactionCoordinator::TransactionCoordinator(OperationContext* operationContext, - const LogicalSessionId& lsid, - TxnNumber txnNumber, - std::unique_ptr<txn::AsyncWorkScheduler> scheduler, - Date_t deadline) +TransactionCoordinator::TransactionCoordinator( + OperationContext* operationContext, + const LogicalSessionId& lsid, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, + std::unique_ptr<txn::AsyncWorkScheduler> scheduler, + Date_t deadline) : _serviceContext(operationContext->getServiceContext()), _lsid(lsid), - _txnNumber(txnNumber), + _txnNumberAndRetryCounter(txnNumberAndRetryCounter), _scheduler(std::move(scheduler)), _sendPrepareScheduler(_scheduler->makeChildScheduler()), _transactionCoordinatorMetricsObserver( std::make_unique<TransactionCoordinatorMetricsObserver>()), _deadline(deadline) { + invariant(_txnNumberAndRetryCounter.getTxnRetryCounter()); auto apiParams = APIParameters::get(operationContext); auto kickOffCommitPF = makePromiseFuture<void>(); @@ -116,7 +118,8 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex 1, "TransactionCoordinator deadline reached", "sessionId"_attr = _lsid.getId(), - "txnNumber"_attr = _txnNumber); + "txnNumberAndRetryCounter"_attr = + _txnNumberAndRetryCounter); cancelIfCommitNotYetStarted(); // See the comments for sendPrepare about the purpose of this @@ -167,7 +170,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex } return txn::persistParticipantsList( - *_sendPrepareScheduler, _lsid, _txnNumber, *_participants); + *_sendPrepareScheduler, _lsid, _txnNumberAndRetryCounter, *_participants); }) .then([this](repl::OpTime opTime) { return waitForMajorityWithHangFailpoint( @@ -207,7 +210,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex return txn::sendPrepare(_serviceContext, *_sendPrepareScheduler, _lsid, - _txnNumber, + _txnNumberAndRetryCounter, apiParams, *_participants) .then([this](PrepareVoteConsensus consensus) mutable { @@ -217,14 +220,15 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex } if (_decision->getDecision() == CommitDecision::kCommit) { - LOGV2_DEBUG(22446, - 3, - "{sessionId}:{txnNumber} Advancing cluster time to " - "the commit timestamp {commitTimestamp}", - "Advancing cluster time to the commit timestamp", - "sessionId"_attr = _lsid.getId(), - "txnNumber"_attr = _txnNumber, - "commitTimestamp"_attr = *_decision->getCommitTimestamp()); + LOGV2_DEBUG( + 22446, + 3, + "{sessionId}:{_txnNumberAndRetryCounter} Advancing cluster time to " + "the commit timestamp {commitTimestamp}", + "Advancing cluster time to the commit timestamp", + "sessionId"_attr = _lsid.getId(), + "txnNumberAndRetryCounter"_attr = _txnNumberAndRetryCounter, + "commitTimestamp"_attr = *_decision->getCommitTimestamp()); VectorClockMutable::get(_serviceContext) ->tickClusterTimeTo(LogicalTime(*_decision->getCommitTimestamp())); @@ -232,13 +236,13 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex }); }) .onError<ErrorCodes::TransactionCoordinatorReachedAbortDecision>( - [this, lsid, txnNumber](const Status& status) { + [this, lsid, txnNumberAndRetryCounter](const Status& status) { // Timeout happened, propagate the decision to abort the transaction to replicas // and convert the internal error code to the public one. LOGV2(5047001, "Transaction coordinator made abort decision", "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber, + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter, "status"_attr = redact(status)); stdx::lock_guard<Latch> lg(_mutex); _decision = txn::PrepareVote::kAbort; @@ -266,7 +270,8 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex return Future<repl::OpTime>::makeReady(repl::OpTime()); } - return txn::persistDecision(*_scheduler, _lsid, _txnNumber, *_participants, *_decision); + return txn::persistDecision( + *_scheduler, _lsid, _txnNumberAndRetryCounter, *_participants, *_decision); }) .then([this](repl::OpTime opTime) { switch (_decision->getDecision()) { @@ -313,14 +318,18 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex return txn::sendCommit(_serviceContext, *_scheduler, _lsid, - _txnNumber, + _txnNumberAndRetryCounter, apiParams, *_participants, *_decision->getCommitTimestamp()); } case CommitDecision::kAbort: { - return txn::sendAbort( - _serviceContext, *_scheduler, _lsid, _txnNumber, apiParams, *_participants); + return txn::sendAbort(_serviceContext, + *_scheduler, + _lsid, + _txnNumberAndRetryCounter, + apiParams, + *_participants); } default: MONGO_UNREACHABLE; @@ -340,7 +349,7 @@ TransactionCoordinator::TransactionCoordinator(OperationContext* operationContex _serviceContext->getPreciseClockSource()->now()); } - return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumber); + return txn::deleteCoordinatorDoc(*_scheduler, _lsid, _txnNumberAndRetryCounter); }) .getAsync([this, deadlineFuture = std::move(deadlineFuture)](Status s) mutable { // Interrupt this coordinator's scheduler hierarchy and join the deadline task's future @@ -418,15 +427,16 @@ void TransactionCoordinator::_done(Status status) { // *receiving* node was stepping down. if (status == ErrorCodes::TransactionCoordinatorSteppingDown) status = Status(ErrorCodes::InterruptedDueToReplStateChange, - str::stream() << "Coordinator " << _lsid.getId() << ':' << _txnNumber + str::stream() << "Coordinator " << _lsid.getId() << ':' + << _txnNumberAndRetryCounter.toBSON() << " stopped due to: " << status.reason()); LOGV2_DEBUG(22447, 3, - "{sessionId}:{txnNumber} Two-phase commit completed with {status}", + "{sessionId}:{_txnNumberAndRetryCounter} Two-phase commit completed with {status}", "Two-phase commit completed", "sessionId"_attr = _lsid.getId(), - "txnNumber"_attr = _txnNumber, + "txnNumberAndRetryCounter"_attr = _txnNumberAndRetryCounter, "status"_attr = redact(status)); stdx::unique_lock<Latch> ul(_mutex); @@ -472,7 +482,8 @@ void TransactionCoordinator::_logSlowTwoPhaseCommit( _lsid.serialize(&lsidBuilder); lsidBuilder.doneFast(); - parametersBuilder.append("txnNumber", _txnNumber); + parametersBuilder.append("txnNumber", _txnNumberAndRetryCounter.getTxnNumber()); + parametersBuilder.append("txnRetryCounter", *_txnNumberAndRetryCounter.getTxnRetryCounter()); attrs.add("parameters", parametersBuilder.obj()); @@ -543,7 +554,8 @@ std::string TransactionCoordinator::_twoPhaseCommitInfoForLog( _lsid.serialize(&lsidBuilder); lsidBuilder.doneFast(); - parametersBuilder.append("txnNumber", _txnNumber); + parametersBuilder.append("txnNumber", _txnNumberAndRetryCounter.getTxnNumber()); + parametersBuilder.append("txnRetryCounter", *_txnNumberAndRetryCounter.getTxnRetryCounter()); s << " parameters:" << parametersBuilder.obj().toString(); @@ -616,7 +628,8 @@ void TransactionCoordinator::reportState(BSONObjBuilder& parent) const { BSONObjBuilder lsidBuilder(doc.subobjStart("lsid")); _lsid.serialize(&lsidBuilder); lsidBuilder.doneFast(); - doc.append("txnNumber", _txnNumber); + doc.append("txnNumber", _txnNumberAndRetryCounter.getTxnNumber()); + doc.append("txnRetryCounter", *_txnNumberAndRetryCounter.getTxnRetryCounter()); if (_participants) { doc.append("numParticipants", static_cast<long long>(_participants->size())); diff --git a/src/mongo/db/s/transaction_coordinator.h b/src/mongo/db/s/transaction_coordinator.h index cdc76424cbb..8adea56aab2 100644 --- a/src/mongo/db/s/transaction_coordinator.h +++ b/src/mongo/db/s/transaction_coordinator.h @@ -40,7 +40,7 @@ class TransactionCoordinatorMetricsObserver; /** * State machine, which implements the two-phase commit protocol for a specific transaction, - * identified by lsid + txnNumber. + * identified by lsid + txnNumber + txnRetryCounter. * * The lifetime of a coordinator starts with a construction and ends with the `onCompletion()` * future getting signaled. It is illegal to destroy a coordinator without waiting for @@ -64,8 +64,8 @@ public: }; /** - * Instantiates a new TransactioncCoordinator for the specified lsid + txnNumber pair and gives - * it a 'scheduler' to use for any asynchronous tasks it spawns. + * Instantiates a new TransactionCoordinator for the specified lsid + txnNumber + + * txnRetryCounter and gives it a 'scheduler' to use for any asynchronous tasks it spawns. * * If the 'coordinateCommitDeadline' parameter is specified, a timed task will be scheduled to * cause the coordinator to be put in a cancelled state, if runCommit is not eventually @@ -73,7 +73,7 @@ public: */ TransactionCoordinator(OperationContext* operationContext, const LogicalSessionId& lsid, - TxnNumber txnNumber, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, std::unique_ptr<txn::AsyncWorkScheduler> scheduler, Date_t deadline); @@ -151,9 +151,9 @@ private: // Shortcut to the service context under which this coordinator runs ServiceContext* const _serviceContext; - // The lsid + transaction number that this coordinator is coordinating + // The lsid + txnNumber + txnRetryCounter that this coordinator is coordinating. const LogicalSessionId _lsid; - const TxnNumber _txnNumber; + const TxnNumberAndRetryCounter _txnNumberAndRetryCounter; // Scheduler and context wrapping all asynchronous work dispatched by this coordinator std::unique_ptr<txn::AsyncWorkScheduler> _scheduler; diff --git a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp index 891191ab220..1044412c91b 100644 --- a/src/mongo/db/s/transaction_coordinator_catalog_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_catalog_test.cpp @@ -59,7 +59,7 @@ protected: auto newCoordinator = std::make_shared<TransactionCoordinator>( operationContext(), lsid, - txnNumber, + TxnNumberAndRetryCounter{txnNumber, 0}, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); @@ -172,11 +172,12 @@ TEST_F(TransactionCoordinatorCatalogTest, StepDownBeforeCoordinatorInsertedIntoC TransactionCoordinatorCatalog catalog; catalog.exitStepUp(Status::OK()); - auto coordinator = std::make_shared<TransactionCoordinator>(operationContext(), - lsid, - txnNumber, - aws.makeChildScheduler(), - network()->now() + Seconds{5}); + auto coordinator = + std::make_shared<TransactionCoordinator>(operationContext(), + lsid, + TxnNumberAndRetryCounter{txnNumber, 0}, + aws.makeChildScheduler(), + network()->now() + Seconds{5}); aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Test step down"}); catalog.onStepDown(); diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp index 349db0ac079..cbefeb5eb25 100644 --- a/src/mongo/db/s/transaction_coordinator_service.cpp +++ b/src/mongo/db/s/transaction_coordinator_service.cpp @@ -80,8 +80,12 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx, latestCoordinator->cancelIfCommitNotYetStarted(); } - auto coordinator = std::make_shared<TransactionCoordinator>( - opCtx, lsid, txnNumber, scheduler.makeChildScheduler(), commitDeadline); + auto coordinator = + std::make_shared<TransactionCoordinator>(opCtx, + lsid, + TxnNumberAndRetryCounter{txnNumber, 0}, + scheduler.makeChildScheduler(), + commitDeadline); try { catalog.insert(opCtx, lsid, txnNumber, coordinator); @@ -240,7 +244,7 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx, auto coordinator = std::make_shared<TransactionCoordinator>( opCtx, lsid, - txnNumber, + TxnNumberAndRetryCounter{txnNumber, 0}, scheduler.makeChildScheduler(), clockSource->now() + Seconds(gTransactionLifetimeLimitSeconds.load())); diff --git a/src/mongo/db/s/transaction_coordinator_test.cpp b/src/mongo/db/s/transaction_coordinator_test.cpp index 602165fceb2..43fe5c3e48a 100644 --- a/src/mongo/db/s/transaction_coordinator_test.cpp +++ b/src/mongo/db/s/transaction_coordinator_test.cpp @@ -39,6 +39,7 @@ #include "mongo/db/s/transaction_coordinator_document_gen.h" #include "mongo/db/s/transaction_coordinator_metrics_observer.h" #include "mongo/db/s/transaction_coordinator_test_fixture.h" +#include "mongo/idl/server_parameter_test_util.h" #include "mongo/logv2/log.h" #include "mongo/unittest/log_test.h" #include "mongo/unittest/unittest.h" @@ -177,7 +178,8 @@ protected: } LogicalSessionId _lsid{makeLogicalSessionIdForTest()}; - TxnNumber _txnNumber{1}; + TxnNumberAndRetryCounter _txnNumberAndRetryCounter{1, 1}; + RAIIServerParameterControllerForTest _controller{"featureFlagInternalTransactions", true}; }; class TransactionCoordinatorDriverTest : public TransactionCoordinatorTestBase { @@ -194,25 +196,28 @@ protected: boost::optional<txn::AsyncWorkScheduler> _aws; }; -auto makeDummyPrepareCommand(const LogicalSessionId& lsid, const TxnNumber& txnNumber) { +auto makeDummyPrepareCommand(const LogicalSessionId& lsid, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) { PrepareTransaction prepareCmd; prepareCmd.setDbName(NamespaceString::kAdminDb); auto prepareObj = prepareCmd.toBSON( - BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false - << WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)); - + BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumberAndRetryCounter.getTxnNumber() + << "txnRetryCounter" << *txnNumberAndRetryCounter.getTxnRetryCounter() + << "autocommit" << false << WriteConcernOptions::kWriteConcernField + << WriteConcernOptions::Majority)); return prepareObj; } TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsOnImmediateSuccess) { txn::AsyncWorkScheduler aws(getServiceContext()); - Future<void> future = txn::sendDecisionToShard(getServiceContext(), - aws, - _lsid, - _txnNumber, - kTwoShardIdList[0], - makeDummyPrepareCommand(_lsid, _txnNumber)); + Future<void> future = + txn::sendDecisionToShard(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + kTwoShardIdList[0], + makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter)); ASSERT(!future.isReady()); assertPrepareSentAndRespondWithSuccess(); @@ -223,12 +228,13 @@ TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsOn TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsSuccessAfterOneFailureAndThenSuccess) { txn::AsyncWorkScheduler aws(getServiceContext()); - Future<void> future = txn::sendDecisionToShard(getServiceContext(), - aws, - _lsid, - _txnNumber, - kTwoShardIdList[0], - makeDummyPrepareCommand(_lsid, _txnNumber)); + Future<void> future = + txn::sendDecisionToShard(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + kTwoShardIdList[0], + makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter)); ASSERT(!future.isReady()); assertPrepareSentAndRespondWithRetryableError(); @@ -242,12 +248,13 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardReturnsSuccessAfterSeveralFailuresAndThenSuccess) { txn::AsyncWorkScheduler aws(getServiceContext()); - Future<void> future = txn::sendDecisionToShard(getServiceContext(), - aws, - _lsid, - _txnNumber, - kTwoShardIdList[0], - makeDummyPrepareCommand(_lsid, _txnNumber)); + Future<void> future = + txn::sendDecisionToShard(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + kTwoShardIdList[0], + makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter)); assertPrepareSentAndRespondWithRetryableError(); assertPrepareSentAndRespondWithRetryableError(); @@ -260,12 +267,13 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardInterpretsVoteToAbortAsSuccess) { txn::AsyncWorkScheduler aws(getServiceContext()); - Future<void> future = txn::sendDecisionToShard(getServiceContext(), - aws, - _lsid, - _txnNumber, - kTwoShardIdList[0], - makeDummyPrepareCommand(_lsid, _txnNumber)); + Future<void> future = + txn::sendDecisionToShard(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + kTwoShardIdList[0], + makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter)); assertPrepareSentAndRespondWithNoSuchTransaction(); @@ -275,12 +283,13 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendDecisionToParticipantShardCanBeInterruptedAndReturnsError) { txn::AsyncWorkScheduler aws(getServiceContext()); - Future<void> future = txn::sendDecisionToShard(getServiceContext(), - aws, - _lsid, - _txnNumber, - kTwoShardIdList[0], - makeDummyPrepareCommand(_lsid, _txnNumber)); + Future<void> future = + txn::sendDecisionToShard(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + kTwoShardIdList[0], + makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter)); assertPrepareSentAndRespondWithRetryableError(); aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Shutdown for test"}); @@ -296,9 +305,9 @@ TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsCommitDecision txn::sendPrepareToShard(getServiceContext(), aws, _lsid, - _txnNumber, + _txnNumberAndRetryCounter, kTwoShardIdList[0], - makeDummyPrepareCommand(_lsid, _txnNumber)); + makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter)); ASSERT(!future.isReady()); assertPrepareSentAndRespondWithSuccess(); @@ -315,9 +324,9 @@ TEST_F(TransactionCoordinatorDriverTest, txn::sendPrepareToShard(getServiceContext(), aws, _lsid, - _txnNumber, + _txnNumberAndRetryCounter, kTwoShardIdList[0], - makeDummyPrepareCommand(_lsid, _txnNumber)); + makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter)); ASSERT(!future.isReady()); assertPrepareSentAndRespondWithRetryableError(); @@ -337,9 +346,9 @@ TEST_F(TransactionCoordinatorDriverTest, txn::sendPrepareToShard(getServiceContext(), aws, _lsid, - _txnNumber, + _txnNumberAndRetryCounter, kTwoShardIdList[0], - makeDummyPrepareCommand(_lsid, _txnNumber)); + makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter)); assertPrepareSentAndRespondWithRetryableError(); const auto shutdownStatus = @@ -360,9 +369,9 @@ TEST_F(TransactionCoordinatorDriverTest, txn::sendPrepareToShard(getServiceContext(), aws, _lsid, - _txnNumber, + _txnNumberAndRetryCounter, kTwoShardIdList[0], - makeDummyPrepareCommand(_lsid, _txnNumber)); + makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter)); assertPrepareSentAndRespondWithRetryableError(); aws.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Service shutting down"}); @@ -375,12 +384,13 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsAbortDecisionOnVoteAbortResponse) { txn::AsyncWorkScheduler aws(getServiceContext()); - auto future = txn::sendPrepareToShard(getServiceContext(), - aws, - _lsid, - _txnNumber, - kTwoShardIdList[0], - makeDummyPrepareCommand(_lsid, _txnNumber)); + auto future = + txn::sendPrepareToShard(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + kTwoShardIdList[0], + makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter)); assertPrepareSentAndRespondWithNoSuchTransaction(); @@ -394,12 +404,13 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendPrepareToShardReturnsAbortDecisionOnRetryableErrorThenVoteAbortResponse) { txn::AsyncWorkScheduler aws(getServiceContext()); - auto future = txn::sendPrepareToShard(getServiceContext(), - aws, - _lsid, - _txnNumber, - kTwoShardIdList[0], - makeDummyPrepareCommand(_lsid, _txnNumber)); + auto future = + txn::sendPrepareToShard(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + kTwoShardIdList[0], + makeDummyPrepareCommand(_lsid, _txnNumberAndRetryCounter)); assertPrepareSentAndRespondWithRetryableError(); assertPrepareSentAndRespondWithNoSuchTransaction(); @@ -414,8 +425,12 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesAbortAndSecondVotesCommit) { txn::AsyncWorkScheduler aws(getServiceContext()); - auto future = txn::sendPrepare( - getServiceContext(), aws, _lsid, _txnNumber, APIParameters(), kTwoShardIdList); + auto future = txn::sendPrepare(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + APIParameters(), + kTwoShardIdList); onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }, [&](const executor::RemoteCommandRequest& request) { return kPrepareOk; }}); @@ -430,8 +445,12 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendPrepareReturnsAbortDecisionWhenFirstParticipantVotesCommitAndSecondVotesAbort) { txn::AsyncWorkScheduler aws(getServiceContext()); - auto future = txn::sendPrepare( - getServiceContext(), aws, _lsid, _txnNumber, APIParameters(), kTwoShardIdList); + auto future = txn::sendPrepare(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + APIParameters(), + kTwoShardIdList); assertPrepareSentAndRespondWithSuccess(); assertPrepareSentAndRespondWithNoSuchTransaction(); @@ -445,8 +464,12 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendPrepareReturnsAbortDecisionWhenBothParticipantsVoteAbort) { txn::AsyncWorkScheduler aws(getServiceContext()); - auto future = txn::sendPrepare( - getServiceContext(), aws, _lsid, _txnNumber, APIParameters(), kTwoShardIdList); + auto future = txn::sendPrepare(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + APIParameters(), + kTwoShardIdList); onCommands({[&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }, [&](const executor::RemoteCommandRequest& request) { return kNoSuchTransaction; }}); @@ -463,8 +486,12 @@ TEST_F(TransactionCoordinatorDriverTest, const auto maxPrepareTimestamp = Timestamp(2, 1); txn::AsyncWorkScheduler aws(getServiceContext()); - auto future = txn::sendPrepare( - getServiceContext(), aws, _lsid, _txnNumber, APIParameters(), kTwoShardIdList); + auto future = txn::sendPrepare(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + APIParameters(), + kTwoShardIdList); assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp); assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp); @@ -481,8 +508,12 @@ TEST_F(TransactionCoordinatorDriverTest, const auto maxPrepareTimestamp = Timestamp(2, 1); txn::AsyncWorkScheduler aws(getServiceContext()); - auto future = txn::sendPrepare( - getServiceContext(), aws, _lsid, _txnNumber, APIParameters(), kTwoShardIdList); + auto future = txn::sendPrepare(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + APIParameters(), + kTwoShardIdList); assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp); assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp); @@ -499,8 +530,12 @@ TEST_F(TransactionCoordinatorDriverTest, const auto maxPrepareTimestamp = Timestamp(2, 1); txn::AsyncWorkScheduler aws(getServiceContext()); - auto future = txn::sendPrepare( - getServiceContext(), aws, _lsid, _txnNumber, APIParameters(), kTwoShardIdList); + auto future = txn::sendPrepare(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + APIParameters(), + kTwoShardIdList); assertPrepareSentAndRespondWithSuccess(firstPrepareTimestamp); assertPrepareSentAndRespondWithSuccess(maxPrepareTimestamp); @@ -516,8 +551,12 @@ TEST_F(TransactionCoordinatorDriverTest, const auto timestamp = Timestamp(1, 1); txn::AsyncWorkScheduler aws(getServiceContext()); - auto future = txn::sendPrepare( - getServiceContext(), aws, _lsid, _txnNumber, APIParameters(), kTwoShardIdList); + auto future = txn::sendPrepare(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + APIParameters(), + kTwoShardIdList); assertPrepareSentAndRespondWithSuccess(timestamp); assertCommandSentAndRespondWith( @@ -533,8 +572,12 @@ TEST_F(TransactionCoordinatorDriverTest, TEST_F(TransactionCoordinatorDriverTest, SendPrepareReturnsErrorWhenOneShardReturnsReadConcernMajorityNotEnabled) { txn::AsyncWorkScheduler aws(getServiceContext()); - auto future = txn::sendPrepare( - getServiceContext(), aws, _lsid, _txnNumber, APIParameters(), kTwoShardIdList); + auto future = txn::sendPrepare(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + APIParameters(), + kTwoShardIdList); assertPrepareSentAndRespondWithSuccess(Timestamp(100, 1)); assertCommandSentAndRespondWith( @@ -550,6 +593,95 @@ TEST_F(TransactionCoordinatorDriverTest, ASSERT_EQ(ErrorCodes::ReadConcernMajorityNotEnabled, decision.getAbortStatus()->code()); } +TEST_F(TransactionCoordinatorDriverTest, + SendPrepareAndDecisionAttachTxnRetryCounterIfFeatureFlagIsEnabled) { + txn::AsyncWorkScheduler aws(getServiceContext()); + auto prepareFuture = txn::sendPrepare(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + APIParameters(), + kOneShardIdList); + onCommands({[&](const executor::RemoteCommandRequest& request) { + ASSERT_TRUE(request.cmdObj.hasField("txnRetryCounter")); + ASSERT_EQUALS(request.cmdObj.getIntField("txnRetryCounter"), + *_txnNumberAndRetryCounter.getTxnRetryCounter()); + return kNoSuchTransaction; + }}); + prepareFuture.get(); + + auto commitFuture = txn::sendCommit(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + APIParameters(), + kOneShardIdList, + {}); + onCommands({[&](const executor::RemoteCommandRequest& request) { + ASSERT_TRUE(request.cmdObj.hasField("txnRetryCounter")); + ASSERT_EQUALS(request.cmdObj.getIntField("txnRetryCounter"), + *_txnNumberAndRetryCounter.getTxnRetryCounter()); + return kNoSuchTransaction; + }}); + commitFuture.get(); + + auto abortFuture = txn::sendAbort(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + APIParameters(), + kOneShardIdList); + onCommands({[&](const executor::RemoteCommandRequest& request) { + ASSERT_TRUE(request.cmdObj.hasField("txnRetryCounter")); + ASSERT_EQUALS(request.cmdObj.getIntField("txnRetryCounter"), + *_txnNumberAndRetryCounter.getTxnRetryCounter()); + return kNoSuchTransaction; + }}); + abortFuture.get(); +} + +TEST_F(TransactionCoordinatorDriverTest, + SendPrepareAndDecisionDoesNotAttachTxnRetryCounterIfFeatureFlagIsNotEnabled) { + RAIIServerParameterControllerForTest controller{"featureFlagInternalTransactions", false}; + txn::AsyncWorkScheduler aws(getServiceContext()); + auto prepareFuture = txn::sendPrepare(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + APIParameters(), + kOneShardIdList); + onCommands({[&](const executor::RemoteCommandRequest& request) { + ASSERT_FALSE(request.cmdObj.hasField("txnRetryCounter")); + return kNoSuchTransaction; + }}); + prepareFuture.get(); + + auto commitFuture = txn::sendCommit(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + APIParameters(), + kOneShardIdList, + {}); + onCommands({[&](const executor::RemoteCommandRequest& request) { + ASSERT_FALSE(request.cmdObj.hasField("txnRetryCounter")); + return kNoSuchTransaction; + }}); + commitFuture.get(); + + auto abortFuture = txn::sendAbort(getServiceContext(), + aws, + _lsid, + _txnNumberAndRetryCounter, + APIParameters(), + kOneShardIdList); + onCommands({[&](const executor::RemoteCommandRequest& request) { + ASSERT_FALSE(request.cmdObj.hasField("txnRetryCounter")); + return kNoSuchTransaction; + }}); + abortFuture.get(); +} + class TransactionCoordinatorDriverPersistenceTest : public TransactionCoordinatorDriverTest { protected: void setUp() override { @@ -565,14 +697,17 @@ protected: static void assertDocumentMatches( TransactionCoordinatorDocument doc, LogicalSessionId expectedLsid, - TxnNumber expectedTxnNum, + TxnNumberAndRetryCounter expectedTxnNumberAndRetryCounter, std::vector<ShardId> expectedParticipants, boost::optional<txn::CommitDecision> expectedDecision = boost::none, boost::optional<Timestamp> expectedCommitTimestamp = boost::none) { ASSERT(doc.getId().getSessionId()); ASSERT_EQUALS(*doc.getId().getSessionId(), expectedLsid); ASSERT(doc.getId().getTxnNumber()); - ASSERT_EQUALS(*doc.getId().getTxnNumber(), expectedTxnNum); + ASSERT_EQUALS(*doc.getId().getTxnNumber(), expectedTxnNumberAndRetryCounter.getTxnNumber()); + ASSERT(doc.getId().getTxnRetryCounter()); + ASSERT_EQUALS(*doc.getId().getTxnRetryCounter(), + *expectedTxnNumberAndRetryCounter.getTxnRetryCounter()); ASSERT(doc.getParticipants() == expectedParticipants); @@ -593,23 +728,23 @@ protected: void persistParticipantListExpectSuccess(OperationContext* opCtx, LogicalSessionId lsid, - TxnNumber txnNumber, + TxnNumberAndRetryCounter txnNumberAndRetryCounter, const std::vector<ShardId>& participants) { - txn::persistParticipantsList(*_aws, lsid, txnNumber, participants).get(); + txn::persistParticipantsList(*_aws, lsid, txnNumberAndRetryCounter, participants).get(); auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx); ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1)); - assertDocumentMatches(allCoordinatorDocs[0], lsid, txnNumber, participants); + assertDocumentMatches(allCoordinatorDocs[0], lsid, txnNumberAndRetryCounter, participants); } void persistDecisionExpectSuccess(OperationContext* opCtx, LogicalSessionId lsid, - TxnNumber txnNumber, + TxnNumberAndRetryCounter txnNumberAndRetryCounter, const std::vector<ShardId>& participants, const boost::optional<Timestamp>& commitTimestamp) { txn::persistDecision(*_aws, lsid, - txnNumber, + txnNumberAndRetryCounter, participants, [&] { txn::CoordinatorCommitDecision decision; @@ -630,20 +765,23 @@ protected: if (commitTimestamp) { assertDocumentMatches(allCoordinatorDocs[0], lsid, - txnNumber, + txnNumberAndRetryCounter, participants, txn::CommitDecision::kCommit, *commitTimestamp); } else { - assertDocumentMatches( - allCoordinatorDocs[0], lsid, txnNumber, participants, txn::CommitDecision::kAbort); + assertDocumentMatches(allCoordinatorDocs[0], + lsid, + txnNumberAndRetryCounter, + participants, + txn::CommitDecision::kAbort); } } void deleteCoordinatorDocExpectSuccess(OperationContext* opCtx, LogicalSessionId lsid, - TxnNumber txnNumber) { - txn::deleteCoordinatorDoc(*_aws, lsid, txnNumber).get(); + TxnNumberAndRetryCounter txnNumberAndRetryCounter) { + txn::deleteCoordinatorDoc(*_aws, lsid, txnNumberAndRetryCounter).get(); auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx); ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(0)); @@ -659,13 +797,16 @@ protected: TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistParticipantListWhenNoDocumentForTransactionExistsSucceeds) { - persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants); + persistParticipantListExpectSuccess( + operationContext(), _lsid, _txnNumberAndRetryCounter, _participants); } TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistParticipantListWhenMatchingDocumentForTransactionExistsSucceeds) { - persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants); - persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants); + persistParticipantListExpectSuccess( + operationContext(), _lsid, _txnNumberAndRetryCounter, _participants); + persistParticipantListExpectSuccess( + operationContext(), _lsid, _txnNumberAndRetryCounter, _participants); } TEST_F(TransactionCoordinatorDriverPersistenceTest, @@ -673,12 +814,13 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, auto opCtx = operationContext(); std::vector<ShardId> participants{ ShardId("shard0001"), ShardId("shard0002"), ShardId("shard0003")}; - persistParticipantListExpectSuccess(opCtx, _lsid, _txnNumber, participants); + persistParticipantListExpectSuccess(opCtx, _lsid, _txnNumberAndRetryCounter, participants); // We should retry until shutdown. The original participants should be persisted. std::vector<ShardId> smallerParticipantList{ShardId("shard0001"), ShardId("shard0002")}; - auto future = txn::persistParticipantsList(*_aws, _lsid, _txnNumber, smallerParticipantList); + auto future = txn::persistParticipantsList( + *_aws, _lsid, _txnNumberAndRetryCounter, smallerParticipantList); _aws->shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, "Shutdown for test"}); advanceClockAndExecuteScheduledTasks(); @@ -687,24 +829,37 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, auto allCoordinatorDocs = txn::readAllCoordinatorDocs(opCtx); ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1)); - assertDocumentMatches(allCoordinatorDocs[0], _lsid, _txnNumber, participants); + assertDocumentMatches(allCoordinatorDocs[0], _lsid, _txnNumberAndRetryCounter, participants); } TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistParticipantListForMultipleTransactionsOnSameSession) { for (int i = 1; i <= 3; i++) { - auto txnNumber = TxnNumber{i}; - txn::persistParticipantsList(*_aws, _lsid, txnNumber, _participants).get(); + txn::persistParticipantsList( + *_aws, _lsid, {i, *_txnNumberAndRetryCounter.getTxnRetryCounter()}, _participants) + .get(); auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext()); ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(i)); } } +TEST_F(TransactionCoordinatorDriverPersistenceTest, + PersistParticipantListForOneTransactionMultipleTxnRetryCountersOnSameSession) { + const auto numRetries = 3; + for (int i = 1; i <= numRetries; i++) { + txn::persistParticipantsList( + *_aws, _lsid, {_txnNumberAndRetryCounter.getTxnNumber(), i}, _participants) + .get(); + auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext()); + ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(i)); + } +} + TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistParticipantListForMultipleSessions) { for (int i = 1; i <= 3; i++) { auto lsid = makeLogicalSessionIdForTest(); - txn::persistParticipantsList(*_aws, lsid, _txnNumber, _participants).get(); + txn::persistParticipantsList(*_aws, lsid, _txnNumberAndRetryCounter, _participants).get(); auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext()); ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(i)); @@ -713,19 +868,29 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistParticipantListForMul TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistAbortDecisionWhenDocumentExistsWithoutDecisionSucceeds) { - persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants); - persistDecisionExpectSuccess( - operationContext(), _lsid, _txnNumber, _participants, boost::none /* abort */); + persistParticipantListExpectSuccess( + operationContext(), _lsid, _txnNumberAndRetryCounter, _participants); + persistDecisionExpectSuccess(operationContext(), + _lsid, + _txnNumberAndRetryCounter, + _participants, + boost::none /* abort */); } TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistAbortDecisionWhenDocumentExistsWithSameDecisionSucceeds) { - - persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants); - persistDecisionExpectSuccess( - operationContext(), _lsid, _txnNumber, _participants, boost::none /* abort */); - persistDecisionExpectSuccess( - operationContext(), _lsid, _txnNumber, _participants, boost::none /* abort */); + persistParticipantListExpectSuccess( + operationContext(), _lsid, _txnNumberAndRetryCounter, _participants); + persistDecisionExpectSuccess(operationContext(), + _lsid, + _txnNumberAndRetryCounter, + _participants, + boost::none /* abort */); + persistDecisionExpectSuccess(operationContext(), + _lsid, + _txnNumberAndRetryCounter, + _participants, + boost::none /* abort */); } TEST_F(TransactionCoordinatorDriverPersistenceTest, @@ -734,7 +899,7 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, { FailPointEnableBlock failpoint("hangBeforeWritingDecision"); - future = txn::persistDecision(*_aws, _lsid, _txnNumber, _participants, [&] { + future = txn::persistDecision(*_aws, _lsid, _txnNumberAndRetryCounter, _participants, [&] { txn::CoordinatorCommitDecision decision(txn::CommitDecision::kCommit); decision.setCommitTimestamp(_commitTimestamp); return decision; @@ -749,56 +914,108 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistCommitDecisionWhenDocumentExistsWithoutDecisionSucceeds) { - persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants); - persistDecisionExpectSuccess( - operationContext(), _lsid, _txnNumber, _participants, _commitTimestamp /* commit */); + persistParticipantListExpectSuccess( + operationContext(), _lsid, _txnNumberAndRetryCounter, _participants); + persistDecisionExpectSuccess(operationContext(), + _lsid, + _txnNumberAndRetryCounter, + _participants, + _commitTimestamp /* commit */); } TEST_F(TransactionCoordinatorDriverPersistenceTest, PersistCommitDecisionWhenDocumentExistsWithSameDecisionSucceeds) { - persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants); - persistDecisionExpectSuccess( - operationContext(), _lsid, _txnNumber, _participants, _commitTimestamp /* commit */); - persistDecisionExpectSuccess( - operationContext(), _lsid, _txnNumber, _participants, _commitTimestamp /* commit */); + persistParticipantListExpectSuccess( + operationContext(), _lsid, _txnNumberAndRetryCounter, _participants); + persistDecisionExpectSuccess(operationContext(), + _lsid, + _txnNumberAndRetryCounter, + _participants, + _commitTimestamp /* commit */); + persistDecisionExpectSuccess(operationContext(), + _lsid, + _txnNumberAndRetryCounter, + _participants, + _commitTimestamp /* commit */); } TEST_F(TransactionCoordinatorDriverPersistenceTest, DeleteCoordinatorDocWhenNoDocumentExistsFails) { + ASSERT_THROWS_CODE(txn::deleteCoordinatorDoc(*_aws, _lsid, _txnNumberAndRetryCounter).get(), + AssertionException, + 51027); +} + +TEST_F(TransactionCoordinatorDriverPersistenceTest, + DeleteCoordinatorDocWhenDocumentExistsWithDifferentTxnNumberFails) { + persistParticipantListExpectSuccess( + operationContext(), _lsid, _txnNumberAndRetryCounter, _participants); + ASSERT_THROWS_CODE(txn::deleteCoordinatorDoc(*_aws, + _lsid, + {_txnNumberAndRetryCounter.getTxnNumber() + 1, + *_txnNumberAndRetryCounter.getTxnRetryCounter()}) + .get(), + AssertionException, + 51027); +} + +TEST_F(TransactionCoordinatorDriverPersistenceTest, + DeleteCoordinatorDocWhenDocumentExistsWithDifferentTxnRetryCounterFails) { + persistParticipantListExpectSuccess( + operationContext(), _lsid, _txnNumberAndRetryCounter, _participants); ASSERT_THROWS_CODE( - txn::deleteCoordinatorDoc(*_aws, _lsid, _txnNumber).get(), AssertionException, 51027); + txn::deleteCoordinatorDoc(*_aws, + _lsid, + {_txnNumberAndRetryCounter.getTxnNumber(), + *_txnNumberAndRetryCounter.getTxnRetryCounter() + 1}) + .get(), + AssertionException, + 51027); } TEST_F(TransactionCoordinatorDriverPersistenceTest, DeleteCoordinatorDocWhenDocumentExistsWithoutDecisionFails) { - persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants); - ASSERT_THROWS_CODE( - txn::deleteCoordinatorDoc(*_aws, _lsid, _txnNumber).get(), AssertionException, 51027); + persistParticipantListExpectSuccess( + operationContext(), _lsid, _txnNumberAndRetryCounter, _participants); + ASSERT_THROWS_CODE(txn::deleteCoordinatorDoc(*_aws, _lsid, _txnNumberAndRetryCounter).get(), + AssertionException, + 51027); } TEST_F(TransactionCoordinatorDriverPersistenceTest, DeleteCoordinatorDocWhenDocumentExistsWithAbortDecisionSucceeds) { - persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants); - persistDecisionExpectSuccess( - operationContext(), _lsid, _txnNumber, _participants, boost::none /* abort */); - deleteCoordinatorDocExpectSuccess(operationContext(), _lsid, _txnNumber); + persistParticipantListExpectSuccess( + operationContext(), _lsid, _txnNumberAndRetryCounter, _participants); + persistDecisionExpectSuccess(operationContext(), + _lsid, + _txnNumberAndRetryCounter, + _participants, + boost::none /* abort */); + deleteCoordinatorDocExpectSuccess(operationContext(), _lsid, _txnNumberAndRetryCounter); } TEST_F(TransactionCoordinatorDriverPersistenceTest, DeleteCoordinatorDocWhenDocumentExistsWithCommitDecisionSucceeds) { - persistParticipantListExpectSuccess(operationContext(), _lsid, _txnNumber, _participants); - persistDecisionExpectSuccess( - operationContext(), _lsid, _txnNumber, _participants, _commitTimestamp /* commit */); - deleteCoordinatorDocExpectSuccess(operationContext(), _lsid, _txnNumber); + persistParticipantListExpectSuccess( + operationContext(), _lsid, _txnNumberAndRetryCounter, _participants); + persistDecisionExpectSuccess(operationContext(), + _lsid, + _txnNumberAndRetryCounter, + _participants, + _commitTimestamp /* commit */); + deleteCoordinatorDocExpectSuccess(operationContext(), _lsid, _txnNumberAndRetryCounter); } TEST_F(TransactionCoordinatorDriverPersistenceTest, - MultipleCommitDecisionsPersistedAndDeleteOneSuccessfullyRemovesCorrectDecision) { - const auto txnNumber1 = TxnNumber{4}; - const auto txnNumber2 = TxnNumber{5}; + MultipleTxnNumbersCommitDecisionsPersistedAndDeleteOneSuccessfullyRemovesCorrectDecision) { + const TxnNumberAndRetryCounter txnNumberAndRetryCounter1{ + _txnNumberAndRetryCounter.getTxnNumber(), *_txnNumberAndRetryCounter.getTxnRetryCounter()}; + const TxnNumberAndRetryCounter txnNumberAndRetryCounter2{ + _txnNumberAndRetryCounter.getTxnNumber() + 1, + *_txnNumberAndRetryCounter.getTxnRetryCounter()}; // Insert coordinator documents for two transactions. - txn::persistParticipantsList(*_aws, _lsid, txnNumber1, _participants).get(); - txn::persistParticipantsList(*_aws, _lsid, txnNumber2, _participants).get(); + txn::persistParticipantsList(*_aws, _lsid, txnNumberAndRetryCounter1, _participants).get(); + txn::persistParticipantsList(*_aws, _lsid, txnNumberAndRetryCounter2, _participants).get(); auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext()); ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(2)); @@ -807,7 +1024,7 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, // document still exists. txn::persistDecision(*_aws, _lsid, - txnNumber1, + txnNumberAndRetryCounter1, _participants, [&] { txn::CoordinatorCommitDecision decision(txn::CommitDecision::kAbort); @@ -816,13 +1033,48 @@ TEST_F(TransactionCoordinatorDriverPersistenceTest, return decision; }()) .get(); - txn::deleteCoordinatorDoc(*_aws, _lsid, txnNumber1).get(); + txn::deleteCoordinatorDoc(*_aws, _lsid, txnNumberAndRetryCounter1).get(); allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext()); ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1)); - assertDocumentMatches(allCoordinatorDocs[0], _lsid, txnNumber2, _participants); + assertDocumentMatches(allCoordinatorDocs[0], _lsid, txnNumberAndRetryCounter2, _participants); } +TEST_F( + TransactionCoordinatorDriverPersistenceTest, + MultipleTxnRetryCountersCommitDecisionsPersistedAndDeleteOneSuccessfullyRemovesCorrectDecision) { + const TxnNumberAndRetryCounter txnNumberAndRetryCounter1{ + _txnNumberAndRetryCounter.getTxnNumber(), *_txnNumberAndRetryCounter.getTxnRetryCounter()}; + const TxnNumberAndRetryCounter txnNumberAndRetryCounter2{ + _txnNumberAndRetryCounter.getTxnNumber(), + *_txnNumberAndRetryCounter.getTxnRetryCounter() + 1}; + + // Insert coordinator documents for two transactions. + txn::persistParticipantsList(*_aws, _lsid, txnNumberAndRetryCounter1, _participants).get(); + txn::persistParticipantsList(*_aws, _lsid, txnNumberAndRetryCounter2, _participants).get(); + + auto allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext()); + ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(2)); + + // Delete the document for the first transaction and check that only the second transaction's + // document still exists. + txn::persistDecision(*_aws, + _lsid, + txnNumberAndRetryCounter1, + _participants, + [&] { + txn::CoordinatorCommitDecision decision(txn::CommitDecision::kAbort); + decision.setAbortStatus( + Status(ErrorCodes::NoSuchTransaction, "Test abort error")); + return decision; + }()) + .get(); + txn::deleteCoordinatorDoc(*_aws, _lsid, txnNumberAndRetryCounter1).get(); + + allCoordinatorDocs = txn::readAllCoordinatorDocs(operationContext()); + ASSERT_EQUALS(allCoordinatorDocs.size(), size_t(1)); + assertDocumentMatches(allCoordinatorDocs[0], _lsid, txnNumberAndRetryCounter2, _participants); +} using TransactionCoordinatorTest = TransactionCoordinatorTestBase; @@ -830,7 +1082,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesCommitDecisionOnTwoCommitRes TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); coordinator.runCommit(operationContext(), kTwoShardIdList); @@ -852,7 +1104,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnAbortAndCommi TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); coordinator.runCommit(operationContext(), kTwoShardIdList); @@ -875,7 +1127,7 @@ TEST_F(TransactionCoordinatorTest, TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); coordinator.runCommit(operationContext(), kTwoShardIdList); @@ -898,7 +1150,7 @@ TEST_F(TransactionCoordinatorTest, TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); coordinator.runCommit(operationContext(), kTwoShardIdList); @@ -921,7 +1173,7 @@ TEST_F(TransactionCoordinatorTest, RunCommitProducesAbortDecisionOnSingleAbortRe TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); coordinator.runCommit(operationContext(), kTwoShardIdList); @@ -944,7 +1196,7 @@ TEST_F(TransactionCoordinatorTest, TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); coordinator.runCommit(operationContext(), kTwoShardIdList); @@ -972,7 +1224,7 @@ TEST_F(TransactionCoordinatorTest, TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); coordinator.runCommit(operationContext(), kTwoShardIdList); @@ -997,7 +1249,7 @@ TEST_F(TransactionCoordinatorTest, TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); coordinator.runCommit(operationContext(), kTwoShardIdList); @@ -1029,7 +1281,7 @@ TEST_F(TransactionCoordinatorTest, TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); coordinator.runCommit(operationContext(), kTwoShardIdList); @@ -1053,7 +1305,6 @@ TEST_F(TransactionCoordinatorTest, ErrorCodes::ReadConcernMajorityNotEnabled); } - class TransactionCoordinatorMetricsTest : public TransactionCoordinatorTestBase { public: void setUp() override { @@ -1265,7 +1516,7 @@ public: TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); @@ -1565,7 +1816,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SimpleTwoPhaseCommitRealCoordinator) { TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); const auto& stats = @@ -1740,7 +1991,7 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorIsCanceledWhileInactive) { TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); const auto& stats = @@ -1785,7 +2036,7 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordina auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); auto awsPtr = aws.get(); TransactionCoordinator coordinator( - operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max()); const auto& stats = coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -1828,7 +2079,7 @@ TEST_F(TransactionCoordinatorMetricsTest, auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); TransactionCoordinator coordinator( - operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max()); const auto& stats = coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -1890,7 +2141,7 @@ TEST_F(TransactionCoordinatorMetricsTest, auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); auto awsPtr = aws.get(); TransactionCoordinator coordinator( - operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max()); const auto& stats = coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -1954,7 +2205,7 @@ TEST_F(TransactionCoordinatorMetricsTest, auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); TransactionCoordinator coordinator( - operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max()); const auto& stats = coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -2022,7 +2273,7 @@ TEST_F(TransactionCoordinatorMetricsTest, auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); auto awsPtr = aws.get(); TransactionCoordinator coordinator( - operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max()); const auto& stats = coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -2093,7 +2344,7 @@ TEST_F(TransactionCoordinatorMetricsTest, CoordinatorsAWSIsShutDownWhileCoordina auto aws = std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()); auto awsPtr = aws.get(); TransactionCoordinator coordinator( - operationContext(), _lsid, _txnNumber, std::move(aws), Date_t::max()); + operationContext(), _lsid, _txnNumberAndRetryCounter, std::move(aws), Date_t::max()); const auto& stats = coordinator.getMetricsObserverForTest().getSingleTransactionCoordinatorStats(); @@ -2179,7 +2430,7 @@ TEST_F(TransactionCoordinatorMetricsTest, DoesNotLogTransactionsUnderSlowMSThres TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); @@ -2211,7 +2462,7 @@ TEST_F( TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); @@ -2241,7 +2492,7 @@ TEST_F(TransactionCoordinatorMetricsTest, LogsTransactionsOverSlowMSThreshold) { TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); @@ -2267,8 +2518,9 @@ TEST_F(TransactionCoordinatorMetricsTest, SlowLogLineIncludesTransactionParamete _lsid.serialize(&lsidBob); ASSERT_EQUALS(1, countBSONFormatLogLinesIsSubset(BSON( - "attr" << BSON("parameters" << BSON("lsid" << lsidBob.obj() << "txnNumber" - << _txnNumber))))); + "attr" << BSON("parameters" << BSON( + "lsid" << lsidBob.obj() << "txnNumber" + << _txnNumberAndRetryCounter.getTxnNumber()))))); } TEST_F(TransactionCoordinatorMetricsTest, @@ -2287,7 +2539,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SlowLogLineIncludesTerminationCauseFor TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); @@ -2338,7 +2590,7 @@ TEST_F(TransactionCoordinatorMetricsTest, SlowLogLineIncludesStepDurationsAndTot TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); @@ -2460,7 +2712,7 @@ TEST_F(TransactionCoordinatorMetricsTest, RecoveryFromFailureIndicatedInReportSt TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); @@ -2496,7 +2748,7 @@ TEST_F(TransactionCoordinatorMetricsTest, ClientInformationIncludedInReportState TransactionCoordinator coordinator( operationContext(), _lsid, - _txnNumber, + _txnNumberAndRetryCounter, std::make_unique<txn::AsyncWorkScheduler>(getServiceContext()), Date_t::max()); diff --git a/src/mongo/db/s/transaction_coordinator_test_fixture.h b/src/mongo/db/s/transaction_coordinator_test_fixture.h index 6a60538a9fa..d7d12eb058a 100644 --- a/src/mongo/db/s/transaction_coordinator_test_fixture.h +++ b/src/mongo/db/s/transaction_coordinator_test_fixture.h @@ -72,6 +72,7 @@ protected: */ static void associateClientMetadata(Client* client, std::string appName); + const std::vector<ShardId> kOneShardIdList{{"s1"}}; const std::vector<ShardId> kTwoShardIdList{{"s1"}, {"s2"}}; const std::set<ShardId> kTwoShardIdSet{{"s1"}, {"s2"}}; const std::vector<ShardId> kThreeShardIdList{{"s1"}, {"s2"}, {"s3"}}; diff --git a/src/mongo/db/s/transaction_coordinator_util.cpp b/src/mongo/db/s/transaction_coordinator_util.cpp index 9e07378295d..bb6832314b7 100644 --- a/src/mongo/db/s/transaction_coordinator_util.cpp +++ b/src/mongo/db/s/transaction_coordinator_util.cpp @@ -38,6 +38,7 @@ #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" #include "mongo/db/curop.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/internal_transactions_feature_flag_gen.h" #include "mongo/db/ops/write_ops.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/transaction_coordinator_futures_util.h" @@ -102,16 +103,17 @@ bool shouldRetryPersistingCoordinatorState(const StatusWith<T>& responseStatus) } // namespace namespace { -repl::OpTime persistParticipantListBlocking(OperationContext* opCtx, - const LogicalSessionId& lsid, - TxnNumber txnNumber, - const std::vector<ShardId>& participantList) { +repl::OpTime persistParticipantListBlocking( + OperationContext* opCtx, + const LogicalSessionId& lsid, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, + const std::vector<ShardId>& participantList) { LOGV2_DEBUG(22463, 3, - "{sessionId}:{txnNumber} Going to write participant list", + "{sessionId}:{txnNumberAndRetryCounter} Going to write participant list", "Going to write participant list", "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber); + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter); if (MONGO_unlikely(hangBeforeWritingParticipantList.shouldFail())) { LOGV2(22464, "Hit hangBeforeWritingParticipantList failpoint"); @@ -120,7 +122,11 @@ repl::OpTime persistParticipantListBlocking(OperationContext* opCtx, OperationSessionInfo sessionInfo; sessionInfo.setSessionId(lsid); - sessionInfo.setTxnNumber(txnNumber); + sessionInfo.setTxnNumber(txnNumberAndRetryCounter.getTxnNumber()); + if (feature_flags::gFeatureFlagInternalTransactions.isEnabled( + serverGlobalParams.featureCompatibility)) { + sessionInfo.setTxnRetryCounter(*txnNumberAndRetryCounter.getTxnRetryCounter()); + } DBDirectClient client(opCtx); @@ -167,7 +173,7 @@ repl::OpTime persistParticipantListBlocking(OperationContext* opCtx, uasserted(51025, str::stream() << "While attempting to write participant list " << buildParticipantListString(participantList) << " for " - << lsid.getId() << ':' << txnNumber + << lsid.getId() << ':' << txnNumberAndRetryCounter.toBSON() << ", found document with a different participant list: " << doc); } @@ -176,29 +182,35 @@ repl::OpTime persistParticipantListBlocking(OperationContext* opCtx, LOGV2_DEBUG(22465, 3, - "{sessionId}:{txnNumber} Wrote participant list", + "{sessionId}:{txnNumberAndRetryCounter} Wrote participant list", "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber); + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter); return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); } } // namespace -Future<repl::OpTime> persistParticipantsList(txn::AsyncWorkScheduler& scheduler, - const LogicalSessionId& lsid, - TxnNumber txnNumber, - const txn::ParticipantsList& participants) { +Future<repl::OpTime> persistParticipantsList( + txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, + const txn::ParticipantsList& participants) { return txn::doWhile( scheduler, boost::none /* no need for a backoff */, [](const StatusWith<repl::OpTime>& s) { return shouldRetryPersistingCoordinatorState(s); }, - [&scheduler, lsid, txnNumber, participants] { - return scheduler.scheduleWork([lsid, txnNumber, participants](OperationContext* opCtx) { - FlowControl::Bypass flowControlBypass(opCtx); - getTransactionCoordinatorWorkerCurOpRepository()->set( - opCtx, lsid, txnNumber, CoordinatorAction::kWritingParticipantList); - return persistParticipantListBlocking(opCtx, lsid, txnNumber, participants); - }); + [&scheduler, lsid, txnNumberAndRetryCounter, participants] { + return scheduler.scheduleWork( + [lsid, txnNumberAndRetryCounter, participants](OperationContext* opCtx) { + FlowControl::Bypass flowControlBypass(opCtx); + getTransactionCoordinatorWorkerCurOpRepository()->set( + opCtx, + lsid, + txnNumberAndRetryCounter.getTxnNumber(), + CoordinatorAction::kWritingParticipantList); + return persistParticipantListBlocking( + opCtx, lsid, txnNumberAndRetryCounter, participants); + }); }); } @@ -232,14 +244,20 @@ CoordinatorCommitDecision PrepareVoteConsensus::decision() const { Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, txn::AsyncWorkScheduler& scheduler, const LogicalSessionId& lsid, - TxnNumber txnNumber, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, const APIParameters& apiParams, const txn::ParticipantsList& participants) { PrepareTransaction prepareTransaction; prepareTransaction.setDbName(NamespaceString::kAdminDb); - BSONObjBuilder bob(BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" + BSONObjBuilder bob(BSON("lsid" << lsid.toBSON() << "txnNumber" + << txnNumberAndRetryCounter.getTxnNumber() << "autocommit" << false << WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)); + if (feature_flags::gFeatureFlagInternalTransactions.isEnabled( + serverGlobalParams.featureCompatibility)) { + bob.append(OperationSessionInfo::kTxnRetryCounterFieldName, + *txnNumberAndRetryCounter.getTxnRetryCounter()); + } apiParams.appendInfo(&bob); auto prepareObj = prepareTransaction.toBSON(bob.obj()); @@ -249,10 +267,14 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, // vector of responses. auto prepareScheduler = scheduler.makeChildScheduler(); - OperationContextFn operationContextFn = [lsid, txnNumber](OperationContext* opCtx) { + OperationContextFn operationContextFn = [lsid, + txnNumberAndRetryCounter](OperationContext* opCtx) { invariant(opCtx); getTransactionCoordinatorWorkerCurOpRepository()->set( - opCtx, lsid, txnNumber, CoordinatorAction::kSendingPrepare); + opCtx, + lsid, + txnNumberAndRetryCounter.getTxnNumber(), + CoordinatorAction::kSendingPrepare); if (MONGO_unlikely(hangBeforeSendingPrepare.shouldFail())) { LOGV2(22466, "Hit hangBeforeSendingPrepare failpoint"); @@ -264,7 +286,7 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, responses.emplace_back(sendPrepareToShard(service, *prepareScheduler, lsid, - txnNumber, + txnNumberAndRetryCounter, participant, prepareObj, operationContextFn)); @@ -278,8 +300,8 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, // Initial value PrepareVoteConsensus{int(participants.size())}, // Aggregates an incoming response (next) with the existing aggregate value (result) - [&prepareScheduler = *prepareScheduler, txnNumber](PrepareVoteConsensus& result, - const PrepareResponse& next) { + [&prepareScheduler = *prepareScheduler, txnNumberAndRetryCounter]( + PrepareVoteConsensus& result, const PrepareResponse& next) { result.registerVote(next); if (next.vote == PrepareVote::kAbort) { @@ -287,7 +309,7 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, 1, "Received abort prepare vote from node", "shardId"_attr = next.shardId, - "txnNumber"_attr = txnNumber, + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter, "error"_attr = (next.abortReason.has_value() ? next.abortReason.value().reason() : "")); @@ -308,15 +330,15 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, namespace { repl::OpTime persistDecisionBlocking(OperationContext* opCtx, const LogicalSessionId& lsid, - TxnNumber txnNumber, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, const std::vector<ShardId>& participantList, const txn::CoordinatorCommitDecision& decision) { const bool isCommit = decision.getDecision() == txn::CommitDecision::kCommit; LOGV2_DEBUG(22467, 3, - "{sessionId}:{txnNumber} Going to write decision {decision}", + "{sessionId}:{txnNumberAndRetryCounter} Going to write decision {decision}", "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber, + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter, "decision"_attr = (isCommit ? "commit" : "abort")); if (MONGO_unlikely(hangBeforeWritingDecision.shouldFail())) { @@ -326,7 +348,11 @@ repl::OpTime persistDecisionBlocking(OperationContext* opCtx, OperationSessionInfo sessionInfo; sessionInfo.setSessionId(lsid); - sessionInfo.setTxnNumber(txnNumber); + sessionInfo.setTxnNumber(txnNumberAndRetryCounter.getTxnNumber()); + if (feature_flags::gFeatureFlagInternalTransactions.isEnabled( + serverGlobalParams.featureCompatibility)) { + sessionInfo.setTxnRetryCounter(*txnNumberAndRetryCounter.getTxnRetryCounter()); + } DBDirectClient client(opCtx); @@ -379,7 +405,7 @@ repl::OpTime persistDecisionBlocking(OperationContext* opCtx, uasserted(51026, str::stream() << "While attempting to write decision " << (isCommit ? "'commit'" : "'abort'") << " for" << lsid.getId() - << ':' << txnNumber + << ':' << txnNumberAndRetryCounter.toBSON() << ", either failed to find document for this lsid:txnNumber or " "document existed with a different participant list, decision " "or commitTimestamp: " @@ -388,10 +414,10 @@ repl::OpTime persistDecisionBlocking(OperationContext* opCtx, LOGV2_DEBUG(22469, 3, - "{sessionId}:{txnNumber} Wrote decision {decision}", + "{sessionId}:{txnNumberAndRetryCounter} Wrote decision {decision}", "Wrote decision", "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber, + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter, "decision"_attr = (isCommit ? "commit" : "abort")); return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); @@ -400,20 +426,24 @@ repl::OpTime persistDecisionBlocking(OperationContext* opCtx, Future<repl::OpTime> persistDecision(txn::AsyncWorkScheduler& scheduler, const LogicalSessionId& lsid, - TxnNumber txnNumber, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, const txn::ParticipantsList& participants, const txn::CoordinatorCommitDecision& decision) { return txn::doWhile( scheduler, boost::none /* no need for a backoff */, [](const StatusWith<repl::OpTime>& s) { return shouldRetryPersistingCoordinatorState(s); }, - [&scheduler, lsid, txnNumber, participants, decision] { + [&scheduler, lsid, txnNumberAndRetryCounter, participants, decision] { return scheduler.scheduleWork( - [lsid, txnNumber, participants, decision](OperationContext* opCtx) { + [lsid, txnNumberAndRetryCounter, participants, decision](OperationContext* opCtx) { FlowControl::Bypass flowControlBypass(opCtx); getTransactionCoordinatorWorkerCurOpRepository()->set( - opCtx, lsid, txnNumber, CoordinatorAction::kWritingDecision); - return persistDecisionBlocking(opCtx, lsid, txnNumber, participants, decision); + opCtx, + lsid, + txnNumberAndRetryCounter.getTxnNumber(), + CoordinatorAction::kWritingDecision); + return persistDecisionBlocking( + opCtx, lsid, txnNumberAndRetryCounter, participants, decision); }); }); } @@ -421,23 +451,33 @@ Future<repl::OpTime> persistDecision(txn::AsyncWorkScheduler& scheduler, Future<void> sendCommit(ServiceContext* service, txn::AsyncWorkScheduler& scheduler, const LogicalSessionId& lsid, - TxnNumber txnNumber, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, const APIParameters& apiParams, const txn::ParticipantsList& participants, Timestamp commitTimestamp) { CommitTransaction commitTransaction; commitTransaction.setDbName(NamespaceString::kAdminDb); commitTransaction.setCommitTimestamp(commitTimestamp); - BSONObjBuilder bob(BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" + BSONObjBuilder bob(BSON("lsid" << lsid.toBSON() << "txnNumber" + << txnNumberAndRetryCounter.getTxnNumber() << "autocommit" << false << WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)); + if (feature_flags::gFeatureFlagInternalTransactions.isEnabled( + serverGlobalParams.featureCompatibility)) { + bob.append(OperationSessionInfo::kTxnRetryCounterFieldName, + *txnNumberAndRetryCounter.getTxnRetryCounter()); + } apiParams.appendInfo(&bob); auto commitObj = commitTransaction.toBSON(bob.obj()); - OperationContextFn operationContextFn = [lsid, txnNumber](OperationContext* opCtx) { + OperationContextFn operationContextFn = [lsid, + txnNumberAndRetryCounter](OperationContext* opCtx) { invariant(opCtx); getTransactionCoordinatorWorkerCurOpRepository()->set( - opCtx, lsid, txnNumber, CoordinatorAction::kSendingCommit); + opCtx, + lsid, + txnNumberAndRetryCounter.getTxnNumber(), + CoordinatorAction::kSendingCommit); if (MONGO_unlikely(hangBeforeSendingCommit.shouldFail())) { LOGV2(22470, "Hit hangBeforeSendingCommit failpoint"); @@ -447,8 +487,13 @@ Future<void> sendCommit(ServiceContext* service, std::vector<Future<void>> responses; for (const auto& participant : participants) { - responses.push_back(sendDecisionToShard( - service, scheduler, lsid, txnNumber, participant, commitObj, operationContextFn)); + responses.push_back(sendDecisionToShard(service, + scheduler, + lsid, + txnNumberAndRetryCounter, + participant, + commitObj, + operationContextFn)); } return txn::whenAll(responses); } @@ -456,21 +501,28 @@ Future<void> sendCommit(ServiceContext* service, Future<void> sendAbort(ServiceContext* service, txn::AsyncWorkScheduler& scheduler, const LogicalSessionId& lsid, - TxnNumber txnNumber, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, const APIParameters& apiParams, const txn::ParticipantsList& participants) { AbortTransaction abortTransaction; abortTransaction.setDbName(NamespaceString::kAdminDb); - BSONObjBuilder bob(BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" + BSONObjBuilder bob(BSON("lsid" << lsid.toBSON() << "txnNumber" + << txnNumberAndRetryCounter.getTxnNumber() << "autocommit" << false << WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)); + if (feature_flags::gFeatureFlagInternalTransactions.isEnabled( + serverGlobalParams.featureCompatibility)) { + bob.append(OperationSessionInfo::kTxnRetryCounterFieldName, + *txnNumberAndRetryCounter.getTxnRetryCounter()); + } apiParams.appendInfo(&bob); auto abortObj = abortTransaction.toBSON(bob.obj()); - OperationContextFn operationContextFn = [lsid, txnNumber](OperationContext* opCtx) { + OperationContextFn operationContextFn = [lsid, + txnNumberAndRetryCounter](OperationContext* opCtx) { invariant(opCtx); getTransactionCoordinatorWorkerCurOpRepository()->set( - opCtx, lsid, txnNumber, CoordinatorAction::kSendingAbort); + opCtx, lsid, txnNumberAndRetryCounter.getTxnNumber(), CoordinatorAction::kSendingAbort); if (MONGO_unlikely(hangBeforeSendingAbort.shouldFail())) { LOGV2(22471, "Hit hangBeforeSendingAbort failpoint"); @@ -480,8 +532,13 @@ Future<void> sendAbort(ServiceContext* service, std::vector<Future<void>> responses; for (const auto& participant : participants) { - responses.push_back(sendDecisionToShard( - service, scheduler, lsid, txnNumber, participant, abortObj, operationContextFn)); + responses.push_back(sendDecisionToShard(service, + scheduler, + lsid, + txnNumberAndRetryCounter, + participant, + abortObj, + operationContextFn)); } return txn::whenAll(responses); } @@ -489,13 +546,13 @@ Future<void> sendAbort(ServiceContext* service, namespace { void deleteCoordinatorDocBlocking(OperationContext* opCtx, const LogicalSessionId& lsid, - TxnNumber txnNumber) { + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) { LOGV2_DEBUG(22472, 3, - "{sessionId}:{txnNumber} Going to delete coordinator doc", + "{sessionId}:{txnNumberAndRetryCounter} Going to delete coordinator doc", "Going to delete coordinator doc", "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber); + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter); if (MONGO_unlikely(hangBeforeDeletingCoordinatorDoc.shouldFail())) { LOGV2(22473, "Hit hangBeforeDeletingCoordinatorDoc failpoint"); @@ -504,7 +561,11 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx, OperationSessionInfo sessionInfo; sessionInfo.setSessionId(lsid); - sessionInfo.setTxnNumber(txnNumber); + sessionInfo.setTxnNumber(txnNumberAndRetryCounter.getTxnNumber()); + if (feature_flags::gFeatureFlagInternalTransactions.isEnabled( + serverGlobalParams.featureCompatibility)) { + sessionInfo.setTxnRetryCounter(*txnNumberAndRetryCounter.getTxnRetryCounter()); + } DBDirectClient client(opCtx); @@ -545,7 +606,7 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx, BSON(TransactionCoordinatorDocument::kIdFieldName << sessionInfo.toBSON())); uasserted(51027, str::stream() << "While attempting to delete document for " << lsid.getId() << ':' - << txnNumber + << txnNumberAndRetryCounter.toBSON() << ", either failed to find document for this lsid:txnNumber or " "document existed without a decision: " << doc); @@ -553,10 +614,10 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx, LOGV2_DEBUG(22474, 3, - "{sessionId}:{txnNumber} Deleted coordinator doc", + "{sessionId}:{txnNumberAndRetryCounter} Deleted coordinator doc", "Deleted coordinator doc", "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber); + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter); hangAfterDeletingCoordinatorDoc.execute([&](const BSONObj& data) { LOGV2(22475, "Hit hangAfterDeletingCoordinatorDoc failpoint"); @@ -571,19 +632,22 @@ void deleteCoordinatorDocBlocking(OperationContext* opCtx, Future<void> deleteCoordinatorDoc(txn::AsyncWorkScheduler& scheduler, const LogicalSessionId& lsid, - TxnNumber txnNumber) { - return txn::doWhile( - scheduler, - boost::none /* no need for a backoff */, - [](const Status& s) { return s == ErrorCodes::Interrupted; }, - [&scheduler, lsid, txnNumber] { - return scheduler.scheduleWork([lsid, txnNumber](OperationContext* opCtx) { - FlowControl::Bypass flowControlBypass(opCtx); - getTransactionCoordinatorWorkerCurOpRepository()->set( - opCtx, lsid, txnNumber, CoordinatorAction::kDeletingCoordinatorDoc); - deleteCoordinatorDocBlocking(opCtx, lsid, txnNumber); - }); - }); + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) { + return txn::doWhile(scheduler, + boost::none /* no need for a backoff */, + [](const Status& s) { return s == ErrorCodes::Interrupted; }, + [&scheduler, lsid, txnNumberAndRetryCounter] { + return scheduler.scheduleWork([lsid, txnNumberAndRetryCounter]( + OperationContext* opCtx) { + FlowControl::Bypass flowControlBypass(opCtx); + getTransactionCoordinatorWorkerCurOpRepository()->set( + opCtx, + lsid, + txnNumberAndRetryCounter.getTxnNumber(), + CoordinatorAction::kDeletingCoordinatorDoc); + deleteCoordinatorDocBlocking(opCtx, lsid, txnNumberAndRetryCounter); + }); + }); } std::vector<TransactionCoordinatorDocument> readAllCoordinatorDocs(OperationContext* opCtx) { @@ -607,7 +671,7 @@ std::vector<TransactionCoordinatorDocument> readAllCoordinatorDocs(OperationCont Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, txn::AsyncWorkScheduler& scheduler, const LogicalSessionId& lsid, - TxnNumber txnNumber, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, const ShardId& shardId, const BSONObj& commandObj, OperationContextFn operationContextFn) { @@ -624,18 +688,18 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, }, [&scheduler, lsid, - txnNumber, + txnNumberAndRetryCounter, shardId, isLocalShard, commandObj = commandObj.getOwned(), operationContextFn] { LOGV2_DEBUG(22476, 3, - "{sessionId}:{txnNumber} Coordinator going to send command " + "{sessionId}:{txnNumberAndRetryCounter} Coordinator going to send command " "{command} to {localOrRemote} shard {shardId}", "Coordinator going to send command to shard", "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber, + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter, "command"_attr = commandObj, "localOrRemote"_attr = (isLocalShard ? "local" : "remote"), "shardId"_attr = shardId); @@ -643,7 +707,7 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, return scheduler .scheduleRemoteCommand( shardId, kPrimaryReadPreference, commandObj, operationContextFn) - .then([lsid, txnNumber, shardId, commandObj = commandObj.getOwned()]( + .then([lsid, txnNumberAndRetryCounter, shardId, commandObj = commandObj.getOwned()]( ResponseStatus response) { auto status = getStatusFromCommandResult(response.data); auto wcStatus = getWriteConcernStatusFromCommandResult(response.data); @@ -667,10 +731,10 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, << ", which is not an expected behavior. " "Interpreting the response as vote to abort"); LOGV2(22477, - "{sessionId}:{txnNumber} {error}", + "{sessionId}:{txnNumberAndRetryCounter} {error}", "Coordinator received error from transaction participant", "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber, + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter, "error"_attr = redact(abortStatus)); return PrepareResponse{ @@ -680,12 +744,12 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, LOGV2_DEBUG( 22478, 3, - "{sessionId}:{txnNumber} Coordinator shard received a " + "{sessionId}:{txnNumberAndRetryCounter} Coordinator shard received a " "vote to commit from shard {shardId} with prepareTimestamp: " "{prepareTimestamp}", "Coordinator shard received a vote to commit from participant shard", "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber, + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter, "shardId"_attr = shardId, "prepareTimestampField"_attr = prepareTimestampField.timestamp()); @@ -697,11 +761,11 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, LOGV2_DEBUG(22479, 3, - "{sessionId}:{txnNumber} Coordinator shard received " + "{sessionId}:{txnNumberAndRetryCounter} Coordinator shard received " "{error} from shard {shardId} for {command}", "Coordinator shard received response from shard", "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber, + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter, "error"_attr = status, "shardId"_attr = shardId, "command"_attr = commandObj); @@ -732,14 +796,15 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, }); return std::move(f).onError<ErrorCodes::TransactionCoordinatorReachedAbortDecision>( - [lsid, txnNumber, shardId](const Status& status) { - LOGV2_DEBUG(22480, - 3, - "{sessionId}:{txnNumber} Prepare stopped retrying due to retrying " - "being cancelled", - "Prepare stopped retrying due to retrying being cancelled", - "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber); + [lsid, txnNumberAndRetryCounter, shardId](const Status& status) { + LOGV2_DEBUG( + 22480, + 3, + "{sessionId}:{txnNumberAndRetryCounter} Prepare stopped retrying due to retrying " + "being cancelled", + "Prepare stopped retrying due to retrying being cancelled", + "sessionId"_attr = lsid.getId(), + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter); return PrepareResponse{shardId, boost::none, boost::none, @@ -750,7 +815,7 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, Future<void> sendDecisionToShard(ServiceContext* service, txn::AsyncWorkScheduler& scheduler, const LogicalSessionId& lsid, - TxnNumber txnNumber, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, const ShardId& shardId, const BSONObj& commandObj, OperationContextFn operationContextFn) { @@ -766,18 +831,18 @@ Future<void> sendDecisionToShard(ServiceContext* service, }, [&scheduler, lsid, - txnNumber, + txnNumberAndRetryCounter, shardId, isLocalShard, operationContextFn, commandObj = commandObj.getOwned()] { LOGV2_DEBUG(22481, 3, - "{sessionId}:{txnNumber} Coordinator going to send command " + "{sessionId}:{txnNumberAndRetryCounter} Coordinator going to send command " "{command} to {localOrRemote} shard {shardId}", "Coordinator going to send command to shard", "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber, + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter, "command"_attr = commandObj, "localOrRemote"_attr = (isLocalShard ? "local" : "remote"), "shardId"_attr = shardId); @@ -785,7 +850,7 @@ Future<void> sendDecisionToShard(ServiceContext* service, return scheduler .scheduleRemoteCommand( shardId, kPrimaryReadPreference, commandObj, operationContextFn) - .then([lsid, txnNumber, shardId, commandObj = commandObj.getOwned()]( + .then([lsid, txnNumberAndRetryCounter, shardId, commandObj = commandObj.getOwned()]( ResponseStatus response) { auto status = getStatusFromCommandResult(response.data); auto wcStatus = getWriteConcernStatusFromCommandResult(response.data); @@ -796,16 +861,17 @@ Future<void> sendDecisionToShard(ServiceContext* service, status = wcStatus; } - LOGV2_DEBUG(22482, - 3, - "{sessionId}:{txnNumber} Coordinator shard received " - "{status} in response to {command} from shard {shardId}", - "Coordinator shard received response from shard", - "sessionId"_attr = lsid.getId(), - "txnNumber"_attr = txnNumber, - "status"_attr = status, - "command"_attr = commandObj, - "shardId"_attr = shardId); + LOGV2_DEBUG( + 22482, + 3, + "{sessionId}:{txnNumberAndRetryCounter} Coordinator shard received " + "{status} in response to {command} from shard {shardId}", + "Coordinator shard received response from shard", + "sessionId"_attr = lsid.getId(), + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter, + "status"_attr = status, + "command"_attr = commandObj, + "shardId"_attr = shardId); if (ErrorCodes::isVoteAbortError(status.code())) { // Interpret voteAbort errors as an ack. @@ -824,8 +890,14 @@ Future<void> sendDecisionToShard(ServiceContext* service, }); } -std::string txnIdToString(const LogicalSessionId& lsid, TxnNumber txnNumber) { - return str::stream() << lsid.getId() << ':' << txnNumber; +std::string txnIdToString(const LogicalSessionId& lsid, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) { + str::stream ss; + ss << lsid.getId() << ':' << txnNumberAndRetryCounter.getTxnNumber(); + if (auto retryCounter = txnNumberAndRetryCounter.getTxnRetryCounter()) { + ss << ':' << *retryCounter; + } + return ss; } } // namespace txn diff --git a/src/mongo/db/s/transaction_coordinator_util.h b/src/mongo/db/s/transaction_coordinator_util.h index 36a2b2fcf3d..ba125b287cb 100644 --- a/src/mongo/db/s/transaction_coordinator_util.h +++ b/src/mongo/db/s/transaction_coordinator_util.h @@ -42,7 +42,7 @@ namespace txn { * Upserts a document of the form: * * { - * _id: {lsid: <lsid>, txnNumber: <txnNumber>} + * _id: {lsid: <lsid>, txnNumber: <txnNumber>, txnRetryCounter: <txnRetryCounter>} * participants: ["shard0000", "shard0001"] * } * @@ -51,12 +51,13 @@ namespace txn { * Throws if the upsert fails or waiting for writeConcern fails. * * If the upsert returns a DuplicateKey error, converts it to an anonymous error, because it means a - * document for the (lsid, txnNumber) exists with a different participant list. + * document for the (lsid, txnNumber, txnRetryCounter) exists with a different participant list. */ -Future<repl::OpTime> persistParticipantsList(txn::AsyncWorkScheduler& scheduler, - const LogicalSessionId& lsid, - TxnNumber txnNumber, - const txn::ParticipantsList& participants); +Future<repl::OpTime> persistParticipantsList( + txn::AsyncWorkScheduler& scheduler, + const LogicalSessionId& lsid, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, + const txn::ParticipantsList& participants); struct PrepareResponse; class PrepareVoteConsensus { @@ -97,7 +98,7 @@ private: Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, txn::AsyncWorkScheduler& scheduler, const LogicalSessionId& lsid, - TxnNumber txnNumber, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, const APIParameters& apiParams, const txn::ParticipantsList& participants); @@ -105,10 +106,10 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, * If 'commitTimestamp' is boost::none, updates the document in config.transaction_coordinators * for * - * (lsid, txnNumber) to be: + * (lsid, txnNumber, txnRetryCounter) to be: * * { - * _id: {lsid: <lsid>, txnNumber: <txnNumber>} + * _id: {lsid: <lsid>, txnNumber: <txnNumber>, txnRetryCounter: <txnRetryCounter>} * participants: ["shard0000", "shard0001"] * decision: "abort" * } @@ -116,7 +117,7 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, * else updates the document to be: * * { - * _id: {lsid: <lsid>, txnNumber: <txnNumber>} + * _id: {lsid: <lsid>, txnNumber: <txnNumber>, txnRetryCounter: <txnRetryCounter>} * participants: ["shard0000", "shard0001"] * decision: "commit" * commitTimestamp: Timestamp(xxxxxxxx, x), @@ -127,12 +128,12 @@ Future<PrepareVoteConsensus> sendPrepare(ServiceContext* service, * Throws if the update fails or waiting for writeConcern fails. * * If the update succeeds but did not update any document, throws an anonymous error, because it - * means either no document for (lsid, txnNumber) exists, or a document exists but has a different - * participant list, different decision, or different commit Timestamp. + * means either no document for (lsid, txnNumber, txnRetryCounter) exists, or a document exists but + * has a different participant list, different decision, or different commit Timestamp. */ Future<repl::OpTime> persistDecision(txn::AsyncWorkScheduler& scheduler, const LogicalSessionId& lsid, - TxnNumber txnNumber, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, const txn::ParticipantsList& participants, const txn::CoordinatorCommitDecision& decision); @@ -143,7 +144,7 @@ Future<repl::OpTime> persistDecision(txn::AsyncWorkScheduler& scheduler, Future<void> sendCommit(ServiceContext* service, txn::AsyncWorkScheduler& scheduler, const LogicalSessionId& lsid, - TxnNumber txnNumber, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, const APIParameters& apiParams, const txn::ParticipantsList& participants, Timestamp commitTimestamp); @@ -155,7 +156,7 @@ Future<void> sendCommit(ServiceContext* service, Future<void> sendAbort(ServiceContext* service, txn::AsyncWorkScheduler& scheduler, const LogicalSessionId& lsid, - TxnNumber txnNumber, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, const APIParameters& apiParams, const txn::ParticipantsList& participants); @@ -167,12 +168,12 @@ Future<void> sendAbort(ServiceContext* service, * Throws if the update fails. * * If the delete succeeds but did not delete any document, throws an anonymous error, because it - * means either no document for (lsid, txnNumber) exists, or a document exists but without a - * decision. + * means either no document for (lsid, txnNumber, txnRetryCounter) exists, or a document exists but + * without a decision. */ Future<void> deleteCoordinatorDoc(txn::AsyncWorkScheduler& scheduler, const LogicalSessionId& lsid, - TxnNumber txnNumber); + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter); /** * Reads and returns all documents in config.transaction_coordinators. @@ -208,7 +209,7 @@ struct PrepareResponse { Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, txn::AsyncWorkScheduler& scheduler, const LogicalSessionId& lsid, - TxnNumber txnNumber, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, const ShardId& shardId, const BSONObj& prepareCommandObj, OperationContextFn operationContextFn = @@ -228,7 +229,7 @@ Future<PrepareResponse> sendPrepareToShard(ServiceContext* service, Future<void> sendDecisionToShard(ServiceContext* service, txn::AsyncWorkScheduler& scheduler, const LogicalSessionId& lsid, - TxnNumber txnNumber, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter, const ShardId& shardId, const BSONObj& commandObj, OperationContextFn operationContextFn = [](OperationContext*) {}); @@ -237,7 +238,8 @@ Future<void> sendDecisionToShard(ServiceContext* service, * Returns a string representation of the transaction id represented by the given session id and * transaction number. */ -std::string txnIdToString(const LogicalSessionId& lsid, TxnNumber txnNumber); +std::string txnIdToString(const LogicalSessionId& lsid, + const TxnNumberAndRetryCounter& txnNumberAndRetryCounter); } // namespace txn } // namespace mongo |