diff options
author | Jason Zhang <jason.zhang@mongodb.com> | 2021-11-03 15:48:36 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-11-03 16:50:44 +0000 |
commit | eca169a0b7009a783b0271f81677aadcdfd1a78b (patch) | |
tree | 5b345f5a9a496e319cfa3a17791dbb6ee4139a3b | |
parent | d50284111ab07054194fdff0b9892daa8cc82265 (diff) | |
download | mongo-eca169a0b7009a783b0271f81677aadcdfd1a78b.tar.gz |
SERVER-60529 Make TransactionParticipant take in txnNumberAndRetryCounter instead of txnNumber and txnRetryCounter separately
22 files changed, 504 insertions, 653 deletions
diff --git a/src/mongo/db/kill_sessions_local.cpp b/src/mongo/db/kill_sessions_local.cpp index f5070365d4e..f8b235c4d2c 100644 --- a/src/mongo/db/kill_sessions_local.cpp +++ b/src/mongo/db/kill_sessions_local.cpp @@ -134,12 +134,14 @@ void killAllExpiredTransactions(OperationContext* opCtx) { if (txnParticipant.transactionIsInProgress() || txnParticipant.transactionIsAborted()) { LOGV2( 20707, - "Aborting transaction with session id {sessionId} and txnNumber {txnNumber} " + "Aborting transaction with session id {sessionId} and txnNumberAndRetryCounter " + "{txnNumberAndRetryCounter} " "because it has been running for longer than 'transactionLifetimeLimitSeconds'", "Aborting transaction because it has been running for longer than " "'transactionLifetimeLimitSeconds'", "sessionId"_attr = session.getSessionId().getId(), - "txnNumber"_attr = txnParticipant.getActiveTxnNumber()); + "txnNumberAndRetryCounter"_attr = + txnParticipant.getActiveTxnNumberAndRetryCounter()); if (txnParticipant.transactionIsInProgress()) { txnParticipant.abortTransaction(opCtx); } @@ -193,30 +195,32 @@ void yieldLocksForPreparedTransactions(OperationContext* opCtx) { LOGV2(6015318, "Yielding locks for prepared transactions."); SessionKiller::Matcher matcherAllSessions( KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(newOpCtx.get())}); - killSessionsAction(newOpCtx.get(), - matcherAllSessions, - [](const ObservableSession& session) { - return TransactionParticipant::get(session).transactionIsPrepared(); - }, - [](OperationContext* killerOpCtx, const SessionToKill& session) { - auto txnParticipant = TransactionParticipant::get(session); - // Yield locks for prepared transactions. When scanning and killing - // operations, all prepared transactions are included in the list. Even - // though new sessions may be created after the scan, none of them can - // become prepared during stepdown, since the RSTL has been enqueued, - // preventing any new writes. - if (txnParticipant.transactionIsPrepared()) { - LOGV2_DEBUG(20708, - 3, - "Yielding locks of prepared transaction. SessionId: " - "{sessionId} TxnNumber: {txnNumber}", - "Yielding locks of prepared transaction", - "sessionId"_attr = session.getSessionId().getId(), - "txnNumber"_attr = txnParticipant.getActiveTxnNumber()); - txnParticipant.refreshLocksForPreparedTransaction(killerOpCtx, true); - } - }, - ErrorCodes::InterruptedDueToReplStateChange); + killSessionsAction( + newOpCtx.get(), + matcherAllSessions, + [](const ObservableSession& session) { + return TransactionParticipant::get(session).transactionIsPrepared(); + }, + [](OperationContext* killerOpCtx, const SessionToKill& session) { + auto txnParticipant = TransactionParticipant::get(session); + // Yield locks for prepared transactions. When scanning and killing + // operations, all prepared transactions are included in the list. Even + // though new sessions may be created after the scan, none of them can + // become prepared during stepdown, since the RSTL has been enqueued, + // preventing any new writes. + if (txnParticipant.transactionIsPrepared()) { + LOGV2_DEBUG(20708, + 3, + "Yielding locks of prepared transaction. SessionId: " + "{sessionId} TxnNumberAndRetryCounter: {txnNumberAndRetryCounter}", + "Yielding locks of prepared transaction", + "sessionId"_attr = session.getSessionId().getId(), + "txnNumberAndRetryCounter"_attr = + txnParticipant.getActiveTxnNumberAndRetryCounter()); + txnParticipant.refreshLocksForPreparedTransaction(killerOpCtx, true); + } + }, + ErrorCodes::InterruptedDueToReplStateChange); } void invalidateSessionsForStepdown(OperationContext* opCtx) { diff --git a/src/mongo/db/logical_session_id.h b/src/mongo/db/logical_session_id.h index 354a2e1aa26..f79bc462755 100644 --- a/src/mongo/db/logical_session_id.h +++ b/src/mongo/db/logical_session_id.h @@ -132,7 +132,7 @@ using LogicalSessionIdMap = stdx::unordered_map<LogicalSessionId, T, LogicalSess class TxnNumberAndRetryCounter { public: - TxnNumberAndRetryCounter(TxnNumber txnNumber, TxnRetryCounter txnRetryCounter) + TxnNumberAndRetryCounter(TxnNumber txnNumber, boost::optional<TxnRetryCounter> txnRetryCounter) : _txnNumber(txnNumber), _txnRetryCounter(txnRetryCounter) {} TxnNumberAndRetryCounter(TxnNumber txnNumber) @@ -155,9 +155,17 @@ public: return _txnRetryCounter; } + void setTxnNumber(const TxnNumber txnNumber) { + _txnNumber = txnNumber; + } + + void setTxnRetryCounter(const boost::optional<TxnRetryCounter> txnRetryCounter) { + _txnRetryCounter = txnRetryCounter; + } + private: - const TxnNumber _txnNumber; - const boost::optional<TxnRetryCounter> _txnRetryCounter; + TxnNumber _txnNumber; + boost::optional<TxnRetryCounter> _txnRetryCounter; }; inline bool operator==(const TxnNumberAndRetryCounter& l, const TxnNumberAndRetryCounter& r) { diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 66b803d4a81..a28dc301cec 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -630,11 +630,8 @@ public: NamespaceString nss, TxnNumber txnNum, StmtId stmtId) { - txnParticipant.beginOrContinue(opCtx, - txnNum, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx, {txnNum}, boost::none /* autocommit */, boost::none /* startTransaction */); { AutoGetCollection autoColl(opCtx, nss, MODE_IX); @@ -809,10 +806,9 @@ public: void setUp() override { OpObserverTxnParticipantTest::setUp(); txnParticipant().beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber()}, false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */); + true /* startTransaction */); } protected: @@ -1500,11 +1496,8 @@ class OpObserverRetryableFindAndModifyTest : public OpObserverTxnParticipantTest public: void setUp() override { OpObserverTxnParticipantTest::setUp(); - txnParticipant().beginOrContinue(opCtx(), - txnNum(), - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant().beginOrContinue( + opCtx(), {txnNum()}, boost::none /* autocommit */, boost::none /* startTransaction */); } void tearDown() override { @@ -1749,10 +1742,9 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) { contextSession.emplace(opCtx); txnParticipant.emplace(TransactionParticipant::get(opCtx)); txnParticipant->beginOrContinue(opCtx, - TxnNumber(testIdx), + {TxnNumber(testIdx)}, boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); } updateArgs.preImageDoc = boost::none; @@ -1887,10 +1879,9 @@ TEST_F(OpObserverTest, TestFundamentalOnInsertsOutputs) { contextSession.emplace(opCtx); txnParticipant.emplace(TransactionParticipant::get(opCtx)); txnParticipant->beginOrContinue(opCtx, - TxnNumber(testIdx), + {TxnNumber(testIdx)}, boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); } // Phase 2: Call the code we're testing. @@ -2020,10 +2011,9 @@ TEST_F(OpObserverTest, TestFundamentalOnDeleteOutputs) { contextSession.emplace(opCtx); txnParticipant.emplace(TransactionParticipant::get(opCtx)); txnParticipant->beginOrContinue(opCtx, - TxnNumber(testIdx), + {TxnNumber(testIdx)}, boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); } switch (testCase.retryableOptions) { case kNotRetryable: diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 587b76d7859..2d826c9d841 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -976,6 +976,7 @@ void TenantMigrationRecipientService::Instance::_processCommittedTransactionEntr "Migration attempting to commit transaction", "sessionId"_attr = sessionId, "txnNumber"_attr = txnNumber, + "txnRetryCounter"_attr = optTxnRetryCounter, "tenantId"_attr = getTenantId(), "migrationId"_attr = getMigrationUUID(), "entry"_attr = entry); @@ -996,16 +997,20 @@ void TenantMigrationRecipientService::Instance::_processCommittedTransactionEntr // If the entry's transaction number is stale/older than the current active transaction number // on the participant, fail the migration. uassert(ErrorCodes::TransactionTooOld, - str::stream() << "Migration cannot apply transaction " << txnNumber << " on session " - << sessionId << " because a newer transaction " - << txnParticipant.getActiveTxnNumber() << " has already started", - txnParticipant.getActiveTxnNumber() <= txnNumber); - if (txnParticipant.getActiveTxnNumber() == txnNumber) { + str::stream() << "Migration cannot apply transaction with tranaction number " + << txnNumber << " and transaction retry counter " << optTxnRetryCounter + << " on session " << sessionId + << " because a newer transaction with txnNumberAndRetryCounter: " + << txnParticipant.getActiveTxnNumberAndRetryCounter().toBSON() + << " has already started", + txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber() <= txnNumber); + if (txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber() == txnNumber) { // If the txn numbers are equal, move on to the next entry. return; } - txnParticipant.beginOrContinueTransactionUnconditionally(opCtx, txnNumber, optTxnRetryCounter); + txnParticipant.beginOrContinueTransactionUnconditionally(opCtx, + {txnNumber, optTxnRetryCounter}); MutableOplogEntry noopEntry; noopEntry.setOpType(repl::OpTypeEnum::kNoop); diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index 2b401445441..60c6c1a63ed 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -591,6 +591,7 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( "Tenant Oplog Applier committing transaction", "sessionId"_attr = sessionId, "txnNumber"_attr = txnNumber, + "txnRetryCounter"_attr = optTxnRetryCounter, "tenant"_attr = _tenantId, "migrationUuid"_attr = _migrationUuid, "op"_attr = redact(entry.toBSONForLogging())); @@ -609,11 +610,12 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( uassert(5351501, str::stream() << "Tenant oplog application cannot apply transaction " << txnNumber << " on session " << sessionId - << " because the transaction number " - << txnParticipant.getActiveTxnNumber() << " has already started", - txnParticipant.getActiveTxnNumber() < txnNumber); + << " because the transaction with txnNumberAndRetryCounter " + << txnParticipant.getActiveTxnNumberAndRetryCounter().toBSON() + << " has already started", + txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber() < txnNumber); txnParticipant.beginOrContinueTransactionUnconditionally( - opCtx.get(), txnNumber, optTxnRetryCounter); + opCtx.get(), {txnNumber, optTxnRetryCounter}); // Only set sessionId, txnNumber and txnRetryCounter for the final applyOp in a // transaction. @@ -743,11 +745,11 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( txnParticipant); // beginOrContinue throws on failure, which will abort the migration. Failure should // only result from out-of-order processing, which should not happen. + TxnNumberAndRetryCounter txnNumberAndRetryCounter{txnNumber}; txnParticipant.beginOrContinue(opCtx.get(), - txnNumber, + txnNumberAndRetryCounter, boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); // We could have an existing lastWriteOpTime for the same retryable write chain from a // previously aborted migration. This could also happen if the tenant being migrated has @@ -775,11 +777,11 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( "migrationUuid"_attr = _migrationUuid); txnParticipant.invalidate(opCtx.get()); txnParticipant.refreshFromStorageIfNeededNoOplogEntryFetch(opCtx.get()); + TxnNumberAndRetryCounter txnNumberAndRetryCounter{txnNumber}; txnParticipant.beginOrContinue(opCtx.get(), - txnNumber, + txnNumberAndRetryCounter, boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); } // We should never process the same donor statement twice, except in failover diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 4cf9b4b3904..d6d44e380d2 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -110,10 +110,9 @@ BSONObj makeLocalReadConcernWithAfterClusterTime(Timestamp afterClusterTime) { void checkOutSessionAndVerifyTxnState(OperationContext* opCtx) { MongoDOperationContextSession::checkOut(opCtx); TransactionParticipant::get(opCtx).beginOrContinue(opCtx, - *opCtx->getTxnNumber(), + {*opCtx->getTxnNumber()}, boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); } template <typename Callable> @@ -1062,10 +1061,9 @@ void MigrationDestinationManager::_migrateThread(bool skipToCritSecTaken) { auto txnParticipant = TransactionParticipant::get(opCtx); txnParticipant.beginOrContinue(opCtx, - *opCtx->getTxnNumber(), + {*opCtx->getTxnNumber()}, boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); _migrateDriver(opCtx, skipToCritSecTaken); } catch (...) { _setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus())); diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp index 0efb2179306..9f7b7bf6aff 100644 --- a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp +++ b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp @@ -259,11 +259,8 @@ boost::optional<SharedSemiFuture<void>> withSessionCheckedOut(OperationContext* auto txnParticipant = TransactionParticipant::get(opCtx); try { - txnParticipant.beginOrContinue(opCtx, - txnNumber, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx, {txnNumber}, boost::none /* autocommit */, boost::none /* startTransaction */); if (stmtId && txnParticipant.checkStatementExecuted(opCtx, *stmtId)) { // Skip the incoming statement because it has already been logged locally. diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp index 9bce751d916..eadf96b76bc 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp @@ -96,11 +96,8 @@ void runWithTransaction(OperationContext* opCtx, } }); - txnParticipant.beginOrContinue(asr.opCtx(), - txnNumber, - false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + asr.opCtx(), {txnNumber}, false /* autocommit */, true /* startTransaction */); txnParticipant.unstashTransactionResources(asr.opCtx(), "reshardingOplogApplication"); func(asr.opCtx()); diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp index eb95ef2a215..684f3f4990a 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp @@ -181,11 +181,8 @@ public: MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue(opCtx, - txnNumber, - false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx, {txnNumber}, false /* autocommit */, true /* startTransaction */); txnParticipant.unstashTransactionResources(opCtx, "prepareTransaction"); @@ -205,11 +202,8 @@ public: MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue(opCtx, - txnNumber, - false /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx, {txnNumber}, false /* autocommit */, boost::none /* startTransaction */); txnParticipant.unstashTransactionResources(opCtx, "abortTransaction"); txnParticipant.abortTransaction(opCtx); diff --git a/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp index 78f817508a1..6c57d2271a5 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp @@ -79,11 +79,8 @@ public: MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue(opCtx, - txnNumber, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx, {txnNumber}, boost::none /* autocommit */, boost::none /* startTransaction */); WriteUnitOfWork wuow(opCtx); auto opTime = repl::getNextOpTime(opCtx); @@ -106,11 +103,8 @@ public: MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue(opCtx, - txnNumber, - false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx, {txnNumber}, false /* autocommit */, true /* startTransaction */); txnParticipant.unstashTransactionResources(opCtx, "prepareTransaction"); @@ -130,11 +124,8 @@ public: MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue(opCtx, - txnNumber, - false /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx, {txnNumber}, false /* autocommit */, boost::none /* startTransaction */); txnParticipant.unstashTransactionResources(opCtx, "abortTransaction"); txnParticipant.abortTransaction(opCtx); @@ -268,11 +259,8 @@ public: MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue(opCtx, - txnNumber, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx, {txnNumber}, boost::none /* autocommit */, boost::none /* startTransaction */); ASSERT_TRUE(bool(txnParticipant.checkStatementExecuted(opCtx, stmtId))); } }; diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp index da8acbf4f2e..895f1ad225f 100644 --- a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp +++ b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp @@ -243,17 +243,11 @@ protected: auto txnParticipant = TransactionParticipant::get(opCtx); ASSERT(txnParticipant); if (multiDocTxn) { - txnParticipant.beginOrContinue(opCtx, - txnNum, - false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx, {txnNum}, false /* autocommit */, true /* startTransaction */); } else { - txnParticipant.beginOrContinue(opCtx, - txnNum, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx, {txnNum}, boost::none /* autocommit */, boost::none /* startTransaction */); } } @@ -294,7 +288,7 @@ protected: BSON(repl::OplogEntryBase::kSessionIdFieldName << sessionId.toBSON())); ASSERT_BSONOBJ_EQ(bsonOplog, {}); - ASSERT_EQ(txnParticipant.getActiveTxnNumber(), txnNum); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber(), txnNum); } boost::optional<ReshardingTxnClonerProgress> getTxnCloningProgress( @@ -396,11 +390,8 @@ protected: MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue(opCtx, - txnNumber, - false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx, {txnNumber}, false /* autocommit */, true /* startTransaction */); txnParticipant.unstashTransactionResources(opCtx, "prepareTransaction"); @@ -420,11 +411,8 @@ protected: MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue(opCtx, - txnNumber, - false /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx, {txnNumber}, false /* autocommit */, boost::none /* startTransaction */); txnParticipant.unstashTransactionResources(opCtx, "abortTransaction"); txnParticipant.abortTransaction(opCtx); diff --git a/src/mongo/db/s/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding_destined_recipient_test.cpp index 91881f6c5e1..34d56bbb70f 100644 --- a/src/mongo/db/s/resharding_destined_recipient_test.cpp +++ b/src/mongo/db/s/resharding_destined_recipient_test.cpp @@ -70,11 +70,8 @@ void runInTransaction(OperationContext* opCtx, Callable&& func) { auto txnParticipant = TransactionParticipant::get(opCtx); ASSERT(txnParticipant); - txnParticipant.beginOrContinue(opCtx, - txnNum, - false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx, {txnNum}, false /* autocommit */, true /* startTransaction */); txnParticipant.unstashTransactionResources(opCtx, "SetDestinedRecipient"); func(); diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 8350f974581..9e4b42d9a66 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -248,10 +248,9 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, try { txnParticipant.beginOrContinue(opCtx, - result.txnNum, + {result.txnNum}, boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); if (txnParticipant.checkStatementExecuted(opCtx, stmtIds.front())) { // Skip the incoming statement because it has already been logged locally return lastResult; diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index 446fedd435a..639e9ac0a02 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -183,11 +183,8 @@ public: opCtx->setTxnNumber(txnNum); MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue(opCtx, - txnNum, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx, {txnNum}, boost::none /* autocommit */, boost::none /* startTransaction */); } void checkOplog(const repl::OplogEntry& originalOplog, const repl::OplogEntry& oplogToCheck) { @@ -255,10 +252,9 @@ public: MongoDOperationContextSession sessionTxnState(innerOpCtx.get()); auto txnParticipant = TransactionParticipant::get(innerOpCtx.get()); txnParticipant.beginOrContinue(innerOpCtx.get(), - *sessionInfo.getTxnNumber(), + {*sessionInfo.getTxnNumber()}, boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); const auto reply = write_ops_exec::performInserts(innerOpCtx.get(), insertRequest); ASSERT(reply.results.size() == 1); @@ -1886,11 +1882,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, auto txnParticipant = TransactionParticipant::get(opCtx); txnParticipant.refreshFromStorageIfNeeded(opCtx); - txnParticipant.beginOrContinue(opCtx, - 3, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx, {3}, boost::none /* autocommit */, boost::none /* startTransaction */); } OperationSessionInfo sessionInfo2; diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp index 18596e672a9..147878c02d6 100644 --- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp +++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp @@ -321,10 +321,9 @@ public: MongoDOperationContextSession sessionTxnState(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); txnParticipant.beginOrContinue(opCtx, - *opCtx->getTxnNumber(), + txnNumberAndRetryCounter, false /* autocommit */, - boost::none /* startTransaction */, - *opCtx->getTxnRetryCounter()); + boost::none /* startTransaction */); if (txnParticipant.transactionIsCommitted()) return; @@ -344,10 +343,9 @@ public: // Call beginOrContinue again in case the transaction number has changed. txnParticipant.beginOrContinue(opCtx, - *opCtx->getTxnNumber(), + txnNumberAndRetryCounter, false /* autocommit */, - boost::none /* startTransaction */, - *opCtx->getTxnRetryCounter()); + boost::none /* startTransaction */); invariant(!txnParticipant.transactionIsOpen(), "The participant should not be in progress after we waited for the " diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index e00d0e0c000..5acaa1e2614 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -864,11 +864,11 @@ void CheckoutSessionAndInvokeCommand::_checkOutSession() { // transaction on that session. while (!beganOrContinuedTxn) { try { - _txnParticipant->beginOrContinue(opCtx, - *sessionOptions.getTxnNumber(), - sessionOptions.getAutocommit(), - sessionOptions.getStartTransaction(), - sessionOptions.getTxnRetryCounter()); + _txnParticipant->beginOrContinue( + opCtx, + {*sessionOptions.getTxnNumber(), sessionOptions.getTxnRetryCounter()}, + sessionOptions.getAutocommit(), + sessionOptions.getStartTransaction()); beganOrContinuedTxn = true; } catch (const ExceptionFor<ErrorCodes::PreparedTransactionInProgress>&) { auto prepareCompleted = _txnParticipant->onExitPrepare(); diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp index 6e1878dc582..4b5634ba482 100644 --- a/src/mongo/db/session_catalog_mongod.cpp +++ b/src/mongo/db/session_catalog_mongod.cpp @@ -304,10 +304,11 @@ void MongoDSessionCatalog::onStepUp(OperationContext* opCtx) { LOGV2_DEBUG(21979, 3, "Restoring locks of prepared transaction. SessionId: {sessionId} " - "TxnNumber: {txnNumber}", + "TxnNumberAndRetryCounter: {txnNumberAndRetryCounter}", "Restoring locks of prepared transaction", "sessionId"_attr = sessionId.getId(), - "txnNumber"_attr = txnParticipant.getActiveTxnNumber()); + "txnNumberAndRetryCounter"_attr = + txnParticipant.getActiveTxnNumberAndRetryCounter()); txnParticipant.refreshLocksForPreparedTransaction(newOpCtx.get(), false); } } @@ -486,7 +487,7 @@ MongoDOperationContextSessionWithoutRefresh::MongoDOperationContextSessionWithou auto txnParticipant = TransactionParticipant::get(opCtx); txnParticipant.beginOrContinueTransactionUnconditionally( - opCtx, clientTxnNumber, clientTxnRetryCounter); + opCtx, {clientTxnNumber, clientTxnRetryCounter}); } MongoDOperationContextSessionWithoutRefresh::~MongoDOperationContextSessionWithoutRefresh() { diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 4cb6b153458..894f568ec5f 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -100,21 +100,21 @@ const StringMap<int> preparedTxnCmdAllowlist = { {"abortTransaction", 1}, {"commitTransaction", 1}, {"prepareTransaction", 1}}; void fassertOnRepeatedExecution(const LogicalSessionId& lsid, - TxnNumber txnNumber, + TxnNumberAndRetryCounter txnNumberAndRetryCounter, StmtId stmtId, const repl::OpTime& firstOpTime, const repl::OpTime& secondOpTime) { LOGV2_FATAL( 40526, - "Statement id {stmtId} from transaction [ {lsid}:{txnNumber} ] was committed once " - "with opTime {firstCommitOpTime} and a second time with opTime {secondCommitOpTime}. This " - "indicates possible data corruption or server bug and the process will be " - "terminated.", + "Statement id {stmtId} from transaction [ {lsid}:{txnNumberAndRetryCounter} ] was " + "committed once with opTime {firstCommitOpTime} and a second time with opTime { " + "secondCommitOpTime}. This indicates possible data corruption or server bug and the " + "process will be terminated.", "Statement from transaction was committed twice. This indicates possible data corruption " "or server bug and the process will be terminated", "stmtId"_attr = stmtId, "lsid"_attr = lsid.toBSON(), - "txnNumber"_attr = txnNumber, + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter, "firstCommitOpTime"_attr = firstOpTime, "secondCommitOpTime"_attr = secondOpTime); } @@ -475,10 +475,13 @@ bool TransactionParticipant::Observer::_isInternalSessionForRetryableWrite() con } void TransactionParticipant::Participant::_beginOrContinueRetryableWrite( - OperationContext* opCtx, const TxnNumber& txnNumber) { - if (txnNumber > o().activeTxnNumber) { + OperationContext* opCtx, const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) { + invariant(!txnNumberAndRetryCounter.getTxnRetryCounter()); + if (txnNumberAndRetryCounter.getTxnNumber() > + o().activeTxnNumberAndRetryCounter.getTxnNumber()) { // New retryable write. - _setNewTxnNumber(opCtx, txnNumber, kUninitializedTxnRetryCounter); + _setNewTxnNumberAndRetryCounter( + opCtx, {txnNumberAndRetryCounter.getTxnNumber(), kUninitializedTxnRetryCounter}); p().autoCommit = boost::none; } else { // Retrying a retryable write. @@ -490,26 +493,36 @@ void TransactionParticipant::Participant::_beginOrContinueRetryableWrite( } void TransactionParticipant::Participant::_continueMultiDocumentTransaction( - OperationContext* opCtx, const TxnNumber& txnNumber, const TxnRetryCounter& txnRetryCounter) { + OperationContext* opCtx, const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) { uassert(ErrorCodes::NoSuchTransaction, str::stream() - << "Given transaction number " << txnNumber + << "Given transaction number " << txnNumberAndRetryCounter.getTxnNumber() << " does not match any in-progress transactions. The active transaction number is " - << o().activeTxnNumber, - txnNumber == o().activeTxnNumber && !o().txnState.isInRetryableWriteMode()); - - uassert(TxnRetryCounterTooOldInfo(o().activeTxnRetryCounter), - str::stream() << "Cannot continue transaction " << txnNumber << " on session " - << _sessionId() << " using txnRetryCounter " << txnRetryCounter + << o().activeTxnNumberAndRetryCounter.getTxnNumber(), + txnNumberAndRetryCounter.getTxnNumber() == + o().activeTxnNumberAndRetryCounter.getTxnNumber() && + !o().txnState.isInRetryableWriteMode()); + + uassert(TxnRetryCounterTooOldInfo(*o().activeTxnNumberAndRetryCounter.getTxnRetryCounter()), + str::stream() << "Cannot continue transaction " + << txnNumberAndRetryCounter.getTxnNumber() << " on session " + << _sessionId() << " using txnRetryCounter " + << txnNumberAndRetryCounter.getTxnRetryCounter() << " because it has already been restarted using a higher" - << " txnRetryCounter " << o().activeTxnRetryCounter, - txnRetryCounter >= o().activeTxnRetryCounter); + << " txnRetryCounter " + << o().activeTxnNumberAndRetryCounter.getTxnRetryCounter(), + txnNumberAndRetryCounter.getTxnRetryCounter() >= + o().activeTxnNumberAndRetryCounter.getTxnRetryCounter()); uassert(ErrorCodes::IllegalOperation, - str::stream() << "Cannot continue transaction " << txnNumber << " on session " - << _sessionId() << " using txnRetryCounter " << txnRetryCounter + str::stream() << "Cannot continue transaction " + << txnNumberAndRetryCounter.getTxnNumber() << " on session " + << _sessionId() << " using txnNumberAndRetryCounter.getTxnRetryCounter() " + << txnNumberAndRetryCounter.getTxnRetryCounter() << " because it is currently in state " << o().txnState - << " with txnRetryCounter " << o().activeTxnRetryCounter, - txnRetryCounter == o().activeTxnRetryCounter); + << " with txnRetryCounter " + << o().activeTxnNumberAndRetryCounter.getTxnRetryCounter(), + txnNumberAndRetryCounter.getTxnRetryCounter() == + o().activeTxnNumberAndRetryCounter.getTxnRetryCounter()); if (o().txnState.isInProgress() && !o().txnResourceStash) { // This indicates that the first command in the transaction failed but did not implicitly @@ -528,23 +541,30 @@ void TransactionParticipant::Participant::_continueMultiDocumentTransaction( uasserted( ErrorCodes::NoSuchTransaction, str::stream() - << "Transaction " << txnNumber + << "Transaction with " << txnNumberAndRetryCounter.toBSON() << " has been aborted because an earlier command in this transaction failed."); } } void TransactionParticipant::Participant::_beginMultiDocumentTransaction( - OperationContext* opCtx, const TxnNumber& txnNumber, const TxnRetryCounter& txnRetryCounter) { - if (txnNumber == o().activeTxnNumber) { - if (txnRetryCounter < o().activeTxnRetryCounter) { - uasserted(TxnRetryCounterTooOldInfo(o().activeTxnRetryCounter), - str::stream() - << "Cannot start a transaction at given transaction number " << txnNumber - << " on session " << _sessionId() << " using txnRetryCounter " - << txnRetryCounter << " because it has already been restarted using a " - << "higher txnRetryCounter " << o().activeTxnRetryCounter); - } else if (txnRetryCounter == o().activeTxnRetryCounter || - o().activeTxnRetryCounter == kUninitializedTxnRetryCounter) { + OperationContext* opCtx, const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) { + if (txnNumberAndRetryCounter.getTxnNumber() == + o().activeTxnNumberAndRetryCounter.getTxnNumber()) { + if (txnNumberAndRetryCounter.getTxnRetryCounter() < + o().activeTxnNumberAndRetryCounter.getTxnRetryCounter()) { + uasserted( + TxnRetryCounterTooOldInfo(*o().activeTxnNumberAndRetryCounter.getTxnRetryCounter()), + str::stream() << "Cannot start a transaction at given transaction number " + << txnNumberAndRetryCounter.getTxnNumber() << " on session " + << _sessionId() << " using txnRetryCounter " + << txnNumberAndRetryCounter.getTxnRetryCounter() + << " because it has already been restarted using a " + << "higher txnRetryCounter " + << o().activeTxnNumberAndRetryCounter.getTxnRetryCounter()); + } else if (txnNumberAndRetryCounter.getTxnRetryCounter() == + o().activeTxnNumberAndRetryCounter.getTxnRetryCounter() || + o().activeTxnNumberAndRetryCounter.getTxnRetryCounter() == + kUninitializedTxnRetryCounter) { // Servers in a sharded cluster can start a new transaction at the active transaction // number to allow internal retries by routers on re-targeting errors, like // StaleShard/DatabaseVersion or SnapshotTooOld. @@ -570,26 +590,31 @@ void TransactionParticipant::Participant::_beginMultiDocumentTransaction( const auto restartableStates = TransactionState::kNone | TransactionState::kAbortedWithoutPrepare; uassert(50911, - str::stream() << "Cannot start a transaction at given transaction number " - << txnNumber << " a transaction with the same number is in state " + str::stream() << "Cannot start a transaction at given transaction with " + << txnNumberAndRetryCounter.toBSON() + << " because a transaction with the same number is in state " << o().txnState, o().txnState.isInSet(restartableStates)); } else { const auto restartableStates = TransactionState::kNone | TransactionState::kInProgress | TransactionState::kAbortedWithoutPrepare | TransactionState::kAbortedWithPrepare; uassert(ErrorCodes::IllegalOperation, - str::stream() << "Cannot restart transaction " << txnNumber - << " using txnRetryCounter " << txnRetryCounter + str::stream() << "Cannot restart transaction " + << txnNumberAndRetryCounter.getTxnNumber() + << " using txnRetryCounter " + << txnNumberAndRetryCounter.getTxnRetryCounter() << " because it is already in state " << o().txnState - << " with txnRetryCounter " << o().activeTxnRetryCounter, + << " with txnRetryCounter " + << o().activeTxnNumberAndRetryCounter.getTxnRetryCounter(), o().txnState.isInSet(restartableStates)); } } else { - invariant(txnNumber > o().activeTxnNumber); + invariant(txnNumberAndRetryCounter.getTxnNumber() > + o().activeTxnNumberAndRetryCounter.getTxnNumber()); } // Aborts any in-progress txns. - _setNewTxnNumber(opCtx, txnNumber, txnRetryCounter); + _setNewTxnNumberAndRetryCounter(opCtx, txnNumberAndRetryCounter); p().autoCommit = false; stdx::lock_guard<Client> lk(*opCtx->getClient()); @@ -616,10 +641,9 @@ void TransactionParticipant::Participant::_beginMultiDocumentTransaction( void TransactionParticipant::Participant::beginOrContinue( OperationContext* opCtx, - TxnNumber txnNumber, + TxnNumberAndRetryCounter txnNumberAndRetryCounter, boost::optional<bool> autocommit, - boost::optional<bool> startTransaction, - boost::optional<TxnRetryCounter> txnRetryCounter) { + boost::optional<bool> startTransaction) { // Make sure we are still a primary. We need to hold on to the RSTL through the end of this // method, as we otherwise risk stepping down in the interim and incorrectly updating the // transaction number, which can abort active transactions. @@ -640,22 +664,26 @@ void TransactionParticipant::Participant::beginOrContinue( getTestCommandsEnabled()); } - if (txnNumber < o().activeTxnNumber) { + if (txnNumberAndRetryCounter.getTxnNumber() < + o().activeTxnNumberAndRetryCounter.getTxnNumber()) { const std::string currOperation = o().txnState.isInRetryableWriteMode() ? "retryable write" : "transaction"; if (!autocommit) { uasserted(ErrorCodes::TransactionTooOld, str::stream() - << "Retryable write with txnNumber " << txnNumber - << " is prohibited on session " << _sessionId() << " because a newer " - << currOperation << " with txnNumber " << o().activeTxnNumber + << "Retryable write with txnNumber " + << txnNumberAndRetryCounter.getTxnNumber() << " is prohibited on session " + << _sessionId() << " because a newer " << currOperation + << " with txnNumber " << o().activeTxnNumberAndRetryCounter.getTxnNumber() << " has already started on this session."); } else { uasserted(ErrorCodes::TransactionTooOld, - str::stream() << "Cannot start transaction " << txnNumber << " on session " - << _sessionId() << " because a newer " << currOperation - << " with txnNumber " << o().activeTxnNumber - << " has already started on this session."); + str::stream() + << "Cannot start transaction with " << txnNumberAndRetryCounter.toBSON() + << " on session " << _sessionId() << " because a newer " << currOperation + << " with txnNumberAndRetryCounter " + << o().activeTxnNumberAndRetryCounter.toBSON() + << " has already started on this session."); } } @@ -664,9 +692,9 @@ void TransactionParticipant::Participant::beginOrContinue( // startTransaction, which is verified earlier when parsing the request. if (!autocommit) { invariant(!startTransaction); - invariant(!txnRetryCounter.has_value(), + invariant(!txnNumberAndRetryCounter.getTxnRetryCounter(), "Cannot specify a txnRetryCounter for retryable write"); - _beginOrContinueRetryableWrite(opCtx, txnNumber); + _beginOrContinueRetryableWrite(opCtx, txnNumberAndRetryCounter); return; } @@ -675,17 +703,18 @@ void TransactionParticipant::Participant::beginOrContinue( // is verified earlier when parsing the request. invariant(*autocommit == false); invariant(opCtx->inMultiDocumentTransaction()); - if (txnRetryCounter.has_value()) { + if (txnNumberAndRetryCounter.getTxnRetryCounter()) { uassert(ErrorCodes::InvalidOptions, "txnRetryCounter is only supported in sharded clusters", serverGlobalParams.clusterRole != ClusterRole::None); - invariant(*txnRetryCounter >= 0, "Cannot specify a negative txnRetryCounter"); + invariant(*txnNumberAndRetryCounter.getTxnRetryCounter() >= 0, + "Cannot specify a negative txnRetryCounter"); } else { - txnRetryCounter = 0; + txnNumberAndRetryCounter.setTxnRetryCounter(0); } if (!startTransaction) { - _continueMultiDocumentTransaction(opCtx, txnNumber, *txnRetryCounter); + _continueMultiDocumentTransaction(opCtx, txnNumberAndRetryCounter); return; } @@ -693,22 +722,23 @@ void TransactionParticipant::Participant::beginOrContinue( // an argument on the request. The 'startTransaction' argument currently can only be specified // as true, which is verified earlier, when parsing the request. invariant(*startTransaction); - _beginMultiDocumentTransaction(opCtx, txnNumber, *txnRetryCounter); + _beginMultiDocumentTransaction(opCtx, txnNumberAndRetryCounter); } void TransactionParticipant::Participant::beginOrContinueTransactionUnconditionally( - OperationContext* opCtx, - TxnNumber txnNumber, - boost::optional<TxnRetryCounter> txnRetryCounter) { + OperationContext* opCtx, TxnNumberAndRetryCounter txnNumberAndRetryCounter) { invariant(opCtx->inMultiDocumentTransaction()); // We don't check or fetch any on-disk state, so treat the transaction as 'valid' for the // purposes of this method and continue the transaction unconditionally p().isValid = true; - if (o().activeTxnNumber != txnNumber) { - _beginMultiDocumentTransaction( - opCtx, txnNumber, txnRetryCounter.has_value() ? *txnRetryCounter : 0); + if (o().activeTxnNumberAndRetryCounter.getTxnNumber() != + txnNumberAndRetryCounter.getTxnNumber()) { + if (!txnNumberAndRetryCounter.getTxnRetryCounter()) { + txnNumberAndRetryCounter.setTxnRetryCounter(0); + } + _beginMultiDocumentTransaction(opCtx, txnNumberAndRetryCounter); } else { invariant(o().txnState.isInSet(TransactionState::kInProgress | TransactionState::kPrepared), str::stream() << "Current state: " << o().txnState); @@ -1003,7 +1033,7 @@ void TransactionParticipant::Participant::_stashActiveTransaction(OperationConte return; } - invariant(o().activeTxnNumber == opCtx->getTxnNumber()); + invariant(o().activeTxnNumberAndRetryCounter.getTxnNumber() == opCtx->getTxnNumber()); stdx::lock_guard<Client> lk(*opCtx->getClient()); { @@ -1105,7 +1135,7 @@ void TransactionParticipant::Participant::unstashTransactionResources(OperationC return; } - _checkIsCommandValidWithTxnState(*opCtx->getTxnNumber(), cmdName); + _checkIsCommandValidWithTxnState({*opCtx->getTxnNumber()}, cmdName); if (o().txnResourceStash) { MaxLockTimeout maxLockTimeout; // Default is we should acquire ticket. @@ -1376,7 +1406,8 @@ void TransactionParticipant::Participant::addTransactionOperation( } invariant(o().txnState.isInProgress(), str::stream() << "Current state: " << o().txnState); - invariant(p().autoCommit && !*p().autoCommit && o().activeTxnNumber != kUninitializedTxnNumber); + invariant(p().autoCommit && !*p().autoCommit && + o().activeTxnNumberAndRetryCounter.getTxnNumber() != kUninitializedTxnNumber); invariant(opCtx->lockState()->inAWriteUnitOfWork()); p().transactionOperations.push_back(operation); const auto stmtIds = operation.getStatementIds(); @@ -1776,9 +1807,9 @@ void TransactionParticipant::Participant::_finishAbortingActiveTransaction( // When the state of active transaction on session is not expected, it means another // thread has already aborted the transaction on session. if (o().txnState.isInSet(expectedStates)) { - invariant(opCtx->getTxnNumber() == o().activeTxnNumber); + invariant(opCtx->getTxnNumber() == o().activeTxnNumberAndRetryCounter.getTxnNumber()); _abortTransactionOnSession(opCtx); - } else if (opCtx->getTxnNumber() == o().activeTxnNumber) { + } else if (opCtx->getTxnNumber() == o().activeTxnNumberAndRetryCounter.getTxnNumber()) { if (o().txnState.isInRetryableWriteMode()) { // The active transaction is not a multi-document transaction. invariant(opCtx->getWriteUnitOfWork() == nullptr); @@ -1859,16 +1890,19 @@ void TransactionParticipant::Participant::_cleanUpTxnResourceOnOpCtx( } void TransactionParticipant::Participant::_checkIsCommandValidWithTxnState( - const TxnNumber& requestTxnNumber, const std::string& cmdName) const { + const TxnNumberAndRetryCounter& requestTxnNumberAndRetryCounter, + const std::string& cmdName) const { uassert(ErrorCodes::NoSuchTransaction, - str::stream() << "Transaction " << requestTxnNumber << " has been aborted.", + str::stream() << "Transaction with " << requestTxnNumberAndRetryCounter.toBSON() + << " has been aborted.", !o().txnState.isAborted()); // Cannot change committed transaction but allow retrying: // - commitTransaction command. // - any command if the transaction is an internal transaction for retryable writes. uassert(ErrorCodes::TransactionCommitted, - str::stream() << "Transaction " << requestTxnNumber << " has been committed.", + str::stream() << "Transaction with " << requestTxnNumberAndRetryCounter.toBSON() + << " has been committed.", cmdName == "commitTransaction" || !o().txnState.isCommitted() || (_isInternalSessionForRetryableWrite() && o().txnState.isCommitted())); @@ -1890,7 +1924,7 @@ void TransactionParticipant::Observer::reportStashedState(OperationContext* opCt BSONObjBuilder* builder) const { if (o().txnResourceStash && o().txnResourceStash->locker()) { if (auto lockerInfo = o().txnResourceStash->locker()->getLockerInfo(boost::none)) { - invariant(o().activeTxnNumber != kUninitializedTxnNumber); + invariant(o().activeTxnNumberAndRetryCounter.getTxnNumber() != kUninitializedTxnNumber); builder->append("type", "idleSession"); builder->append("host", getHostNameCachedAndPort()); builder->append("desc", "inactive transaction"); @@ -2070,7 +2104,7 @@ std::string TransactionParticipant::Participant::_transactionInfoForLog( _sessionId().serialize(&lsidBuilder); lsidBuilder.doneFast(); - parametersBuilder.append("txnNumber", o().activeTxnNumber); + parametersBuilder.append("txnNumber", o().activeTxnNumberAndRetryCounter.getTxnNumber()); parametersBuilder.append("autocommit", p().autoCommit ? *p().autoCommit : true); apiParameters.appendInfo(¶metersBuilder); readConcernArgs.appendInfo(¶metersBuilder); @@ -2144,7 +2178,7 @@ void TransactionParticipant::Participant::_transactionInfoForLog( _sessionId().serialize(&lsidBuilder); lsidBuilder.doneFast(); - parametersBuilder.append("txnNumber", o().activeTxnNumber); + parametersBuilder.append("txnNumber", o().activeTxnNumberAndRetryCounter.getTxnNumber()); parametersBuilder.append("autocommit", p().autoCommit ? *p().autoCommit : true); apiParameters.appendInfo(¶metersBuilder); readConcernArgs.appendInfo(¶metersBuilder); @@ -2212,7 +2246,7 @@ BSONObj TransactionParticipant::Participant::_transactionInfoBSONForLog( _sessionId().serialize(&lsidBuilder); lsidBuilder.doneFast(); - parametersBuilder.append("txnNumber", o().activeTxnNumber); + parametersBuilder.append("txnNumber", o().activeTxnNumberAndRetryCounter.getTxnNumber()); parametersBuilder.append("autocommit", p().autoCommit ? *p().autoCommit : true); apiParameters.appendInfo(¶metersBuilder); readConcernArgs.appendInfo(¶metersBuilder); @@ -2301,9 +2335,8 @@ void TransactionParticipant::Participant::_logSlowTransaction( } } -void TransactionParticipant::Participant::_setNewTxnNumber(OperationContext* opCtx, - const TxnNumber& txnNumber, - const TxnRetryCounter& txnRetryCounter) { +void TransactionParticipant::Participant::_setNewTxnNumberAndRetryCounter( + OperationContext* opCtx, const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) { uassert(ErrorCodes::PreparedTransactionInProgress, "Cannot change transaction number while the session has a prepared transaction", !o().txnState.isInSet(TransactionState::kPrepared)); @@ -2311,10 +2344,10 @@ void TransactionParticipant::Participant::_setNewTxnNumber(OperationContext* opC LOGV2_FOR_TRANSACTION( 23984, 4, - "New transaction started with txnNumber: {txnNumber} on session with lsid " + "New transaction started with txnNumber: {txnNumberAndRetryCounter} on session with lsid " "{lsid}", "New transaction started", - "txnNumber"_attr = txnNumber, + "txnNumberAndRetryCounter"_attr = txnNumberAndRetryCounter, "lsid"_attr = _sessionId().getId(), "apiParameters"_attr = APIParameters::get(opCtx).toBSON()); @@ -2324,8 +2357,7 @@ void TransactionParticipant::Participant::_setNewTxnNumber(OperationContext* opC } stdx::lock_guard<Client> lk(*opCtx->getClient()); - o(lk).activeTxnNumber = txnNumber; - o(lk).activeTxnRetryCounter = txnRetryCounter; + o(lk).activeTxnNumberAndRetryCounter = txnNumberAndRetryCounter; o(lk).lastWriteOpTime = repl::OpTime(); // Reset the retryable writes state @@ -2335,7 +2367,8 @@ void TransactionParticipant::Participant::_setNewTxnNumber(OperationContext* opC _resetTransactionState(lk, TransactionState::kNone); // Reset the transactions metrics - o(lk).transactionMetricsObserver.resetSingleTransactionStats(txnNumber); + o(lk).transactionMetricsObserver.resetSingleTransactionStats( + txnNumberAndRetryCounter.getTxnNumber()); } void TransactionParticipant::Participant::refreshFromStorageIfNeeded(OperationContext* opCtx) { @@ -2359,8 +2392,8 @@ void TransactionParticipant::Participant::_refreshFromStorageIfNeeded(OperationC const auto& lastTxnRecord = activeTxnHistory.lastTxnRecord; if (lastTxnRecord) { stdx::lock_guard<Client> lg(*opCtx->getClient()); - o(lg).activeTxnNumber = lastTxnRecord->getTxnNum(); - o(lg).activeTxnRetryCounter = [&] { + o(lg).activeTxnNumberAndRetryCounter.setTxnNumber(lastTxnRecord->getTxnNum()); + o(lg).activeTxnNumberAndRetryCounter.setTxnRetryCounter([&] { if (lastTxnRecord->getState()) { if (feature_flags::gFeatureFlagInternalTransactions.isEnabled( serverGlobalParams.featureCompatibility)) { @@ -2376,7 +2409,7 @@ void TransactionParticipant::Participant::_refreshFromStorageIfNeeded(OperationC return 0; } return kUninitializedTxnRetryCounter; - }(); + }()); o(lg).lastWriteOpTime = lastTxnRecord->getLastWriteOpTime(); p().activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements); p().hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory; @@ -2416,7 +2449,7 @@ void TransactionParticipant::Participant::onWriteOpCompletedOnPrimary( const SessionTxnRecord& sessionTxnRecord) { invariant(opCtx->lockState()->inAWriteUnitOfWork()); invariant(sessionTxnRecord.getSessionId() == _sessionId()); - invariant(sessionTxnRecord.getTxnNum() == o().activeTxnNumber); + invariant(sessionTxnRecord.getTxnNum() == o().activeTxnNumberAndRetryCounter.getTxnNumber()); if (o().txnState.isCommitted()) { // Only write statements in retryable internal transaction can bypass the checks in @@ -2454,7 +2487,7 @@ void TransactionParticipant::Participant::onRetryableWriteCloningCompleted( const SessionTxnRecord& sessionTxnRecord) { invariant(opCtx->lockState()->inAWriteUnitOfWork()); invariant(sessionTxnRecord.getSessionId() == _sessionId()); - invariant(sessionTxnRecord.getTxnNum() == o().activeTxnNumber); + invariant(sessionTxnRecord.getTxnNum() == o().activeTxnNumberAndRetryCounter.getTxnNumber()); const auto updateRequest = _makeUpdateRequest(sessionTxnRecord); @@ -2467,11 +2500,12 @@ void TransactionParticipant::Participant::onRetryableWriteCloningCompleted( void TransactionParticipant::Participant::_invalidate(WithLock wl) { p().isValid = false; - o(wl).activeTxnNumber = kUninitializedTxnNumber; + o(wl).activeTxnNumberAndRetryCounter = {kUninitializedTxnNumber, kUninitializedTxnRetryCounter}; o(wl).lastWriteOpTime = repl::OpTime(); // Reset the transactions metrics. - o(wl).transactionMetricsObserver.resetSingleTransactionStats(o().activeTxnNumber); + o(wl).transactionMetricsObserver.resetSingleTransactionStats( + o().activeTxnNumberAndRetryCounter.getTxnNumber()); } void TransactionParticipant::Participant::_resetRetryableWriteState() { @@ -2564,7 +2598,8 @@ boost::optional<repl::OpTime> TransactionParticipant::Participant::_checkStateme if (it == p().activeTxnCommittedStatements.end()) { uassert(ErrorCodes::IncompleteTransactionHistory, str::stream() << "Incomplete history detected for transaction " - << o().activeTxnNumber << " on session " << _sessionId(), + << o().activeTxnNumberAndRetryCounter.getTxnNumber() << " on session " + << _sessionId(), !p().hasIncompleteHistory); return boost::none; @@ -2624,7 +2659,7 @@ void TransactionParticipant::Participant::_registerUpdateCacheOnCommit( if (!insertRes.second) { const auto& existingOpTime = insertRes.first->second; fassertOnRepeatedExecution(participant._sessionId(), - participant.o().activeTxnNumber, + participant.o().activeTxnNumberAndRetryCounter, stmtId, existingOpTime, lastStmtIdWriteOpTime); @@ -2648,9 +2683,9 @@ void TransactionParticipant::Participant::_registerUpdateCacheOnCommit( if (!failBeforeCommitExceptionElem.eoo()) { const auto failureCode = ErrorCodes::Error(int(failBeforeCommitExceptionElem.Number())); uasserted(failureCode, - str::stream() - << "Failing write for " << _sessionId() << ":" << o().activeTxnNumber - << " due to failpoint. The write must not be reflected."); + str::stream() << "Failing write for " << _sessionId() << ":" + << o().activeTxnNumberAndRetryCounter.getTxnNumber() + << " due to failpoint. The write must not be reflected."); } }); } diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index d4c0bf43919..4899131c723 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -268,18 +268,11 @@ public: explicit Observer(const ObservableSession& session); /** - * Returns the currently active transaction number on this participant. + * Returns an object containing the currently active transaction number and + * transaction retry counter on this participant. */ - TxnNumber getActiveTxnNumber() const { - return o().activeTxnNumber; - } - - /** - * Returns the last used transaction retry counter for the currently active transaction on - * this participant. - */ - TxnRetryCounter getActiveTxnRetryCounter() const { - return o().activeTxnRetryCounter; + TxnNumberAndRetryCounter getActiveTxnNumberAndRetryCounter() const { + return o().activeTxnNumberAndRetryCounter; } /** @@ -446,10 +439,9 @@ public: * when updating the shard key. */ void beginOrContinue(OperationContext* opCtx, - TxnNumber txnNumber, + TxnNumberAndRetryCounter txnNumberAndRetryCounter, boost::optional<bool> autocommit, - boost::optional<bool> startTransaction, - boost::optional<TxnRetryCounter> txnRetryCounter); + boost::optional<bool> startTransaction); /** * Used only by the secondary oplog application logic. Similar to 'beginOrContinue' without @@ -457,9 +449,7 @@ public: * the past. */ void beginOrContinueTransactionUnconditionally( - OperationContext* opCtx, - TxnNumber txnNumber, - boost::optional<TxnRetryCounter> txnRetryCounter); + OperationContext* opCtx, TxnNumberAndRetryCounter txnNumberAndRetryCounter); /** * If the participant is in prepare, returns a future whose promise is fulfilled when @@ -765,8 +755,9 @@ public: // Checks if the command can be run on this transaction based on the state of the // transaction. - void _checkIsCommandValidWithTxnState(const TxnNumber& requestTxnNumber, - const std::string& cmdName) const; + void _checkIsCommandValidWithTxnState( + const TxnNumberAndRetryCounter& requestTxnNumberAndRetryCounter, + const std::string& cmdName) const; // Logs the transaction information if it has run slower than the global parameter slowMS. // The transaction must be committed or aborted when this function is called. @@ -799,25 +790,24 @@ public: APIParameters apiParameters, repl::ReadConcernArgs readConcernArgs) const; - // Bumps up the transaction number of this transaction and perform the necessary cleanup. - void _setNewTxnNumber(OperationContext* opCtx, - const TxnNumber& txnNumber, - const TxnRetryCounter& txnRetryCounter); + // Bumps up the transaction number and transaction retry counter of this transaction and + // performs the necessary cleanup. + void _setNewTxnNumberAndRetryCounter( + OperationContext* opCtx, const TxnNumberAndRetryCounter& txnNumberAndRetryCounter); // Attempt to begin or retry a retryable write at the given transaction number. - void _beginOrContinueRetryableWrite(OperationContext* opCtx, const TxnNumber& txnNumber); + void _beginOrContinueRetryableWrite( + OperationContext* opCtx, const TxnNumberAndRetryCounter& txnNumberAndRetryCounter); // Attempt to begin a new multi document transaction at the given transaction number and // transaction retry counter. - void _beginMultiDocumentTransaction(OperationContext* opCtx, - const TxnNumber& txnNumber, - const TxnRetryCounter& txnRetryCounter); + void _beginMultiDocumentTransaction( + OperationContext* opCtx, const TxnNumberAndRetryCounter& txnNumberAndRetryCounter); // Attempt to continue an in-progress multi document transaction at the given transaction // number and transaction retry counter. - void _continueMultiDocumentTransaction(OperationContext* opCtx, - const TxnNumber& txnNumber, - const TxnRetryCounter& txnRetryCounter); + void _continueMultiDocumentTransaction( + OperationContext* opCtx, const TxnNumberAndRetryCounter& txnNumberAndRetryCounter); // Implementation of public refreshFromStorageIfNeeded methods. void _refreshFromStorageIfNeeded(OperationContext* opCtx, bool fetchOplogEntries); @@ -960,16 +950,14 @@ private: // Maintains the transaction state and the transition table for legal state transitions. TransactionState txnState; - // Tracks the last seen txn number for the session and is always >= to the transaction - // number in the last written txn record. When it is > than that in the last written txn - // record, this means a new transaction has begun on the session, but it hasn't yet - // performed any writes. - TxnNumber activeTxnNumber{kUninitializedTxnNumber}; - - // Tracks the last seen txnRetryCounter for the the current transaction. Should always be + // Tracks the last seen TxnNumber and TxnRetryCounter for the session. The txn number is + // always >= to the transaction number in the last written txn record. When it is > than + // that in the last written txn record, this means a new transaction has begun on the + // session, but it hasn't yet performed any writes. The txnRetryCounter should always be // kUninitializedTxnRetryCounter for a retryable write, and non-negative for a // multi-statement transaction. - TxnRetryCounter activeTxnRetryCounter{kUninitializedTxnRetryCounter}; + TxnNumberAndRetryCounter activeTxnNumberAndRetryCounter{kUninitializedTxnNumber, + kUninitializedTxnRetryCounter}; // Caches what is known to be the last optime written for the active transaction. repl::OpTime lastWriteOpTime; diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp index 0884396ddec..73f288358cc 100644 --- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp +++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp @@ -229,11 +229,8 @@ protected: boost::optional<DurableTxnStateEnum> txnState) { const auto session = OperationContextSession::get(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); - txnParticipant.beginOrContinue(opCtx(), - txnNum, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {txnNum}, boost::none /* autocommit */, boost::none /* startTransaction */); const auto uuid = UUID::gen(); @@ -295,11 +292,8 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryNotWrittenOnBegin) txnParticipant.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 20; - txnParticipant.beginOrContinue(opCtx(), - txnNum, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {txnNum}, boost::none /* autocommit */, boost::none /* startTransaction */); ASSERT(txnParticipant.getLastWriteOpTime().isNull()); DBDirectClient client(opCtx()); @@ -315,11 +309,8 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryWrittenAtFirstWrit const auto& sessionId = *opCtx()->getLogicalSessionId(); const TxnNumber txnNum = 21; - txnParticipant.beginOrContinue(opCtx(), - txnNum, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {txnNum}, boost::none /* autocommit */, boost::none /* startTransaction */); const auto opTime = writeTxnRecord(txnNum, {0}, {}, boost::none); @@ -404,17 +395,13 @@ TEST_F(TransactionParticipantRetryableWritesTest, StartingOldTxnShouldAssert) { txnParticipant.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 20; - txnParticipant.beginOrContinue(opCtx(), - txnNum, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {txnNum}, boost::none /* autocommit */, boost::none /* startTransaction */); ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - txnNum - 1, + {txnNum - 1}, boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */), + boost::none /* startTransaction */), AssertionException, ErrorCodes::TransactionTooOld); ASSERT(txnParticipant.getLastWriteOpTime().isNull()); @@ -430,16 +417,12 @@ TEST_F(TransactionParticipantRetryableWritesTest, StringBuilder sb; sb << "Retryable write with txnNumber 21 is prohibited on session " << sessionId << " because a newer retryable write with txnNumber 22 has already started on this session."; - txnParticipant.beginOrContinue(opCtx(), - txnNum, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {txnNum}, boost::none /* autocommit */, boost::none /* startTransaction */); ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue(opCtx(), - txnNum - 1, + {txnNum - 1}, boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */), + boost::none /* startTransaction */), AssertionException, sb.str()); ASSERT(txnParticipant.getLastWriteOpTime().isNull()); @@ -449,25 +432,25 @@ TEST_F(TransactionParticipantRetryableWritesTest, OldTransactionFailsOnSessionWithNewerRetryableWrite) { auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.refreshFromStorageIfNeeded(opCtx()); - const TxnNumber txnNum = 22; + const TxnNumberAndRetryCounter txnNumberAndRetryCounter1(21); + const TxnNumberAndRetryCounter txnNumberAndRetryCounter2(22); auto autocommit = false; const auto& sessionId = *opCtx()->getLogicalSessionId(); StringBuilder sb; - sb << "Cannot start transaction 21 on session " << sessionId - << " because a newer retryable write with txnNumber 22 has already started on this session."; + sb << "Cannot start transaction with " << txnNumberAndRetryCounter1.toBSON() << " on session " + << sessionId + << " because a newer retryable write with txnNumberAndRetryCounter { txnNumber: 22, " + "txnRetryCounter: -1 } has already started on this session."; txnParticipant.beginOrContinue(opCtx(), - txnNum, + txnNumberAndRetryCounter2, boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); - ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue(opCtx(), - txnNum - 1, - autocommit, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */), - AssertionException, - sb.str()); + boost::none /* startTransaction */); + ASSERT_THROWS_WHAT( + txnParticipant.beginOrContinue( + opCtx(), txnNumberAndRetryCounter1, autocommit, boost::none /* startTransaction */), + AssertionException, + sb.str()); ASSERT(txnParticipant.getLastWriteOpTime().isNull()); } @@ -484,11 +467,8 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionN ASSERT(client.runCommand(nss.db().toString(), BSON("drop" << nss.coll()), dropResult)); const TxnNumber txnNum = 21; - txnParticipant.beginOrContinue(opCtx(), - txnNum, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {txnNum}, boost::none /* autocommit */, boost::none /* startTransaction */); AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); @@ -509,11 +489,8 @@ TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecuted) { txnParticipant.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 100; - txnParticipant.beginOrContinue(opCtx(), - txnNum, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {txnNum}, boost::none /* autocommit */, boost::none /* startTransaction */); ASSERT(!txnParticipant.checkStatementExecuted(opCtx(), 1000)); ASSERT(!txnParticipant.checkStatementExecutedNoOplogEntryFetch(1000)); @@ -555,11 +532,8 @@ DEATH_TEST_REGEX_F( const auto& sessionId = *opCtx()->getLogicalSessionId(); const TxnNumber txnNum = 100; - txnParticipant.beginOrContinue(opCtx(), - txnNum, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {txnNum}, boost::none /* autocommit */, boost::none /* startTransaction */); const auto uuid = UUID::gen(); @@ -600,11 +574,8 @@ DEATH_TEST_REGEX_F( const auto& sessionId = *opCtx()->getLogicalSessionId(); const TxnNumber txnNum = 100; - txnParticipant.beginOrContinue(opCtx(), - txnNum, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {txnNum}, boost::none /* autocommit */, boost::none /* startTransaction */); const auto uuid = UUID::gen(); @@ -644,11 +615,8 @@ DEATH_TEST_REGEX_F( txnParticipant.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 100; - txnParticipant.beginOrContinue(opCtx(), - txnNum, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {txnNum}, boost::none /* autocommit */, boost::none /* startTransaction */); AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); @@ -819,11 +787,8 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.refreshFromStorageIfNeeded(opCtx()); - txnParticipant.beginOrContinue(opCtx(), - txnNum, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {txnNum}, boost::none /* autocommit */, boost::none /* startTransaction */); repl::MutableOplogEntry oplogEntry; oplogEntry.setSessionId(sessionId); @@ -929,11 +894,8 @@ TEST_F(ShardTxnParticipantRetryableWritesTest, opCtx()->setTxnNumber(txnNum); const auto uuid = UUID::gen(); - txnParticipant.beginOrContinue(opCtx(), - txnNum, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {txnNum}, boost::none /* autocommit */, boost::none /* startTransaction */); { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); @@ -954,8 +916,7 @@ TEST_F(ShardTxnParticipantRetryableWritesTest, opCtx()->setInMultiDocumentTransaction(); ASSERT_THROWS_CODE( - txnParticipant.beginOrContinue( - opCtx(), txnNum, autocommit, startTransaction, boost::none /* txnRetryCounter */), + txnParticipant.beginOrContinue(opCtx(), {txnNum}, autocommit, startTransaction), AssertionException, 50911); @@ -964,8 +925,7 @@ TEST_F(ShardTxnParticipantRetryableWritesTest, txnParticipant.refreshFromStorageIfNeeded(opCtx()); ASSERT_THROWS_CODE( - txnParticipant.beginOrContinue( - opCtx(), txnNum, autocommit, startTransaction, boost::none /* txnRetryCounter */), + txnParticipant.beginOrContinue(opCtx(), {txnNum}, autocommit, startTransaction), AssertionException, 50911); } diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index 6a9cd1bfb81..39821298d3d 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -315,10 +315,9 @@ protected: auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber()}, false /* autocommit */, - startNewTxn /* startTransaction */, - boost::none /* txnRetryCounter */); + startNewTxn /* startTransaction */); return opCtxSession; } @@ -391,11 +390,8 @@ TEST_F(TxnParticipantTest, TransactionThrowsLockTimeoutIfLockIsUnavailable) { MongoDOperationContextSession newOpCtxSession(newOpCtx.get()); auto newTxnParticipant = TransactionParticipant::get(newOpCtx.get()); - newTxnParticipant.beginOrContinue(newOpCtx.get(), - newTxnNum, - false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */); + newTxnParticipant.beginOrContinue( + newOpCtx.get(), {newTxnNum}, false /* autocommit */, true /* startTransaction */); newTxnParticipant.unstashTransactionResources(newOpCtx.get(), "insert"); Date_t t1 = Date_t::now(); @@ -466,10 +462,9 @@ TEST_F(TxnParticipantTest, CannotSpecifyStartTransactionOnInProgressTxn) { // Cannot try to start a transaction that already started. ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), boost::none}, false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */), + true /* startTransaction */), AssertionException, ErrorCodes::ConflictingOperationInProgress); } @@ -486,20 +481,17 @@ TEST_F(TxnParticipantTest, AutocommitRequiredOnEveryTxnOp) { auto txnNum = *opCtx()->getTxnNumber(); // Omitting 'autocommit' after the first statement of a transaction should throw an error. - ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - txnNum, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */), - AssertionException, - ErrorCodes::IncompleteTransactionHistory); + ASSERT_THROWS_CODE( + txnParticipant.beginOrContinue( + opCtx(), {txnNum}, boost::none /* autocommit */, boost::none /* startTransaction */), + AssertionException, + ErrorCodes::IncompleteTransactionHistory); // Including autocommit=false should succeed. txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber()}, false /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); } DEATH_TEST_F(TxnParticipantTest, AutocommitCannotBeTrue, "invariant") { @@ -508,10 +500,9 @@ DEATH_TEST_F(TxnParticipantTest, AutocommitCannotBeTrue, "invariant") { // Passing 'autocommit=true' is not allowed and should crash. txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber()}, true /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); } DEATH_TEST_F(TxnParticipantTest, StartTransactionCannotBeFalse, "invariant") { @@ -519,11 +510,8 @@ DEATH_TEST_F(TxnParticipantTest, StartTransactionCannotBeFalse, "invariant") { auto txnParticipant = TransactionParticipant::get(opCtx()); // Passing 'startTransaction=false' is not allowed and should crash. - txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), - false /* autocommit */, - false /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {*opCtx()->getTxnNumber()}, false /* autocommit */, false /* startTransaction */); } TEST_F(TxnParticipantTest, SameTransactionPreservesStoredStatements) { @@ -547,10 +535,9 @@ TEST_F(TxnParticipantTest, SameTransactionPreservesStoredStatements) { // Re-opening the same transaction should have no effect. txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber()}, false /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); ASSERT_BSONOBJ_EQ(operation.toBSON(), txnParticipant.getTransactionOperationsForTest()[0].toBSON()); } @@ -831,10 +818,9 @@ TEST_F(TxnParticipantTest, KillOpBeforeCommittingPreparedTransaction) { auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx); auto newTxnParticipant = TransactionParticipant::get(opCtx); newTxnParticipant.beginOrContinue(opCtx, - *(opCtx->getTxnNumber()), + {*(opCtx->getTxnNumber())}, false /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); newTxnParticipant.unstashTransactionResources(opCtx, "commitTransaction"); newTxnParticipant.commitPreparedTransaction(opCtx, prepareTimestamp, boost::none); @@ -876,10 +862,9 @@ TEST_F(TxnParticipantTest, KillOpBeforeAbortingPreparedTransaction) { auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx); auto newTxnParticipant = TransactionParticipant::get(opCtx); newTxnParticipant.beginOrContinue(opCtx, - *(opCtx->getTxnNumber()), + {*(opCtx->getTxnNumber())}, false /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); newTxnParticipant.unstashTransactionResources(opCtx, "commitTransaction"); newTxnParticipant.commitPreparedTransaction(opCtx, prepareTimestamp, boost::none); @@ -1184,10 +1169,9 @@ TEST_F(TxnParticipantTest, ContinuingATransactionWithNoResourcesAborts) { auto txnParticipant = TransactionParticipant::get(opCtx()); ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber()}, false /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */), + boost::none /* startTransaction */), AssertionException, ErrorCodes::NoSuchTransaction); } @@ -1201,10 +1185,9 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionIfNotPrimary) { // Include 'autocommit=false' for transactions. ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber()}, false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */), + true /* startTransaction */), AssertionException, ErrorCodes::NotWritablePrimary); } @@ -1218,10 +1201,9 @@ TEST_F(TxnParticipantTest, CannotStartRetryableWriteIfNotPrimary) { // Omit the 'autocommit' field for retryable writes. ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber()}, boost::none /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */), + true /* startTransaction */), AssertionException, ErrorCodes::NotWritablePrimary); } @@ -1238,10 +1220,9 @@ TEST_F(TxnParticipantTest, CannotContinueTransactionIfNotPrimary) { // Technically, the transaction should have been aborted on stepdown anyway, but it // doesn't hurt to have this kind of coverage. ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber()}, false /* autocommit */, - false /* startTransaction */, - boost::none /* txnRetryCounter */), + false /* startTransaction */), AssertionException, ErrorCodes::NotWritablePrimary); } @@ -1256,13 +1237,11 @@ TEST_F(TxnParticipantTest, OlderTransactionFailsOnSessionWithNewerTransaction) { const auto& sessionId = *opCtx()->getLogicalSessionId(); StringBuilder sb; - sb << "Cannot start transaction 19 on session " << sessionId - << " because a newer transaction with txnNumber 20 has already started on this session."; - ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber() - 1, - autocommit, - startTransaction, - boost::none /* txnRetryCounter */), + sb << "Cannot start transaction with { txnNumber: 19 } on session " << sessionId + << " because a newer transaction with txnNumberAndRetryCounter { txnNumber: 20, " + "txnRetryCounter: 0 } has already started on this session."; + ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue( + opCtx(), {*opCtx()->getTxnNumber() - 1}, autocommit, startTransaction), AssertionException, sb.str()); ASSERT(txnParticipant.getLastWriteOpTime().isNull()); @@ -1280,10 +1259,9 @@ TEST_F(TxnParticipantTest, OldRetryableWriteFailsOnSessionWithNewerTransaction) sb << "Retryable write with txnNumber 19 is prohibited on session " << sessionId << " because a newer transaction with txnNumber 20 has already started on this session."; ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber() - 1, + {*opCtx()->getTxnNumber() - 1}, boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */), + boost::none /* startTransaction */), AssertionException, sb.str()); ASSERT(txnParticipant.getLastWriteOpTime().isNull()); @@ -1314,23 +1292,22 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionWhilePreparedTransactionInPr ScopeGuard guard([&]() { OperationContextSession::checkOut(opCtx()); }); // Try to start a new transaction while there is already a prepared transaction on the // session. This should fail with a PreparedTransactionInProgress error. - runFunctionFromDifferentOpCtx([lsid = *opCtx()->getLogicalSessionId(), - txnNumberToStart = *opCtx()->getTxnNumber() + - 1](OperationContext* newOpCtx) { - newOpCtx->setLogicalSessionId(lsid); - newOpCtx->setTxnNumber(txnNumberToStart); - newOpCtx->setInMultiDocumentTransaction(); - - MongoDOperationContextSession ocs(newOpCtx); - auto txnParticipant = TransactionParticipant::get(newOpCtx); - ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(newOpCtx, - txnNumberToStart, - false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */), - AssertionException, - ErrorCodes::PreparedTransactionInProgress); - }); + runFunctionFromDifferentOpCtx( + [lsid = *opCtx()->getLogicalSessionId(), + txnNumberToStart = *opCtx()->getTxnNumber() + 1](OperationContext* newOpCtx) { + newOpCtx->setLogicalSessionId(lsid); + newOpCtx->setTxnNumber(txnNumberToStart); + newOpCtx->setInMultiDocumentTransaction(); + + MongoDOperationContextSession ocs(newOpCtx); + auto txnParticipant = TransactionParticipant::get(newOpCtx); + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(newOpCtx, + {txnNumberToStart}, + false /* autocommit */, + true /* startTransaction */), + AssertionException, + ErrorCodes::PreparedTransactionInProgress); + }); } ASSERT_FALSE(txnParticipant.transactionIsAborted()); @@ -1360,10 +1337,9 @@ TEST_F(TxnParticipantTest, CannotContinueNonExistentTransaction) { MongoDOperationContextSession opCtxSession(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber()}, false /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */), + boost::none /* startTransaction */), AssertionException, ErrorCodes::NoSuchTransaction); } @@ -1511,11 +1487,8 @@ protected: auto txnParticipant = TransactionParticipant::get(opCtx()); ASSERT(txnParticipant.transactionIsOpen()); - ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), - autocommit, - startTransaction, - boost::none /* txnRetryCounter */), + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue( + opCtx(), {*opCtx()->getTxnNumber()}, autocommit, startTransaction), AssertionException, 50911); } @@ -1531,11 +1504,8 @@ protected: txnParticipant.abortTransaction(opCtx()); ASSERT(txnParticipant.transactionIsAborted()); - txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), - autocommit, - startTransaction, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {*opCtx()->getTxnNumber()}, autocommit, startTransaction); ASSERT(txnParticipant.transactionIsOpen()); } @@ -1550,11 +1520,8 @@ protected: txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); txnParticipant.commitUnpreparedTransaction(opCtx()); - ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), - autocommit, - startTransaction, - boost::none /* txnRetryCounter */), + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue( + opCtx(), {*opCtx()->getTxnNumber()}, autocommit, startTransaction), AssertionException, 50911); } @@ -1573,11 +1540,8 @@ protected: txnParticipant.addTransactionOperation(opCtx(), operation); txnParticipant.prepareTransaction(opCtx(), {}); - ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), - autocommit, - startTransaction, - boost::none /* txnRetryCounter */), + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue( + opCtx(), {*opCtx()->getTxnNumber()}, autocommit, startTransaction), AssertionException, 50911); } @@ -1598,11 +1562,8 @@ protected: ASSERT(txnParticipant.transactionIsAborted()); startTransaction = true; - ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), - autocommit, - startTransaction, - boost::none /* txnRetryCounter */), + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue( + opCtx(), {*opCtx()->getTxnNumber()}, autocommit, startTransaction), AssertionException, 50911); } @@ -1612,20 +1573,16 @@ protected: auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber()}, boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); ASSERT_FALSE(txnParticipant.transactionIsOpen()); auto autocommit = false; auto startTransaction = true; - txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), - autocommit, - startTransaction, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {*opCtx()->getTxnNumber()}, autocommit, startTransaction); ASSERT(txnParticipant.transactionIsOpen()); } @@ -2244,11 +2201,8 @@ TEST_F(TransactionsMetricsTest, TransactionErrorsBeforeUnstash) { auto txnParticipant = TransactionParticipant::get(opCtx()); const bool autocommit = false; const boost::optional<bool> startTransaction = boost::none; - ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), - autocommit, - startTransaction, - boost::none /* txnRetryCounter */), + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue( + opCtx(), {*opCtx()->getTxnNumber()}, autocommit, startTransaction), AssertionException, ErrorCodes::NoSuchTransaction); @@ -2507,11 +2461,8 @@ TEST_F(TransactionsMetricsTest, TimeActiveMicrosShouldBeSetUponUnstashAndStash) // Start a new transaction. const auto higherTxnNum = *opCtx()->getTxnNumber() + 1; - txnParticipant.beginOrContinue(opCtx(), - higherTxnNum, - false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {higherTxnNum}, false /* autocommit */, true /* startTransaction */); // Time active should be zero for a new transaction. ASSERT_EQ(txnParticipant.getSingleTransactionStatsForTest().getTimeActiveMicros( @@ -3041,10 +2992,9 @@ TEST_F(TransactionsMetricsTest, ReportUnstashedResourcesForARetryableWrite) { MongoDOperationContextSession opCtxSession(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber()}, boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); txnParticipant.unstashTransactionResources(opCtx(), "find"); // Build a BSONObj containing the details which we expect to see reported when we invoke @@ -3075,10 +3025,9 @@ TEST_F(TransactionsMetricsTest, UseAPIParametersOnOpCtxForARetryableWrite) { MongoDOperationContextSession opCtxSession(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber()}, boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); APIParameters secondAPIParameters = APIParameters(); secondAPIParameters.setAPIVersion("3"); @@ -4203,7 +4152,10 @@ TEST_F(TxnParticipantTest, RollbackResetsInMemoryStateOfPreparedTransaction) { ASSERT(txnParticipant.transactionIsPrepared()); ASSERT_EQ(txnParticipant.getTransactionOperationsForTest().size(), 1U); ASSERT_EQ(txnParticipant.getPrepareOpTime().getTimestamp(), prepareTimestamp); - ASSERT_NE(txnParticipant.getActiveTxnNumber(), kUninitializedTxnNumber); + ASSERT_NE(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber(), + kUninitializedTxnNumber); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 0); + txnParticipant.abortTransaction(opCtx()); txnParticipant.invalidate(opCtx()); @@ -4213,7 +4165,10 @@ TEST_F(TxnParticipantTest, RollbackResetsInMemoryStateOfPreparedTransaction) { ASSERT_FALSE(txnParticipant.transactionIsPrepared()); ASSERT_EQ(txnParticipant.getTransactionOperationsForTest().size(), 0U); ASSERT_EQ(txnParticipant.getPrepareOpTime().getTimestamp(), Timestamp()); - ASSERT_EQ(txnParticipant.getActiveTxnNumber(), kUninitializedTxnNumber); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber(), + kUninitializedTxnNumber); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), + kUninitializedTxnRetryCounter); } TEST_F(TxnParticipantTest, PrepareTransactionAsSecondarySetsThePrepareOpTime) { @@ -4318,7 +4273,7 @@ TEST_F(TxnParticipantTest, AbortTransactionOnSessionCheckoutWithoutRefresh) { auto txnParticipant = TransactionParticipant::get(opCtx()); ASSERT(txnParticipant.transactionIsOpen()); - ASSERT_EQ(txnParticipant.getActiveTxnNumber(), txnNumber); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber(), txnNumber); txnParticipant.unstashTransactionResources(opCtx(), "abortTransaction"); txnParticipant.abortTransaction(opCtx()); @@ -4338,10 +4293,9 @@ TEST_F(TxnParticipantTest, ResponseMetadataHasReadOnlyFalseIfInRetryableWrite) { // Start a retryable write. txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber()}, boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + boost::none /* startTransaction */); ASSERT_FALSE(txnParticipant.getResponseMetadata().getReadOnly()); } @@ -4351,11 +4305,8 @@ TEST_F(TxnParticipantTest, ResponseMetadataHasReadOnlyTrueIfInProgressAndOperati ASSERT_FALSE(txnParticipant.getResponseMetadata().getReadOnly()); // Start a transaction. - txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), - false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {*opCtx()->getTxnNumber()}, false /* autocommit */, true /* startTransaction */); ASSERT_TRUE(txnParticipant.getResponseMetadata().getReadOnly()); txnParticipant.unstashTransactionResources(opCtx(), "find"); @@ -4369,11 +4320,8 @@ TEST_F(TxnParticipantTest, ASSERT_FALSE(txnParticipant.getResponseMetadata().getReadOnly()); // Start a transaction. - txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), - false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {*opCtx()->getTxnNumber()}, false /* autocommit */, true /* startTransaction */); ASSERT_TRUE(txnParticipant.getResponseMetadata().getReadOnly()); txnParticipant.unstashTransactionResources(opCtx(), "insert"); @@ -4392,11 +4340,8 @@ TEST_F(TxnParticipantTest, ResponseMetadataHasReadOnlyFalseIfAborted) { ASSERT_FALSE(txnParticipant.getResponseMetadata().getReadOnly()); // Start a transaction. - txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), - false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {*opCtx()->getTxnNumber()}, false /* autocommit */, true /* startTransaction */); ASSERT_TRUE(txnParticipant.getResponseMetadata().getReadOnly()); txnParticipant.unstashTransactionResources(opCtx(), "find"); @@ -4480,11 +4425,8 @@ TEST_F(TxnParticipantTest, ExitPreparePromiseIsFulfilledOnAbortAfterPrepare) { MongoDOperationContextSession opCtxSession(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); - txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), - false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {*opCtx()->getTxnNumber()}, false /* autocommit */, true /* startTransaction */); ASSERT_TRUE(txnParticipant.onExitPrepare().isReady()); txnParticipant.unstashTransactionResources(opCtx(), "find"); @@ -4510,11 +4452,8 @@ TEST_F(TxnParticipantTest, ExitPreparePromiseIsFulfilledOnCommitAfterPrepare) { MongoDOperationContextSession opCtxSession(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); - txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), - false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + opCtx(), {*opCtx()->getTxnNumber()}, false /* autocommit */, true /* startTransaction */); ASSERT_TRUE(txnParticipant.onExitPrepare().isReady()); txnParticipant.unstashTransactionResources(opCtx(), "find"); @@ -4539,30 +4478,27 @@ TEST_F(ShardTxnParticipantTest, CanSpecifyTxnRetryCounterOnShardSvr) { MongoDOperationContextSession opCtxSession(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 0}, false /* autocommit */, - true /* startTransaction */, - 0 /* txnRetryCounter */); + true /* startTransaction */); } TEST_F(ConfigTxnParticipantTest, CanSpecifyTxnRetryCounterOnConfigSvr) { MongoDOperationContextSession opCtxSession(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 0}, false /* autocommit */, - true /* startTransaction */, - 0 /* txnRetryCounter */); + true /* startTransaction */); } TEST_F(TxnParticipantTest, CanOnlySpecifyTxnRetryCounterInShardedClusters) { MongoDOperationContextSession opCtxSession(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 0}, false /* autocommit */, - true /* startTransaction */, - 0 /* txnRetryCounter */), + true /* startTransaction */), AssertionException, ErrorCodes::InvalidOptions); } @@ -4573,10 +4509,9 @@ DEATH_TEST_F(ShardTxnParticipantTest, MongoDOperationContextSession opCtxSession(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), -1}, false /* autocommit */, - true /* startTransaction */, - -1 /* txnRetryCounter */); + true /* startTransaction */); } DEATH_TEST_F(ShardTxnParticipantTest, @@ -4585,10 +4520,9 @@ DEATH_TEST_F(ShardTxnParticipantTest, MongoDOperationContextSession opCtxSession(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 0}, boost::none /* autocommit */, - boost::none /* startTransaction */, - 0 /* txnRetryCounter */), + boost::none /* startTransaction */), AssertionException, ErrorCodes::InvalidOptions); } @@ -4598,13 +4532,12 @@ TEST_F(ShardTxnParticipantTest, auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); ASSERT_TRUE(txnParticipant.transactionIsInProgress()); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 0); txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 1}, false /* autocommit */, - true /* startTransaction */, - 1 /* txnRetryCounter */); + true /* startTransaction */); ASSERT_TRUE(txnParticipant.transactionIsInProgress()); } @@ -4613,18 +4546,18 @@ TEST_F(ShardTxnParticipantTest, auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); ASSERT(txnParticipant.transactionIsInProgress()); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0 /* txnRetryCounter */); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), + 0 /* txnRetryCounter */); txnParticipant.abortTransaction(opCtx()); ASSERT(txnParticipant.transactionIsAborted()); txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 1}, false /* autocommit */, - true /* startTransaction */, - 1 /* txnRetryCounter */); + true /* startTransaction */); ASSERT(txnParticipant.transactionIsInProgress()); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 1); } TEST_F(ShardTxnParticipantTest, @@ -4636,15 +4569,14 @@ TEST_F(ShardTxnParticipantTest, txnParticipant.prepareTransaction(opCtx(), {}); txnParticipant.abortTransaction(opCtx()); ASSERT(txnParticipant.transactionIsAborted()); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 0); txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 1}, false /* autocommit */, - true /* startTransaction */, - 1 /* txnRetryCounter */); + true /* startTransaction */); ASSERT(txnParticipant.transactionIsInProgress()); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 1); } TEST_F(ShardTxnParticipantTest, @@ -4655,17 +4587,16 @@ TEST_F(ShardTxnParticipantTest, txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); txnParticipant.commitUnpreparedTransaction(opCtx()); ASSERT(txnParticipant.transactionIsCommitted()); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 0); ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 1}, false /* autocommit */, - true /* startTransaction */, - 1 /* txnRetryCounter */), + true /* startTransaction */), AssertionException, ErrorCodes::IllegalOperation); ASSERT(txnParticipant.transactionIsCommitted()); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 0); } TEST_F(ShardTxnParticipantTest, @@ -4678,17 +4609,16 @@ TEST_F(ShardTxnParticipantTest, const auto commitTS = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1); txnParticipant.commitPreparedTransaction(opCtx(), commitTS, {}); ASSERT_TRUE(txnParticipant.transactionIsCommitted()); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 0); ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 1}, false /* autocommit */, - true /* startTransaction */, - 1 /* txnRetryCounter */), + true /* startTransaction */), AssertionException, ErrorCodes::IllegalOperation); ASSERT(txnParticipant.transactionIsCommitted()); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 0); } TEST_F(ShardTxnParticipantTest, @@ -4699,17 +4629,16 @@ TEST_F(ShardTxnParticipantTest, txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); txnParticipant.prepareTransaction(opCtx(), {}); ASSERT_TRUE(txnParticipant.transactionIsPrepared()); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 0); ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 1}, false /* autocommit */, - true /* startTransaction */, - 1 /* txnRetryCounter */), + true /* startTransaction */), AssertionException, ErrorCodes::IllegalOperation); ASSERT(txnParticipant.transactionIsPrepared()); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 0); } TEST_F(ShardTxnParticipantTest, CannotRestartTransactionUsingTxnRetryCounterLessThanLastUsed) { @@ -4718,67 +4647,62 @@ TEST_F(ShardTxnParticipantTest, CannotRestartTransactionUsingTxnRetryCounterLess ASSERT_TRUE(txnParticipant.transactionIsInProgress()); txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 1}, false /* autocommit */, - true /* startTransaction */, - 1 /* txnRetryCounter */); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1); + true /* startTransaction */); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 1); ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 0}, false /* autocommit */, - true /* startTransaction */, - 0 /* txnRetryCounter */), + true /* startTransaction */), AssertionException, ErrorCodes::TxnRetryCounterTooOld); try { txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 0}, false /* autocommit */, - true /* startTransaction */, - 0 /* txnRetryCounter */); + true /* startTransaction */); ASSERT(false); } catch (const TxnRetryCounterTooOldException& ex) { auto info = ex.extraInfo<TxnRetryCounterTooOldInfo>(); ASSERT_EQ(info->getTxnRetryCounter(), 1); } ASSERT_TRUE(txnParticipant.transactionIsInProgress()); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 1); } TEST_F(ShardTxnParticipantTest, CanContinueTransactionUsingTxnRetryCounterEqualToLastUsed) { auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); ASSERT_TRUE(txnParticipant.transactionIsInProgress()); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 0); txnParticipant.unstashTransactionResources(opCtx(), "insert"); txnParticipant.stashTransactionResources(opCtx()); txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 0}, false /* autocommit */, - boost::none /* startTransaction */, - 0 /* txnRetryCounter */); + boost::none /* startTransaction */); ASSERT_TRUE(txnParticipant.transactionIsInProgress()); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 0); } TEST_F(ShardTxnParticipantTest, CannotContinueTransactionUsingTxnRetryCounterGreaterThanLastUsed) { auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); ASSERT_TRUE(txnParticipant.transactionIsInProgress()); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 0); txnParticipant.unstashTransactionResources(opCtx(), "insert"); txnParticipant.stashTransactionResources(opCtx()); ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 1}, false /* autocommit */, - boost::none /* startTransaction */, - 1 /* txnRetryCounter */), + boost::none /* startTransaction */), AssertionException, ErrorCodes::IllegalOperation); ASSERT_TRUE(txnParticipant.transactionIsInProgress()); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 0); } TEST_F(ShardTxnParticipantTest, CannotContinueTransactionUsingTxnRetryCounterLessThanLastUsed) { @@ -4787,34 +4711,31 @@ TEST_F(ShardTxnParticipantTest, CannotContinueTransactionUsingTxnRetryCounterLes ASSERT_TRUE(txnParticipant.transactionIsInProgress()); txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 1}, false /* autocommit */, - true /* startTransaction */, - 1 /* txnRetryCounter */); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1); + true /* startTransaction */); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 1); txnParticipant.unstashTransactionResources(opCtx(), "insert"); txnParticipant.stashTransactionResources(opCtx()); ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 0}, false /* autocommit */, - boost::none /* startTransaction */, - 0 /* txnRetryCounter */), + boost::none /* startTransaction */), AssertionException, ErrorCodes::TxnRetryCounterTooOld); try { txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 0}, false /* autocommit */, - true /* startTransaction */, - 0 /* txnRetryCounter */); + true /* startTransaction */); ASSERT(false); } catch (const TxnRetryCounterTooOldException& ex) { auto info = ex.extraInfo<TxnRetryCounterTooOldInfo>(); ASSERT_EQ(info->getTxnRetryCounter(), 1); } ASSERT_TRUE(txnParticipant.transactionIsInProgress()); - ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1); + ASSERT_EQ(txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnRetryCounter(), 1); } TEST_F(ShardTxnParticipantTest, CannotRetryInProgressTransactionForRetryableWrites) { @@ -4827,10 +4748,9 @@ TEST_F(ShardTxnParticipantTest, CannotRetryInProgressTransactionForRetryableWrit txnParticipant.stashTransactionResources(opCtx()); ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 0}, false /* autocommit */, - true /* startTransaction */, - 0 /* txnRetryCounter */), + true /* startTransaction */), AssertionException, 50911); ASSERT_TRUE(txnParticipant.transactionIsInProgress()); @@ -4847,10 +4767,9 @@ TEST_F(ShardTxnParticipantTest, CannotRetryPreparedTransactionForRetryableWrites // TODO (SERVER-60917): Make transaction participants throw RetryableTransactionInProgress if a // retry arrives while the transaction has been committed or aborted ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 0}, false /* autocommit */, - true /* startTransaction */, - 0 /* txnRetryCounter */), + true /* startTransaction */), AssertionException, 50911); ASSERT(txnParticipant.transactionIsPrepared()); @@ -4866,10 +4785,9 @@ TEST_F(ShardTxnParticipantTest, CanRetryCommittedUnpreparedTransactionForRetryab ASSERT(txnParticipant.transactionIsCommitted()); txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 0}, false /* autocommit */, - true /* startTransaction */, - 0 /* txnRetryCounter */); + true /* startTransaction */); ASSERT(txnParticipant.transactionIsCommitted()); } @@ -4885,10 +4803,9 @@ TEST_F(ShardTxnParticipantTest, CanRetryCommittedPreparedTransactionForRetryable ASSERT_TRUE(txnParticipant.transactionIsCommitted()); txnParticipant.beginOrContinue(opCtx(), - *opCtx()->getTxnNumber(), + {*opCtx()->getTxnNumber(), 0}, false /* autocommit */, - true /* startTransaction */, - 0 /* txnRetryCounter */); + true /* startTransaction */); ASSERT(txnParticipant.transactionIsCommitted()); } diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index bcd823446a7..7ba0257b1ec 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -704,8 +704,9 @@ public: private: BSONObj _getTxnDoc() { auto txnParticipant = TransactionParticipant::get(_opCtx); - auto txnsFilter = BSON("_id" << _opCtx->getLogicalSessionId()->toBSON() << "txnNum" - << txnParticipant.getActiveTxnNumber()); + auto txnsFilter = + BSON("_id" << _opCtx->getLogicalSessionId()->toBSON() << "txnNum" + << txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber()); return queryCollection(NamespaceString::kSessionTransactionsTableNamespace, txnsFilter); } }; @@ -1767,11 +1768,8 @@ public: auto txnParticipant = TransactionParticipant::get(_opCtx); ASSERT(txnParticipant); - txnParticipant.beginOrContinue(_opCtx, - *_opCtx->getTxnNumber(), - false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + _opCtx, {*_opCtx->getTxnNumber()}, false /* autocommit */, true /* startTransaction */); txnParticipant.unstashTransactionResources(_opCtx, "insert"); { // Insert a document that will set the index as multikey. @@ -3467,11 +3465,8 @@ public: auto txnParticipant = TransactionParticipant::get(_opCtx); ASSERT(txnParticipant); // Start a retryable write. - txnParticipant.beginOrContinue(_opCtx, - txnNumber, - boost::none /* autocommit */, - boost::none /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + _opCtx, {txnNumber}, boost::none /* autocommit */, boost::none /* startTransaction */); } protected: @@ -3676,11 +3671,8 @@ public: auto txnParticipant = TransactionParticipant::get(_opCtx); ASSERT(txnParticipant); - txnParticipant.beginOrContinue(_opCtx, - *_opCtx->getTxnNumber(), - false /* autocommit */, - true /* startTransaction */, - boost::none /* txnRetryCounter */); + txnParticipant.beginOrContinue( + _opCtx, {*_opCtx->getTxnNumber()}, false /* autocommit */, true /* startTransaction */); txnParticipant.unstashTransactionResources(_opCtx, "insert"); { AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IX); |