diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2021-08-19 05:37:25 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-09-01 22:02:16 +0000 |
commit | 22beeff23a26e44127a15587e8bfd84f1d1e916c (patch) | |
tree | bc88a80b18693021bdb60db0399d9ec20630e97c | |
parent | 9486a2779da1e8821b4b6d90ef3327a649c10b62 (diff) | |
download | mongo-22beeff23a26e44127a15587e8bfd84f1d1e916c.tar.gz |
SERVER-58752 Support retrying internal transactions on transient transaction errors
43 files changed, 2567 insertions, 608 deletions
diff --git a/buildscripts/resmokeconfig/suites/multiversion_auth.yml b/buildscripts/resmokeconfig/suites/multiversion_auth.yml index a5a386b0a10..a4ba52a8b83 100644 --- a/buildscripts/resmokeconfig/suites/multiversion_auth.yml +++ b/buildscripts/resmokeconfig/suites/multiversion_auth.yml @@ -25,9 +25,10 @@ selector: # Skip any tests that run with auth explicitly. - jstests/multiVersion/load_keys_on_upgrade.js - # TODO (SERVER-59343): Investigate AuthorizationSession error after running setFCV in - # internal_sessions.js in multiversion_auth suite. + # TODO (SERVER-59343): Investigate AuthorizationSession error in internal session tests in + # auth suites. - jstests/multiVersion/internal_sessions.js + - jstests/multiVersion/internal_transactions_retry_on_transient_transaction_error.js exclude_with_any_tags: - featureFlagToaster diff --git a/jstests/multiVersion/internal_transactions_retry_on_transient_transaction_error.js b/jstests/multiVersion/internal_transactions_retry_on_transient_transaction_error.js new file mode 100644 index 00000000000..02613c2310e --- /dev/null +++ b/jstests/multiVersion/internal_transactions_retry_on_transient_transaction_error.js @@ -0,0 +1,66 @@ +/* + * Tests that the client can only retry a transaction that failed with a transient transaction error + * by attaching a higher txnRetryCounter when the FCV is latest. + * + * @tags: [requires_fcv_51, featureFlagInternalTransactions] + */ +(function() { +'use strict'; + +function runTest(downgradeFCV) { + load("jstests/libs/fail_point_util.js"); + + const st = new ShardingTest({shards: 1, rs: {nodes: 3}}); + const shard0Rst = st.rs0; + const shard0Primary = shard0Rst.getPrimary(); + + const kDbName = "testDb"; + const kCollName = "testColl"; + const kNs = kDbName + "." 
+ kCollName; + const testDB = shard0Primary.getDB(kDbName); + + jsTest.log("Verify that txnRetryCounter is only supported in FCV latest"); + + assert.commandWorked(st.s.adminCommand({setFeatureCompatibilityVersion: downgradeFCV})); + + const sessionUUID = UUID(); + const lsid = {id: sessionUUID}; + const txnNumber = NumberLong(1); + configureFailPoint(shard0Primary, + "failCommand", + { + failInternalCommands: true, + failCommands: ["insert"], + errorCode: ErrorCodes.LockBusy, + namespace: kNs + }, + {times: 1}); + const insertCmdObj = { + insert: kCollName, + documents: [{x: 1}], + lsid: lsid, + txnNumber: txnNumber, + startTransaction: true, + autocommit: false, + txnRetryCounter: NumberInt(0) + }; + assert.commandFailedWithCode(testDB.runCommand(insertCmdObj), ErrorCodes.InvalidOptions); + + assert.commandWorked(st.s.adminCommand({setFeatureCompatibilityVersion: latestFCV})); + + assert.commandFailedWithCode(testDB.runCommand(insertCmdObj), ErrorCodes.LockBusy); + insertCmdObj.txnRetryCounter = NumberInt(1); + assert.commandWorked(testDB.runCommand(insertCmdObj)); + assert.commandWorked(testDB.adminCommand({ + commitTransaction: 1, + lsid: lsid, + txnNumber: txnNumber, + autocommit: false, + txnRetryCounter: insertCmdObj.txnRetryCounter + })); + + st.stop(); +} + +runFeatureFlagMultiversionTest('featureFlagInternalTransactions', runTest); +})(); diff --git a/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_basic.js b/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_basic.js new file mode 100644 index 00000000000..e727716666a --- /dev/null +++ b/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_basic.js @@ -0,0 +1,170 @@ +/* + * Tests that a client can retry a transaction that failed with a transient transaction error by + * attaching a higher txnRetryCounter. 
+ * + * @tags: [requires_fcv_51, featureFlagInternalTransactions] + */ +(function() { +'use strict'; + +load("jstests/libs/fail_point_util.js"); + +const kDbName = "testDb"; +const kCollName = "testColl"; +const kNs = kDbName + "." + kCollName; + +const st = new ShardingTest({shards: 1, rs: {nodes: 3}}); +const shard0Rst = st.rs0; +const shard0Primary = shard0Rst.getPrimary(); + +const mongosTestDB = st.s.getDB(kDbName); +const shard0TestDB = shard0Primary.getDB(kDbName); +assert.commandWorked(mongosTestDB.createCollection(kCollName)); + +function testCommitAfterRetry(db, lsid, txnNumber) { + const txnRetryCounter0 = NumberInt(0); + const txnRetryCounter1 = NumberInt(1); + + jsTest.log( + "Verify that the client can retry a transaction that failed with a transient " + + "transaction error by attaching a higher txnRetryCounter and commit the transaction"); + configureFailPoint(shard0Primary, + "failCommand", + { + failInternalCommands: true, + failCommands: ["insert"], + errorCode: ErrorCodes.LockBusy, + namespace: kNs + }, + {times: 1}); + const insertCmdObj0 = { + insert: kCollName, + documents: [{x: 0}], + lsid: lsid, + txnNumber: txnNumber, + startTransaction: true, + autocommit: false, + }; + assert.commandFailedWithCode( + db.runCommand(Object.assign({}, insertCmdObj0, {txnRetryCounter: txnRetryCounter0})), + ErrorCodes.LockBusy); + assert.commandWorked( + db.runCommand(Object.assign({}, insertCmdObj0, {txnRetryCounter: txnRetryCounter1}))); + + jsTest.log("Verify that the client must attach the last used txnRetryCounter in all commands " + + "in the transaction"); + const insertCmdObj1 = { + insert: kCollName, + documents: [{x: 1}], + lsid: lsid, + txnNumber: txnNumber, + autocommit: false, + }; + const insertRes0 = assert.commandFailedWithCode( + db.runCommand(Object.assign({}, insertCmdObj1, {txnRetryCounter: txnRetryCounter0})), + ErrorCodes.TxnRetryCounterTooOld); + assert.eq(txnRetryCounter1, insertRes0.txnRetryCounter, insertRes0); + // txnRetryCounter 
defaults to 0. + const insertRes1 = assert.commandFailedWithCode(db.runCommand(insertCmdObj1), + ErrorCodes.TxnRetryCounterTooOld); + assert.eq(txnRetryCounter1, insertRes1.txnRetryCounter, insertRes1); + assert.commandWorked( + db.runCommand(Object.assign({}, insertCmdObj1, {txnRetryCounter: txnRetryCounter1}))); + + jsTest.log("Verify that the client must attach the last used txnRetryCounter in the " + + "commitTransaction command"); + const commitCmdObj = { + commitTransaction: 1, + lsid: lsid, + txnNumber: txnNumber, + autocommit: false, + }; + const commitRes = assert.commandFailedWithCode( + db.adminCommand(Object.assign({}, commitCmdObj, {txnRetryCounter: txnRetryCounter0})), + ErrorCodes.TxnRetryCounterTooOld); + assert.eq(txnRetryCounter1, commitRes.txnRetryCounter, commitRes); + + assert.commandWorked( + db.adminCommand(Object.assign({}, commitCmdObj, {txnRetryCounter: txnRetryCounter1}))); +} + +function testAbortAfterRetry(db, lsid, txnNumber) { + const txnRetryCounter0 = NumberInt(0); + const txnRetryCounter1 = NumberInt(1); + + jsTest.log("Verify that the client can retry a transaction that failed with a transient " + + "transaction error by attaching a higher txnRetryCounter and abort the transaction"); + configureFailPoint(shard0Primary, + "failCommand", + { + failInternalCommands: true, + failCommands: ["insert"], + errorCode: ErrorCodes.LockBusy, + namespace: kNs + }, + {times: 1}); + const insertCmdObj0 = { + insert: kCollName, + documents: [{x: 0}], + lsid: lsid, + txnNumber: txnNumber, + startTransaction: true, + autocommit: false, + }; + assert.commandFailedWithCode( + db.runCommand(Object.assign({}, insertCmdObj0, {txnRetryCounter: txnRetryCounter0})), + ErrorCodes.LockBusy); + assert.commandWorked( + db.runCommand(Object.assign({}, insertCmdObj0, {txnRetryCounter: txnRetryCounter1}))); + + jsTest.log("Verify that the client must attach the last used txnRetryCounter in the " + + "abortTransaction command"); + const abortCmdObj = { + 
abortTransaction: 1, + lsid: lsid, + txnNumber: txnNumber, + autocommit: false, + }; + const abortRes = assert.commandFailedWithCode( + db.adminCommand(Object.assign({}, abortCmdObj, {txnRetryCounter: txnRetryCounter0})), + ErrorCodes.TxnRetryCounterTooOld); + assert.eq(txnRetryCounter1, abortRes.txnRetryCounter, abortRes); + + assert.commandWorked( + db.adminCommand(Object.assign({}, abortCmdObj, {txnRetryCounter: txnRetryCounter1}))); +} + +(() => { + jsTest.log("Test transactions in a sharded cluster"); + const sessionUUID = UUID(); + const lsid0 = {id: sessionUUID}; + testCommitAfterRetry(mongosTestDB, lsid0, NumberLong(0)); + testAbortAfterRetry(mongosTestDB, lsid0, NumberLong(1)); + + const lsid1 = {id: sessionUUID, txnNumber: NumberLong(1), stmtId: NumberInt(0)}; + testCommitAfterRetry(mongosTestDB, lsid1, NumberLong(0)); + testAbortAfterRetry(mongosTestDB, lsid1, NumberLong(1)); + + const lsid2 = {id: sessionUUID, txnUUID: UUID()}; + testCommitAfterRetry(mongosTestDB, lsid2, NumberLong(0)); + testAbortAfterRetry(mongosTestDB, lsid2, NumberLong(1)); +})(); + +(() => { + jsTest.log("Test transactions in a replica set"); + const sessionUUID = UUID(); + const lsid0 = {id: sessionUUID}; + testCommitAfterRetry(shard0TestDB, lsid0, NumberLong(0)); + testAbortAfterRetry(shard0TestDB, lsid0, NumberLong(1)); + + const lsid1 = {id: sessionUUID, txnNumber: NumberLong(1), stmtId: NumberInt(0)}; + testCommitAfterRetry(shard0TestDB, lsid1, NumberLong(0)); + testAbortAfterRetry(shard0TestDB, lsid1, NumberLong(1)); + + const lsid2 = {id: sessionUUID, txnUUID: UUID()}; + testCommitAfterRetry(shard0TestDB, lsid2, NumberLong(0)); + testAbortAfterRetry(shard0TestDB, lsid2, NumberLong(1)); +})(); + +st.stop(); +})(); diff --git a/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_validation.js b/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_validation.js new file mode 100644 index 00000000000..0f3119e8679 --- /dev/null +++ 
b/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_validation.js @@ -0,0 +1,100 @@ +/* + * Tests that txnRetryCounter is only supported in sharded clusters and in transactions. + * + * @tags: [requires_fcv_51, featureFlagInternalTransactions] + */ +(function() { +'use strict'; + +load("jstests/libs/fail_point_util.js"); + +const kDbName = "testDb"; +const kCollName = "testColl"; +const kNs = kDbName + "." + kCollName; + +(() => { + const rst = new ReplSetTest({nodes: 1}); + + rst.startSet(); + rst.initiate(); + + const primary = rst.getPrimary(); + const testDB = primary.getDB(kDbName); + + jsTest.log("Verify that txnRetryCounter is only supported in sharded clusters"); + const insertCmdObj = { + insert: kCollName, + documents: [{x: 0}], + lsid: {id: UUID()}, + txnNumber: NumberLong(1), + startTransaction: true, + autocommit: false, + txnRetryCounter: NumberInt(0) + }; + assert.commandFailedWithCode(testDB.runCommand(insertCmdObj), ErrorCodes.InvalidOptions); + rst.stopSet(); +})(); + +(() => { + const st = new ShardingTest({shards: 1}); + + jsTest.log("Verify that txnRetryCounter is supported on shardvr"); + const shard0Primary = st.rs0.getPrimary(); + const insertCmdObjShardSvr = { + insert: kCollName, + documents: [{x: 0}], + lsid: {id: UUID()}, + txnNumber: NumberLong(0), + startTransaction: true, + autocommit: false, + txnRetryCounter: NumberInt(0) + }; + assert.commandWorked(shard0Primary.getDB(kDbName).runCommand(insertCmdObjShardSvr)); + assert.commandWorked(shard0Primary.adminCommand({ + abortTransaction: 1, + lsid: insertCmdObjShardSvr.lsid, + txnNumber: insertCmdObjShardSvr.txnNumber, + autocommit: false, + txnRetryCounter: NumberInt(0) + })); + + jsTest.log("Verify that txnRetryCounter is supported on configsvr"); + const configRSPrimary = st.configRS.getPrimary(); + const deleteCmdObjConfigSvr = { + delete: "chunks", + deletes: [{q: {}, limit: 1}], + lsid: {id: UUID()}, + txnNumber: NumberLong(0), + startTransaction: true, + 
autocommit: false, + txnRetryCounter: NumberInt(0) + }; + assert.commandWorked(configRSPrimary.getDB("config").runCommand(deleteCmdObjConfigSvr)); + assert.commandWorked(configRSPrimary.adminCommand({ + abortTransaction: 1, + lsid: deleteCmdObjConfigSvr.lsid, + txnNumber: deleteCmdObjConfigSvr.txnNumber, + autocommit: false, + })); + + jsTest.log("Test that the client cannot specify txnRetryCounter in a retryable write command"); + const mongosTestDB = st.s.getDB(kDbName); + const shard0TestDB = shard0Primary.getDB(kDbName); + const insertCmdObj = { + insert: kCollName, + documents: [{x: 0}], + lsid: {id: UUID()}, + txnNumber: NumberLong(0), + txnRetryCounter: NumberInt(0) + }; + assert.commandFailedWithCode(mongosTestDB.runCommand(insertCmdObj), ErrorCodes.InvalidOptions); + // TODO (SERVER-59343): Investigate AuthorizationSession error in internal session tests in + // auth suites. + if (!TestData.auth) { + assert.commandFailedWithCode(shard0TestDB.runCommand(insertCmdObj), + ErrorCodes.InvalidOptions); + } + + st.stop(); +})(); +})(); diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml index 66319639eee..098402198ae 100644 --- a/src/mongo/base/error_codes.yml +++ b/src/mongo/base/error_codes.yml @@ -416,7 +416,7 @@ error_codes: - {code: 336, name: TimeseriesBucketCleared, categories: [InternalOnly]} - {code: 337, name: AuthenticationAbandoned, categories: [InternalOnly]} - + - {code: 338, name: ReshardCollectionInProgress} - {code: 339, name: NoSuchReshardCollection} - {code: 340, name: ReshardCollectionCommitted} @@ -428,14 +428,14 @@ error_codes: # case where it would be possible for MongoS to retry where MongoD couldn't. 
- {code: 343, name: ShardCannotRefreshDueToLocksHeld, extra: ShardCannotRefreshDueToLocksHeldInfo} - + - {code: 344, name: AuditingNotEnabled} - {code: 345, name: RuntimeAuditConfigurationNotEnabled} - + - {code: 346,name: ChangeStreamInvalidated, extra: ChangeStreamInvalidationInfo} - {code: 347, name: APIMismatchError, categories: [VersionedAPIError,VoteAbortError]} - + - {code: 348,name: ChangeStreamTopologyChange, extra: ChangeStreamTopologyChangeInfo} - {code: 349, name: KeyPatternShorterThanBound} @@ -451,6 +451,8 @@ error_codes: - {code: 355, name: InterruptedDueToStorageChange,categories: [Interruption,CancellationError]} + - {code: 356, name: TxnRetryCounterTooOld, extra: TxnRetryCounterTooOldInfo} + # Error codes 4000-8999 are reserved. # Non-sequential error codes for compatibility only) diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 964edce0dda..5beccabd31f 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -63,6 +63,7 @@ env.Library( 'field_ref_set.cpp', 'field_parser.cpp', 'keypattern.cpp', + 'txn_retry_counter_too_old_info.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/base', diff --git a/src/mongo/db/initialize_operation_session_info.cpp b/src/mongo/db/initialize_operation_session_info.cpp index c655b9ae7f7..d50af9c090c 100644 --- a/src/mongo/db/initialize_operation_session_info.cpp +++ b/src/mongo/db/initialize_operation_session_info.cpp @@ -32,6 +32,7 @@ #include "mongo/db/initialize_operation_session_info.h" #include "mongo/db/auth/authorization_session.h" +#include "mongo/db/internal_transactions_feature_flag_gen.h" #include "mongo/db/logical_session_cache.h" #include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/operation_context.h" @@ -96,6 +97,10 @@ OperationSessionInfoFromClient initializeOperationSessionInfo(OperationContext* if (getParentSessionId(lsid)) { uassert(ErrorCodes::InvalidOptions, + "Internal sessions are not enabled", + feature_flags::gFeatureFlagInternalTransactions.isEnabled( + 
serverGlobalParams.featureCompatibility)); + uassert(ErrorCodes::InvalidOptions, "Internal sessions are not supported outside of transactions", osi.getTxnNumber() && osi.getAutocommit() && !osi.getAutocommit().value()); } @@ -120,6 +125,21 @@ OperationSessionInfoFromClient initializeOperationSessionInfo(OperationContext* *osi.getTxnNumber() >= 0); opCtx->setTxnNumber(*osi.getTxnNumber()); + + if (auto txnRetryCounter = osi.getTxnRetryCounter()) { + // TODO (SERVER-58759): Add a uassert that the client is internal. + uassert(ErrorCodes::InvalidOptions, + "txnRetryCounter is not enabled", + feature_flags::gFeatureFlagInternalTransactions.isEnabled( + serverGlobalParams.featureCompatibility)); + uassert(ErrorCodes::InvalidOptions, + str::stream() << "Cannot specify txnRetryCounter for a retryable write", + osi.getAutocommit().has_value()); + uassert(ErrorCodes::InvalidOptions, + "txnRetryCounter cannot be negative", + txnRetryCounter >= 0); + opCtx->setTxnRetryCounter(*txnRetryCounter); + } } else { uassert(ErrorCodes::InvalidOptions, "'autocommit' field requires a transaction number to also be specified", diff --git a/src/mongo/db/logical_session_id.h b/src/mongo/db/logical_session_id.h index 01d74fa7cc4..3270eece38e 100644 --- a/src/mongo/db/logical_session_id.h +++ b/src/mongo/db/logical_session_id.h @@ -40,6 +40,7 @@ namespace mongo { using TxnNumber = std::int64_t; using StmtId = std::int32_t; +using TxnRetryCounter = std::int32_t; // Default value for unassigned statementId. 
const StmtId kUninitializedStmtId = -1; @@ -48,6 +49,7 @@ const StmtId kUninitializedStmtId = -1; const StmtId kIncompleteHistoryStmtId = -2; const TxnNumber kUninitializedTxnNumber = -1; +const TxnRetryCounter kUninitializedTxnRetryCounter = -1; class BSONObjBuilder; class OperationContext; diff --git a/src/mongo/db/logical_session_id.idl b/src/mongo/db/logical_session_id.idl index 1f356d5f046..4fcae11e9ba 100644 --- a/src/mongo/db/logical_session_id.idl +++ b/src/mongo/db/logical_session_id.idl @@ -55,6 +55,14 @@ types: cpp_type: "std::int32_t" deserializer: "mongo::BSONElement::_numberInt" + TxnRetryCounter: + description: "A strictly-increasing per-transaction counter that starts at 0 and is incremented + every time the transaction is internally retried on a transient transaction + error." + bson_serialization_type: int + cpp_type: "std::int32_t" + deserializer: "mongo::BSONElement::_numberInt" + structs: InternalSessionFields: description: "Internal sessiond id fields" @@ -131,6 +139,10 @@ structs: operation executes." type: TxnNumber optional: true + txnRetryCounter: + description: "The transaction retry counter for this transaction." + type: TxnRetryCounter + optional: true OperationSessionInfoFromClient: description: "Parser for pulling out the sessionId/txnNumber combination from commands" @@ -145,6 +157,10 @@ structs: operation executes." type: TxnNumber optional: true + txnRetryCounter: + description: "The transaction retry counter for this transaction." 
+ type: TxnRetryCounter + optional: true autocommit: type: bool optional: true diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 4bbe23b0a66..1cc220d3746 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -593,7 +593,11 @@ public: NamespaceString nss, TxnNumber txnNum, StmtId stmtId) { - txnParticipant.beginOrContinue(opCtx, txnNum, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx, + txnNum, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); { AutoGetCollection autoColl(opCtx, nss, MODE_IX); @@ -766,7 +770,11 @@ class OpObserverTransactionTest : public OpObserverTxnParticipantTest { public: void setUp() override { OpObserverTxnParticipantTest::setUp(); - txnParticipant().beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, true); + txnParticipant().beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */); } protected: @@ -1454,7 +1462,11 @@ class OpObserverRetryableFindAndModifyTest : public OpObserverTxnParticipantTest public: void setUp() override { OpObserverTxnParticipantTest::setUp(); - txnParticipant().beginOrContinue(opCtx(), txnNum(), boost::none, boost::none); + txnParticipant().beginOrContinue(opCtx(), + txnNum(), + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); } void tearDown() override { @@ -1660,7 +1672,11 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) { opCtx->setTxnNumber(TxnNumber(testIdx)); contextSession.emplace(opCtx); txnParticipant.emplace(TransactionParticipant::get(opCtx)); - txnParticipant->beginOrContinue(opCtx, TxnNumber(testIdx), boost::none, boost::none); + txnParticipant->beginOrContinue(opCtx, + TxnNumber(testIdx), + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* 
txnRetryCounter */); } if (testCase.imageType == StoreDocOption::None && !testCase.alwaysRecordPreImages) { @@ -1787,7 +1803,11 @@ TEST_F(OpObserverTest, TestFundamentalOnInsertsOutputs) { opCtx->setTxnNumber(TxnNumber(testIdx)); contextSession.emplace(opCtx); txnParticipant.emplace(TransactionParticipant::get(opCtx)); - txnParticipant->beginOrContinue(opCtx, TxnNumber(testIdx), boost::none, boost::none); + txnParticipant->beginOrContinue(opCtx, + TxnNumber(testIdx), + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); } // Phase 2: Call the code we're testing. @@ -1893,7 +1913,11 @@ TEST_F(OpObserverTest, TestFundamentalOnDeleteOutputs) { opCtx->setTxnNumber(TxnNumber(testIdx)); contextSession.emplace(opCtx); txnParticipant.emplace(TransactionParticipant::get(opCtx)); - txnParticipant->beginOrContinue(opCtx, TxnNumber(testIdx), boost::none, boost::none); + txnParticipant->beginOrContinue(opCtx, + TxnNumber(testIdx), + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); } OpObserver::OplogDeleteEntryArgs deleteArgs; switch (testCase.retryableOptions) { diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp index cb97d6a197a..0b10331e968 100644 --- a/src/mongo/db/operation_context.cpp +++ b/src/mongo/db/operation_context.cpp @@ -406,6 +406,13 @@ void OperationContext::setTxnNumber(TxnNumber txnNumber) { _txnNumber = txnNumber; } +void OperationContext::setTxnRetryCounter(TxnRetryCounter txnRetryCounter) { + invariant(_lsid); + invariant(_txnNumber); + invariant(!_txnRetryCounter.has_value()); + _txnRetryCounter = txnRetryCounter; +} + std::unique_ptr<RecoveryUnit> OperationContext::releaseRecoveryUnit() { return std::move(_recoveryUnit); } diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 010ab14756c..36c2c49a83d 100644 --- a/src/mongo/db/operation_context.h +++ 
b/src/mongo/db/operation_context.h @@ -235,6 +235,13 @@ public: } /** + * Returns the txnRetryCounter associated with this operation. + */ + boost::optional<TxnRetryCounter> getTxnRetryCounter() const { + return _txnRetryCounter; + } + + /** * Returns a CancellationToken that will be canceled when the OperationContext is killed via * markKilled (including for internal reasons, like the OperationContext deadline being * reached). @@ -264,6 +271,13 @@ public: void setTxnNumber(TxnNumber txnNumber); /** + * Associates a txnRetryCounter with this operation context. May only be called once for the + * lifetime of the operation and the operation must have a logical session id and a transaction + * number assigned. + */ + void setTxnRetryCounter(TxnRetryCounter txnRetryCounter); + + /** * Returns the top-level WriteUnitOfWork associated with this operation context, if any. */ WriteUnitOfWork* getWriteUnitOfWork() { @@ -443,6 +457,9 @@ public: */ void setInMultiDocumentTransaction() { _inMultiDocumentTransaction = true; + if (!_txnRetryCounter.has_value()) { + _txnRetryCounter = 0; + } } /** @@ -495,6 +512,7 @@ public: _isStartingMultiDocumentTransaction = false; _lsid = boost::none; _txnNumber = boost::none; + _txnRetryCounter = boost::none; } /** @@ -654,6 +672,7 @@ private: boost::optional<LogicalSessionId> _lsid; boost::optional<TxnNumber> _txnNumber; + boost::optional<TxnRetryCounter> _txnRetryCounter; std::unique_ptr<Locker> _locker; diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp index 267bbc2ab5c..e996689c44b 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp +++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp @@ -958,6 +958,8 @@ void TenantMigrationRecipientService::Instance::_processCommittedTransactionEntr opCtx->setLogicalSessionId(sessionId); opCtx->setTxnNumber(txnNumber); opCtx->setInMultiDocumentTransaction(); + auto txnRetryCounter = 
*opCtx->getTxnRetryCounter(); + invariant(txnRetryCounter == 0); MongoDOperationContextSession ocs(opCtx); LOGV2_DEBUG(5351301, @@ -994,7 +996,7 @@ void TenantMigrationRecipientService::Instance::_processCommittedTransactionEntr return; } - txnParticipant.beginOrContinueTransactionUnconditionally(opCtx, txnNumber); + txnParticipant.beginOrContinueTransactionUnconditionally(opCtx, txnNumber, txnRetryCounter); MutableOplogEntry noopEntry; noopEntry.setOpType(repl::OpTypeEnum::kNoop); diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index 7ca766859e9..a4a5ab525c3 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -579,6 +579,8 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( opCtx->setLogicalSessionId(sessionId); opCtx->setTxnNumber(txnNumber); opCtx->setInMultiDocumentTransaction(); + auto txnRetryCounter = *opCtx->getTxnRetryCounter(); + invariant(txnRetryCounter == 0); LOGV2_DEBUG(5351502, 1, "Tenant Oplog Applier committing transaction", @@ -605,7 +607,8 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( << " because the transaction number " << txnParticipant.getActiveTxnNumber() << " has already started", txnParticipant.getActiveTxnNumber() < txnNumber); - txnParticipant.beginOrContinueTransactionUnconditionally(opCtx.get(), txnNumber); + txnParticipant.beginOrContinueTransactionUnconditionally( + opCtx.get(), txnNumber, txnRetryCounter); // Only set sessionId and txnNumber for the final applyOp in a transaction. noopEntry.setSessionId(sessionId); @@ -735,7 +738,8 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( txnParticipant.beginOrContinue(opCtx.get(), txnNumber, boost::none /* autocommit */, - boost::none /* startTransaction */); + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); // We could have an existing lastWriteOpTime for the same retryable write chain from a // previously aborted migration. 
This could also happen if the tenant being migrated has @@ -766,7 +770,8 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( txnParticipant.beginOrContinue(opCtx.get(), txnNumber, boost::none /* autocommit */, - boost::none /* startTransaction */); + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); } // We should never process the same donor statement twice, except in failover diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp index 1451fd2eced..5f81beb94fa 100644 --- a/src/mongo/db/s/migration_destination_manager.cpp +++ b/src/mongo/db/s/migration_destination_manager.cpp @@ -108,7 +108,8 @@ void checkOutSessionAndVerifyTxnState(OperationContext* opCtx) { TransactionParticipant::get(opCtx).beginOrContinue(opCtx, *opCtx->getTxnNumber(), boost::none /* autocommit */, - boost::none /* startTransaction */); + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); } template <typename Callable> @@ -921,7 +922,8 @@ void MigrationDestinationManager::_migrateThread() { txnParticipant.beginOrContinue(opCtx, *opCtx->getTxnNumber(), boost::none /* autocommit */, - boost::none /* startTransaction */); + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); _migrateDriver(opCtx); } catch (...) 
{ _setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus())); diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp index 3461c15b815..745e83a40f9 100644 --- a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp +++ b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp @@ -249,7 +249,11 @@ boost::optional<SharedSemiFuture<void>> withSessionCheckedOut(OperationContext* auto txnParticipant = TransactionParticipant::get(opCtx); try { - txnParticipant.beginOrContinue(opCtx, txnNumber, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx, + txnNumber, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); if (stmtId && txnParticipant.checkStatementExecuted(opCtx, *stmtId)) { // Skip the incoming statement because it has already been logged locally. diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp index 104e79cdd37..5a71b14a7da 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp @@ -86,7 +86,11 @@ void runWithTransaction(OperationContext* opCtx, unique_function<void(OperationC } }); - txnParticipant.beginOrContinue(asr.opCtx(), txnNumber, false, true); + txnParticipant.beginOrContinue(asr.opCtx(), + txnNumber, + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */); txnParticipant.unstashTransactionResources(asr.opCtx(), "reshardingOplogApplication"); func(asr.opCtx()); diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp index 9691a947954..f89da1f5b2d 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp @@ 
-177,8 +177,11 @@ public: MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue( - opCtx, txnNumber, false /* autocommit */, true /* startTransaction */); + txnParticipant.beginOrContinue(opCtx, + txnNumber, + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */); txnParticipant.unstashTransactionResources(opCtx, "prepareTransaction"); @@ -198,8 +201,11 @@ public: MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue( - opCtx, txnNumber, false /* autocommit */, boost::none /* startTransaction */); + txnParticipant.beginOrContinue(opCtx, + txnNumber, + false /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); txnParticipant.unstashTransactionResources(opCtx, "abortTransaction"); txnParticipant.abortTransaction(opCtx); diff --git a/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp index 61a519c7493..29c6fd3c26b 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp @@ -79,7 +79,11 @@ public: MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue(opCtx, txnNumber, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx, + txnNumber, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); WriteUnitOfWork wuow(opCtx); auto opTime = repl::getNextOpTime(opCtx); @@ -102,8 +106,11 @@ public: MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue( - opCtx, txnNumber, false /* autocommit */, true /* startTransaction */); + 
txnParticipant.beginOrContinue(opCtx, + txnNumber, + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */); txnParticipant.unstashTransactionResources(opCtx, "prepareTransaction"); @@ -123,8 +130,11 @@ public: MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue( - opCtx, txnNumber, false /* autocommit */, boost::none /* startTransaction */); + txnParticipant.beginOrContinue(opCtx, + txnNumber, + false /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); txnParticipant.unstashTransactionResources(opCtx, "abortTransaction"); txnParticipant.abortTransaction(opCtx); @@ -255,7 +265,11 @@ public: MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue(opCtx, txnNumber, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx, + txnNumber, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); ASSERT_TRUE(bool(txnParticipant.checkStatementExecuted(opCtx, stmtId))); } }; diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp index e302f943693..ba1efd5a017 100644 --- a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp +++ b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp @@ -243,9 +243,17 @@ protected: auto txnParticipant = TransactionParticipant::get(opCtx); ASSERT(txnParticipant); if (multiDocTxn) { - txnParticipant.beginOrContinue(opCtx, txnNum, false, true); + txnParticipant.beginOrContinue(opCtx, + txnNum, + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */); } else { - txnParticipant.beginOrContinue(opCtx, txnNum, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx, + txnNum, + boost::none /* autocommit */, + boost::none /* 
startTransaction */, + boost::none /* txnRetryCounter */); } } @@ -389,8 +397,11 @@ protected: MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue( - opCtx, txnNumber, false /* autocommit */, true /* startTransaction */); + txnParticipant.beginOrContinue(opCtx, + txnNumber, + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */); txnParticipant.unstashTransactionResources(opCtx, "prepareTransaction"); @@ -410,8 +421,11 @@ protected: MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue( - opCtx, txnNumber, false /* autocommit */, boost::none /* startTransaction */); + txnParticipant.beginOrContinue(opCtx, + txnNumber, + false /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); txnParticipant.unstashTransactionResources(opCtx, "abortTransaction"); txnParticipant.abortTransaction(opCtx); diff --git a/src/mongo/db/s/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding_destined_recipient_test.cpp index 98d534ca694..6592bb4f35d 100644 --- a/src/mongo/db/s/resharding_destined_recipient_test.cpp +++ b/src/mongo/db/s/resharding_destined_recipient_test.cpp @@ -70,7 +70,11 @@ void runInTransaction(OperationContext* opCtx, Callable&& func) { auto txnParticipant = TransactionParticipant::get(opCtx); ASSERT(txnParticipant); - txnParticipant.beginOrContinue(opCtx, txnNum, false, true); + txnParticipant.beginOrContinue(opCtx, + txnNum, + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */); txnParticipant.unstashTransactionResources(opCtx, "SetDestinedRecipient"); func(); diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 80226a9c463..8350f974581 100644 --- 
a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -247,7 +247,11 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON, auto txnParticipant = TransactionParticipant::get(opCtx); try { - txnParticipant.beginOrContinue(opCtx, result.txnNum, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx, + result.txnNum, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); if (txnParticipant.checkStatementExecuted(opCtx, stmtIds.front())) { // Skip the incoming statement because it has already been logged locally return lastResult; diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index 7d770e47cff..446fedd435a 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -183,7 +183,11 @@ public: opCtx->setTxnNumber(txnNum); MongoDOperationContextSession ocs(opCtx); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinue(opCtx, txnNum, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx, + txnNum, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); } void checkOplog(const repl::OplogEntry& originalOplog, const repl::OplogEntry& oplogToCheck) { @@ -250,8 +254,11 @@ public: initializeOperationSessionInfo(innerOpCtx.get(), insertBuilder.obj(), true, true, true); MongoDOperationContextSession sessionTxnState(innerOpCtx.get()); auto txnParticipant = TransactionParticipant::get(innerOpCtx.get()); - txnParticipant.beginOrContinue( - innerOpCtx.get(), *sessionInfo.getTxnNumber(), boost::none, boost::none); + txnParticipant.beginOrContinue(innerOpCtx.get(), + *sessionInfo.getTxnNumber(), + boost::none /* autocommit */, + boost::none /* startTransaction */, + 
boost::none /* txnRetryCounter */); const auto reply = write_ops_exec::performInserts(innerOpCtx.get(), insertRequest); ASSERT(reply.results.size() == 1); @@ -1879,7 +1886,11 @@ TEST_F(SessionCatalogMigrationDestinationTest, auto txnParticipant = TransactionParticipant::get(opCtx); txnParticipant.refreshFromStorageIfNeeded(opCtx); - txnParticipant.beginOrContinue(opCtx, 3, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx, + 3, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); } OperationSessionInfo sessionInfo2; diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp index b228c301d95..3fce3dda429 100644 --- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp +++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp @@ -314,7 +314,8 @@ public: txnParticipant.beginOrContinue(opCtx, *opCtx->getTxnNumber(), false /* autocommit */, - boost::none /* startTransaction */); + boost::none /* startTransaction */, + *opCtx->getTxnRetryCounter()); if (txnParticipant.transactionIsCommitted()) return; @@ -336,7 +337,8 @@ public: txnParticipant.beginOrContinue(opCtx, *opCtx->getTxnNumber(), false /* autocommit */, - boost::none /* startTransaction */); + boost::none /* startTransaction */, + *opCtx->getTxnRetryCounter()); invariant(!txnParticipant.transactionIsOpen(), "The participant should not be in progress after we waited for the " diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 5e2bd446b35..64cf8a9ff1b 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -862,7 +862,8 @@ void CheckoutSessionAndInvokeCommand::_checkOutSession() { _txnParticipant->beginOrContinue(opCtx, *sessionOptions.getTxnNumber(), sessionOptions.getAutocommit(), - sessionOptions.getStartTransaction()); + sessionOptions.getStartTransaction(), + sessionOptions.getTxnRetryCounter()); 
beganOrContinuedTxn = true; } catch (const ExceptionFor<ErrorCodes::PreparedTransactionInProgress>&) { auto prepareCompleted = _txnParticipant->onExitPrepare(); diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp index 075e5bda2cc..6e1878dc582 100644 --- a/src/mongo/db/session_catalog_mongod.cpp +++ b/src/mongo/db/session_catalog_mongod.cpp @@ -482,9 +482,11 @@ MongoDOperationContextSessionWithoutRefresh::MongoDOperationContextSessionWithou : _operationContextSession(opCtx), _opCtx(opCtx) { invariant(!opCtx->getClient()->isInDirectClient()); const auto clientTxnNumber = *opCtx->getTxnNumber(); + const auto clientTxnRetryCounter = *opCtx->getTxnRetryCounter(); auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant.beginOrContinueTransactionUnconditionally(opCtx, clientTxnNumber); + txnParticipant.beginOrContinueTransactionUnconditionally( + opCtx, clientTxnNumber, clientTxnRetryCounter); } MongoDOperationContextSessionWithoutRefresh::~MongoDOperationContextSessionWithoutRefresh() { diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index a9cfe387ad9..c588ec4d22a 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -64,6 +64,7 @@ #include "mongo/db/storage/flow_control.h" #include "mongo/db/transaction_history_iterator.h" #include "mongo/db/transaction_participant_gen.h" +#include "mongo/db/txn_retry_counter_too_old_info.h" #include "mongo/db/vector_clock_mutable.h" #include "mongo/logv2/log.h" #include "mongo/util/fail_point.h" @@ -423,11 +424,11 @@ const LogicalSessionId& TransactionParticipant::Observer::_sessionId() const { return owningSession->getSessionId(); } -void TransactionParticipant::Participant::_beginOrContinueRetryableWrite(OperationContext* opCtx, - TxnNumber txnNumber) { +void TransactionParticipant::Participant::_beginOrContinueRetryableWrite( + OperationContext* opCtx, const TxnNumber& 
txnNumber) { if (txnNumber > o().activeTxnNumber) { // New retryable write. - _setNewTxnNumber(opCtx, txnNumber); + _setNewTxnNumber(opCtx, txnNumber, kUninitializedTxnRetryCounter); p().autoCommit = boost::none; } else { // Retrying a retryable write. @@ -438,8 +439,8 @@ void TransactionParticipant::Participant::_beginOrContinueRetryableWrite(Operati } } -void TransactionParticipant::Participant::_continueMultiDocumentTransaction(OperationContext* opCtx, - TxnNumber txnNumber) { +void TransactionParticipant::Participant::_continueMultiDocumentTransaction( + OperationContext* opCtx, const TxnNumber& txnNumber, const TxnRetryCounter& txnRetryCounter) { uassert(ErrorCodes::NoSuchTransaction, str::stream() << "Given transaction number " << txnNumber @@ -447,6 +448,19 @@ void TransactionParticipant::Participant::_continueMultiDocumentTransaction(Oper << o().activeTxnNumber, txnNumber == o().activeTxnNumber && !o().txnState.isInRetryableWriteMode()); + uassert(TxnRetryCounterTooOldInfo(o().activeTxnRetryCounter), + str::stream() << "Cannot continue transaction " << txnNumber << " on session " + << _sessionId() << " using txnRetryCounter " << txnRetryCounter + << " because it has already been restarted using a higher" + << " txnRetryCounter " << o().activeTxnRetryCounter, + txnRetryCounter >= o().activeTxnRetryCounter); + uassert(ErrorCodes::IllegalOperation, + str::stream() << "Cannot continue transaction " << txnNumber << " on session " + << _sessionId() << " using txnRetryCounter " << txnRetryCounter + << " because it is currently in state " << o().txnState + << " with txnRetryCounter " << o().activeTxnRetryCounter, + txnRetryCounter == o().activeTxnRetryCounter); + if (o().txnState.isInProgress() && !o().txnResourceStash) { // This indicates that the first command in the transaction failed but did not implicitly // abort the transaction. 
It is not safe to continue the transaction, in particular because @@ -467,13 +481,58 @@ void TransactionParticipant::Participant::_continueMultiDocumentTransaction(Oper << "Transaction " << txnNumber << " has been aborted because an earlier command in this transaction failed."); } - return; } -void TransactionParticipant::Participant::_beginMultiDocumentTransaction(OperationContext* opCtx, - TxnNumber txnNumber) { +void TransactionParticipant::Participant::_beginMultiDocumentTransaction( + OperationContext* opCtx, const TxnNumber& txnNumber, const TxnRetryCounter& txnRetryCounter) { + if (txnNumber == o().activeTxnNumber) { + if (txnRetryCounter < o().activeTxnRetryCounter) { + uasserted(TxnRetryCounterTooOldInfo(o().activeTxnRetryCounter), + str::stream() + << "Cannot start a transaction at given transaction number " << txnNumber + << " on session " << _sessionId() << " using txnRetryCounter " + << txnRetryCounter << " because it has already been restarted using a " + << "higher txnRetryCounter " << o().activeTxnRetryCounter); + } else if (txnRetryCounter == o().activeTxnRetryCounter || + o().activeTxnRetryCounter == kUninitializedTxnRetryCounter) { + // Servers in a sharded cluster can start a new transaction at the active transaction + // number to allow internal retries by routers on re-targeting errors, like + // StaleShard/DatabaseVersion or SnapshotTooOld. + uassert(ErrorCodes::ConflictingOperationInProgress, + "Only servers in a sharded cluster can start a new transaction at the active " + "transaction number", + serverGlobalParams.clusterRole != ClusterRole::None); + + // The active transaction number can only be reused if: + // 1. The transaction participant is in retryable write mode and has not yet executed a + // retryable write, or + // 2. A transaction is aborted and has not been involved in a two phase commit. 
+ // + // Assuming routers target primaries in increasing order of term and in the absence of + // byzantine messages, this check should never fail. + const auto restartableStates = + TransactionState::kNone | TransactionState::kAbortedWithoutPrepare; + uassert(50911, + str::stream() << "Cannot start a transaction at given transaction number " + << txnNumber << " a transaction with the same number is in state " + << o().txnState, + o().txnState.isInSet(restartableStates)); + } else { + const auto restartableStates = TransactionState::kNone | TransactionState::kInProgress | + TransactionState::kAbortedWithoutPrepare | TransactionState::kAbortedWithPrepare; + uassert(ErrorCodes::IllegalOperation, + str::stream() << "Cannot restart transaction " << txnNumber + << " using txnRetryCounter " << txnRetryCounter + << " because it is already in state " << o().txnState + << " with txnRetryCounter " << o().activeTxnRetryCounter, + o().txnState.isInSet(restartableStates)); + } + } else { + invariant(txnNumber > o().activeTxnNumber); + } + // Aborts any in-progress txns. - _setNewTxnNumber(opCtx, txnNumber); + _setNewTxnNumber(opCtx, txnNumber, txnRetryCounter); p().autoCommit = false; stdx::lock_guard<Client> lk(*opCtx->getClient()); @@ -498,10 +557,12 @@ void TransactionParticipant::Participant::_beginMultiDocumentTransaction(Operati invariant(p().transactionOperations.empty()); } -void TransactionParticipant::Participant::beginOrContinue(OperationContext* opCtx, - TxnNumber txnNumber, - boost::optional<bool> autocommit, - boost::optional<bool> startTransaction) { +void TransactionParticipant::Participant::beginOrContinue( + OperationContext* opCtx, + TxnNumber txnNumber, + boost::optional<bool> autocommit, + boost::optional<bool> startTransaction, + boost::optional<TxnRetryCounter> txnRetryCounter) { // Make sure we are still a primary. 
We need to hold on to the RSTL through the end of this // method, as we otherwise risk stepping down in the interim and incorrectly updating the // transaction number, which can abort active transactions. @@ -546,6 +607,8 @@ void TransactionParticipant::Participant::beginOrContinue(OperationContext* opCt // startTransaction, which is verified earlier when parsing the request. if (!autocommit) { invariant(!startTransaction); + invariant(!txnRetryCounter.has_value(), + "Cannot specify a txnRetryCounter for retryable write"); _beginOrContinueRetryableWrite(opCtx, txnNumber); return; } @@ -555,9 +618,17 @@ void TransactionParticipant::Participant::beginOrContinue(OperationContext* opCt // is verified earlier when parsing the request. invariant(*autocommit == false); invariant(opCtx->inMultiDocumentTransaction()); + if (txnRetryCounter.has_value()) { + uassert(ErrorCodes::InvalidOptions, + "txnRetryCounter is only supported in sharded clusters", + serverGlobalParams.clusterRole != ClusterRole::None); + invariant(*txnRetryCounter >= 0, "Cannot specify a negative txnRetryCounter"); + } else { + txnRetryCounter = 0; + } if (!startTransaction) { - _continueMultiDocumentTransaction(opCtx, txnNumber); + _continueMultiDocumentTransaction(opCtx, txnNumber, *txnRetryCounter); return; } @@ -565,37 +636,11 @@ void TransactionParticipant::Participant::beginOrContinue(OperationContext* opCt // an argument on the request. The 'startTransaction' argument currently can only be specified // as true, which is verified earlier, when parsing the request. invariant(*startTransaction); - - if (txnNumber == o().activeTxnNumber) { - // Servers in a sharded cluster can start a new transaction at the active transaction number - // to allow internal retries by routers on re-targeting errors, like - // StaleShard/DatabaseVersion or SnapshotTooOld. 
- uassert(ErrorCodes::ConflictingOperationInProgress, - "Only servers in a sharded cluster can start a new transaction at the active " - "transaction number", - serverGlobalParams.clusterRole != ClusterRole::None); - - // The active transaction number can only be reused if: - // 1. The transaction participant is in retryable write mode and has not yet executed a - // retryable write, or - // 2. A transaction is aborted and has not been involved in a two phase commit. - // - // Assuming routers target primaries in increasing order of term and in the absence of - // byzantine messages, this check should never fail. - const auto restartableStates = - TransactionState::kNone | TransactionState::kAbortedWithoutPrepare; - uassert(50911, - str::stream() << "Cannot start a transaction at given transaction number " - << txnNumber << " a transaction with the same number is in state " - << o().txnState, - o().txnState.isInSet(restartableStates)); - } - - _beginMultiDocumentTransaction(opCtx, txnNumber); + _beginMultiDocumentTransaction(opCtx, txnNumber, *txnRetryCounter); } void TransactionParticipant::Participant::beginOrContinueTransactionUnconditionally( - OperationContext* opCtx, TxnNumber txnNumber) { + OperationContext* opCtx, TxnNumber txnNumber, TxnRetryCounter txnRetryCounter) { invariant(opCtx->inMultiDocumentTransaction()); // We don't check or fetch any on-disk state, so treat the transaction as 'valid' for the @@ -603,7 +648,7 @@ void TransactionParticipant::Participant::beginOrContinueTransactionUnconditiona p().isValid = true; if (o().activeTxnNumber != txnNumber) { - _beginMultiDocumentTransaction(opCtx, txnNumber); + _beginMultiDocumentTransaction(opCtx, txnNumber, txnRetryCounter); } else { invariant(o().txnState.isInSet(TransactionState::kInProgress | TransactionState::kPrepared), str::stream() << "Current state: " << o().txnState); @@ -2173,7 +2218,8 @@ void TransactionParticipant::Participant::_logSlowTransaction( } void 
TransactionParticipant::Participant::_setNewTxnNumber(OperationContext* opCtx, - const TxnNumber& txnNumber) { + const TxnNumber& txnNumber, + const TxnRetryCounter& txnRetryCounter) { uassert(ErrorCodes::PreparedTransactionInProgress, "Cannot change transaction number while the session has a prepared transaction", !o().txnState.isInSet(TransactionState::kPrepared)); @@ -2195,6 +2241,7 @@ void TransactionParticipant::Participant::_setNewTxnNumber(OperationContext* opC stdx::lock_guard<Client> lk(*opCtx->getClient()); o(lk).activeTxnNumber = txnNumber; + o(lk).activeTxnRetryCounter = txnRetryCounter; o(lk).lastWriteOpTime = repl::OpTime(); // Reset the retryable writes state @@ -2229,6 +2276,7 @@ void TransactionParticipant::Participant::_refreshFromStorageIfNeeded(OperationC if (lastTxnRecord) { stdx::lock_guard<Client> lg(*opCtx->getClient()); o(lg).activeTxnNumber = lastTxnRecord->getTxnNum(); + o(lg).activeTxnRetryCounter = lastTxnRecord->getState() ? 0 : kUninitializedTxnRetryCounter; o(lg).lastWriteOpTime = lastTxnRecord->getLastWriteOpTime(); p().activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements); p().hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory; diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index de02c0e3e9c..2ab95a71d5c 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -275,6 +275,14 @@ public: } /** + * Returns the last used transaction retry counter for the currently active transaction on + * this participant. + */ + TxnRetryCounter getActiveTxnRetryCounter() const { + return o().activeTxnRetryCounter; + } + + /** * Returns the op time of the last committed write for this session and transaction. If no * write has completed yet, returns an empty timestamp. */ @@ -401,13 +409,20 @@ public: * * 'startTransaction' comes from the 'startTransaction' field in the original client * request. 
See below for the acceptable values and the meaning of the combinations of - * autocommit and startTransaction. + * autocommit, startTransaction and txnRetryCounter. * - * autocommit = boost::none, startTransaction = boost::none: Means retryable write - * autocommit = false, startTransaction = boost::none: Means continuation of a - * multi-statement transaction - * autocommit = false, startTransaction = true: Means abort whatever transaction is in - * progress on the session and start a new transaction + * autocommit = boost::none, startTransaction = boost::none and txnRetryCounter = + * boost::none means retryable write. + * + * autocommit = false, startTransaction = boost::none and txnRetryCounter = last seen + * txnRetryCounter means continuation of a multi-statement transaction. + * + * autocommit = false, startTransaction = true, txnNumber = active txnNumber and + * txnRetryCounter > last seen txnRetryCounter (defaults to 0) means restart the existing + * transaction as long as it has not been committed or prepared. + * + * autocommit = false, startTransaction = true, txnNumber > active txnNumber means abort + * whatever transaction is in progress on the session and start a new transaction. * * Any combination other than the ones listed above will invariant since it is expected that * the caller has performed the necessary customer input validations. @@ -415,6 +430,11 @@ public: * Exceptions of note, which can be thrown are: * - TransactionTooOld - if an attempt is made to start a transaction older than the * currently active one or the last one which committed + * - TxnRetryCounterTooOld - if an attempt is made to start or continue a transaction with + * a txnRetryCounter less than the last seen one. + * - IllegalOperation - if an attempt is made to use a txnRetryCounter greater than the + * last seen one to continue a transaction, or to restart a transaction that has already + * been committed or prepared. 
* - PreparedTransactionInProgress - if the transaction is in the prepared state and a new * transaction or retryable write is attempted * - NotWritablePrimary - if the node is not a primary when this method is called. @@ -426,7 +446,8 @@ public: void beginOrContinue(OperationContext* opCtx, TxnNumber txnNumber, boost::optional<bool> autocommit, - boost::optional<bool> startTransaction); + boost::optional<bool> startTransaction, + boost::optional<TxnRetryCounter> txnRetryCounter); /** * Used only by the secondary oplog application logic. Similar to 'beginOrContinue' without @@ -434,7 +455,8 @@ public: * the past. */ void beginOrContinueTransactionUnconditionally(OperationContext* opCtx, - TxnNumber txnNumber); + TxnNumber txnNumber, + TxnRetryCounter txnRetryCounter); /** * If the participant is in prepare, returns a future whose promise is fulfilled when @@ -775,17 +797,24 @@ public: repl::ReadConcernArgs readConcernArgs) const; // Bumps up the transaction number of this transaction and perform the necessary cleanup. - void _setNewTxnNumber(OperationContext* opCtx, const TxnNumber& txnNumber); + void _setNewTxnNumber(OperationContext* opCtx, + const TxnNumber& txnNumber, + const TxnRetryCounter& txnRetryCounter); // Attempt to begin or retry a retryable write at the given transaction number. - void _beginOrContinueRetryableWrite(OperationContext* opCtx, TxnNumber txnNumber); + void _beginOrContinueRetryableWrite(OperationContext* opCtx, const TxnNumber& txnNumber); - // Attempt to begin a new multi document transaction at the given transaction number. - void _beginMultiDocumentTransaction(OperationContext* opCtx, TxnNumber txnNumber); + // Attempt to begin a new multi document transaction at the given transaction number and + // transaction retry counter. 
+ void _beginMultiDocumentTransaction(OperationContext* opCtx, + const TxnNumber& txnNumber, + const TxnRetryCounter& txnRetryCounter); // Attempt to continue an in-progress multi document transaction at the given transaction - // number. - void _continueMultiDocumentTransaction(OperationContext* opCtx, TxnNumber txnNumber); + // number and transaction retry counter. + void _continueMultiDocumentTransaction(OperationContext* opCtx, + const TxnNumber& txnNumber, + const TxnRetryCounter& txnRetryCounter); // Implementation of public refreshFromStorageIfNeeded methods. void _refreshFromStorageIfNeeded(OperationContext* opCtx, bool fetchOplogEntries); @@ -934,6 +963,11 @@ private: // performed any writes. TxnNumber activeTxnNumber{kUninitializedTxnNumber}; + // Tracks the last seen txnRetryCounter for the current transaction. Should always be + // kUninitializedTxnRetryCounter for a retryable write, and non-negative for a + // multi-statement transaction. + TxnRetryCounter activeTxnRetryCounter{kUninitializedTxnRetryCounter}; + // Caches what is known to be the last optime written for the active transaction. 
repl::OpTime lastWriteOpTime; diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp index c2582f6ba60..915a6c5a697 100644 --- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp +++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp @@ -229,7 +229,11 @@ protected: boost::optional<DurableTxnStateEnum> txnState) { const auto session = OperationContextSession::get(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); - txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx(), + txnNum, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); const auto uuid = UUID::gen(); @@ -291,7 +295,11 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryNotWrittenOnBegin) txnParticipant.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 20; - txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx(), + txnNum, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); ASSERT(txnParticipant.getLastWriteOpTime().isNull()); DBDirectClient client(opCtx()); @@ -307,7 +315,11 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryWrittenAtFirstWrit const auto& sessionId = *opCtx()->getLogicalSessionId(); const TxnNumber txnNum = 21; - txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx(), + txnNum, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); const auto opTime = writeTxnRecord(txnNum, {0}, {}, boost::none); @@ -392,12 +404,19 @@ TEST_F(TransactionParticipantRetryableWritesTest, StartingOldTxnShouldAssert) { txnParticipant.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 20; - 
txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none); - - ASSERT_THROWS_CODE( - txnParticipant.beginOrContinue(opCtx(), txnNum - 1, boost::none, boost::none), - AssertionException, - ErrorCodes::TransactionTooOld); + txnParticipant.beginOrContinue(opCtx(), + txnNum, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); + + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + txnNum - 1, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */), + AssertionException, + ErrorCodes::TransactionTooOld); ASSERT(txnParticipant.getLastWriteOpTime().isNull()); } @@ -411,11 +430,18 @@ TEST_F(TransactionParticipantRetryableWritesTest, StringBuilder sb; sb << "Retryable write with txnNumber 21 is prohibited on session " << sessionId << " because a newer retryable write with txnNumber 22 has already started on this session."; - txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none); - ASSERT_THROWS_WHAT( - txnParticipant.beginOrContinue(opCtx(), txnNum - 1, boost::none, boost::none), - AssertionException, - sb.str()); + txnParticipant.beginOrContinue(opCtx(), + txnNum, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); + ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue(opCtx(), + txnNum - 1, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */), + AssertionException, + sb.str()); ASSERT(txnParticipant.getLastWriteOpTime().isNull()); } @@ -430,8 +456,16 @@ TEST_F(TransactionParticipantRetryableWritesTest, StringBuilder sb; sb << "Cannot start transaction 21 on session " << sessionId << " because a newer retryable write with txnNumber 22 has already started on this session."; - txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none); - ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue(opCtx(), txnNum - 
1, autocommit, boost::none), + txnParticipant.beginOrContinue(opCtx(), + txnNum, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); + ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue(opCtx(), + txnNum - 1, + autocommit, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */), AssertionException, sb.str()); ASSERT(txnParticipant.getLastWriteOpTime().isNull()); @@ -450,7 +484,11 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionN ASSERT(client.runCommand(nss.db().toString(), BSON("drop" << nss.coll()), dropResult)); const TxnNumber txnNum = 21; - txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx(), + txnNum, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); @@ -471,7 +509,11 @@ TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecuted) { txnParticipant.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 100; - txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx(), + txnNum, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); ASSERT(!txnParticipant.checkStatementExecuted(opCtx(), 1000)); ASSERT(!txnParticipant.checkStatementExecutedNoOplogEntryFetch(1000)); @@ -513,7 +555,11 @@ DEATH_TEST_REGEX_F( const auto& sessionId = *opCtx()->getLogicalSessionId(); const TxnNumber txnNum = 100; - txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx(), + txnNum, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); const auto uuid = UUID::gen(); @@ -554,7 +600,11 @@ DEATH_TEST_REGEX_F( const auto& sessionId = 
*opCtx()->getLogicalSessionId(); const TxnNumber txnNum = 100; - txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx(), + txnNum, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); const auto uuid = UUID::gen(); @@ -594,7 +644,11 @@ DEATH_TEST_REGEX_F( txnParticipant.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 100; - txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx(), + txnNum, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); @@ -765,7 +819,11 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.refreshFromStorageIfNeeded(opCtx()); - txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx(), + txnNum, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); repl::MutableOplogEntry oplogEntry; oplogEntry.setSessionId(sessionId); @@ -871,7 +929,11 @@ TEST_F(ShardTxnParticipantRetryableWritesTest, opCtx()->setTxnNumber(txnNum); const auto uuid = UUID::gen(); - txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx(), + txnNum, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); @@ -892,7 +954,8 @@ TEST_F(ShardTxnParticipantRetryableWritesTest, opCtx()->setInMultiDocumentTransaction(); ASSERT_THROWS_CODE( - txnParticipant.beginOrContinue(opCtx(), txnNum, autocommit, startTransaction), + txnParticipant.beginOrContinue( + opCtx(), txnNum, autocommit, 
startTransaction, boost::none /* txnRetryCounter */), AssertionException, 50911); @@ -901,7 +964,8 @@ TEST_F(ShardTxnParticipantRetryableWritesTest, txnParticipant.refreshFromStorageIfNeeded(opCtx()); ASSERT_THROWS_CODE( - txnParticipant.beginOrContinue(opCtx(), txnNum, autocommit, startTransaction), + txnParticipant.beginOrContinue( + opCtx(), txnNum, autocommit, startTransaction, boost::none /* txnRetryCounter */), AssertionException, 50911); } diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index 288872c78ee..ea5f53ca6bb 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -51,6 +51,7 @@ #include "mongo/db/stats/fill_locker_info.h" #include "mongo/db/transaction_participant.h" #include "mongo/db/transaction_participant_gen.h" +#include "mongo/db/txn_retry_counter_too_old_info.h" #include "mongo/stdx/future.h" #include "mongo/unittest/barrier.h" #include "mongo/unittest/death_test.h" @@ -312,7 +313,11 @@ protected: opCtx()->setInMultiDocumentTransaction(); auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); - txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, startNewTxn); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + startNewTxn /* startTransaction */, + boost::none /* txnRetryCounter */); return opCtxSession; } @@ -385,7 +390,11 @@ TEST_F(TxnParticipantTest, TransactionThrowsLockTimeoutIfLockIsUnavailable) { MongoDOperationContextSession newOpCtxSession(newOpCtx.get()); auto newTxnParticipant = TransactionParticipant::get(newOpCtx.get()); - newTxnParticipant.beginOrContinue(newOpCtx.get(), newTxnNum, false, true); + newTxnParticipant.beginOrContinue(newOpCtx.get(), + newTxnNum, + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */); 
newTxnParticipant.unstashTransactionResources(newOpCtx.get(), "insert"); Date_t t1 = Date_t::now(); @@ -455,10 +464,13 @@ TEST_F(TxnParticipantTest, CannotSpecifyStartTransactionOnInProgressTxn) { ASSERT_TRUE(txnParticipant.transactionIsOpen()); // Cannot try to start a transaction that already started. - ASSERT_THROWS_CODE( - txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, true), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); } TEST_F(TxnParticipantTest, AutocommitRequiredOnEveryTxnOp) { @@ -473,12 +485,20 @@ TEST_F(TxnParticipantTest, AutocommitRequiredOnEveryTxnOp) { auto txnNum = *opCtx()->getTxnNumber(); // Omitting 'autocommit' after the first statement of a transaction should throw an error. - ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none), + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + txnNum, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */), AssertionException, ErrorCodes::IncompleteTransactionHistory); // Including autocommit=false should succeed. - txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, boost::none); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); } DEATH_TEST_F(TxnParticipantTest, AutocommitCannotBeTrue, "invariant") { @@ -486,7 +506,11 @@ DEATH_TEST_F(TxnParticipantTest, AutocommitCannotBeTrue, "invariant") { auto txnParticipant = TransactionParticipant::get(opCtx()); // Passing 'autocommit=true' is not allowed and should crash. 
- txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), true, boost::none); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + true /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); } DEATH_TEST_F(TxnParticipantTest, StartTransactionCannotBeFalse, "invariant") { @@ -494,7 +518,11 @@ DEATH_TEST_F(TxnParticipantTest, StartTransactionCannotBeFalse, "invariant") { auto txnParticipant = TransactionParticipant::get(opCtx()); // Passing 'startTransaction=false' is not allowed and should crash. - txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, false); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + false /* startTransaction */, + boost::none /* txnRetryCounter */); } TEST_F(TxnParticipantTest, SameTransactionPreservesStoredStatements) { @@ -517,7 +545,11 @@ TEST_F(TxnParticipantTest, SameTransactionPreservesStoredStatements) { txnParticipant.getTransactionOperationsForTest()[0].toBSON()); // Re-opening the same transaction should have no effect. - txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, boost::none); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); ASSERT_BSONOBJ_EQ(operation.toBSON(), txnParticipant.getTransactionOperationsForTest()[0].toBSON()); } @@ -797,8 +829,11 @@ TEST_F(TxnParticipantTest, KillOpBeforeCommittingPreparedTransaction) { // Check out the session and continue the transaction. 
auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx); auto newTxnParticipant = TransactionParticipant::get(opCtx); - newTxnParticipant.beginOrContinue( - opCtx, *(opCtx->getTxnNumber()), false, boost::none /*startNewTxn*/); + newTxnParticipant.beginOrContinue(opCtx, + *(opCtx->getTxnNumber()), + false /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); newTxnParticipant.unstashTransactionResources(opCtx, "commitTransaction"); newTxnParticipant.commitPreparedTransaction(opCtx, prepareTimestamp, boost::none); @@ -839,8 +874,11 @@ TEST_F(TxnParticipantTest, KillOpBeforeAbortingPreparedTransaction) { // Check out the session and continue the transaction. auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx); auto newTxnParticipant = TransactionParticipant::get(opCtx); - newTxnParticipant.beginOrContinue( - opCtx, *(opCtx->getTxnNumber()), false, boost::none /*startNewTxn*/); + newTxnParticipant.beginOrContinue(opCtx, + *(opCtx->getTxnNumber()), + false /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); newTxnParticipant.unstashTransactionResources(opCtx, "commitTransaction"); newTxnParticipant.commitPreparedTransaction(opCtx, prepareTimestamp, boost::none); @@ -1144,10 +1182,13 @@ TEST_F(TxnParticipantTest, ContinuingATransactionWithNoResourcesAborts) { MongoDOperationContextSession sessionCheckout(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); - ASSERT_THROWS_CODE( - txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, boost::none), - AssertionException, - ErrorCodes::NoSuchTransaction); + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */), + AssertionException, + ErrorCodes::NoSuchTransaction); } TEST_F(TxnParticipantTest, CannotStartNewTransactionIfNotPrimary) { 
@@ -1158,10 +1199,13 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionIfNotPrimary) { auto txnParticipant = TransactionParticipant::get(opCtx()); // Include 'autocommit=false' for transactions. - ASSERT_THROWS_CODE( - txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, true), - AssertionException, - ErrorCodes::NotWritablePrimary); + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */), + AssertionException, + ErrorCodes::NotWritablePrimary); } TEST_F(TxnParticipantTest, CannotStartRetryableWriteIfNotPrimary) { @@ -1172,10 +1216,13 @@ TEST_F(TxnParticipantTest, CannotStartRetryableWriteIfNotPrimary) { auto txnParticipant = TransactionParticipant::get(opCtx()); // Omit the 'autocommit' field for retryable writes. - ASSERT_THROWS_CODE( - txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), boost::none, true), - AssertionException, - ErrorCodes::NotWritablePrimary); + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + boost::none /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */), + AssertionException, + ErrorCodes::NotWritablePrimary); } TEST_F(TxnParticipantTest, CannotContinueTransactionIfNotPrimary) { @@ -1189,10 +1236,13 @@ TEST_F(TxnParticipantTest, CannotContinueTransactionIfNotPrimary) { // Technically, the transaction should have been aborted on stepdown anyway, but it // doesn't hurt to have this kind of coverage. 
- ASSERT_THROWS_CODE( - txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, false), - AssertionException, - ErrorCodes::NotWritablePrimary); + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + false /* startTransaction */, + boost::none /* txnRetryCounter */), + AssertionException, + ErrorCodes::NotWritablePrimary); } TEST_F(TxnParticipantTest, OlderTransactionFailsOnSessionWithNewerTransaction) { @@ -1207,8 +1257,11 @@ TEST_F(TxnParticipantTest, OlderTransactionFailsOnSessionWithNewerTransaction) { StringBuilder sb; sb << "Cannot start transaction 19 on session " << sessionId << " because a newer transaction with txnNumber 20 has already started on this session."; - ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue( - opCtx(), *opCtx()->getTxnNumber() - 1, autocommit, startTransaction), + ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber() - 1, + autocommit, + startTransaction, + boost::none /* txnRetryCounter */), AssertionException, sb.str()); ASSERT(txnParticipant.getLastWriteOpTime().isNull()); @@ -1225,8 +1278,11 @@ TEST_F(TxnParticipantTest, OldRetryableWriteFailsOnSessionWithNewerTransaction) StringBuilder sb; sb << "Retryable write with txnNumber 19 is prohibited on session " << sessionId << " because a newer transaction with txnNumber 20 has already started on this session."; - ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue( - opCtx(), *opCtx()->getTxnNumber() - 1, boost::none, boost::none), + ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber() - 1, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */), AssertionException, sb.str()); ASSERT(txnParticipant.getLastWriteOpTime().isNull()); @@ -1257,20 +1313,23 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionWhilePreparedTransactionInPr auto guard = makeGuard([&]() { 
OperationContextSession::checkOut(opCtx()); }); // Try to start a new transaction while there is already a prepared transaction on the // session. This should fail with a PreparedTransactionInProgress error. - runFunctionFromDifferentOpCtx( - [lsid = *opCtx()->getLogicalSessionId(), - txnNumberToStart = *opCtx()->getTxnNumber() + 1](OperationContext* newOpCtx) { - newOpCtx->setLogicalSessionId(lsid); - newOpCtx->setTxnNumber(txnNumberToStart); - newOpCtx->setInMultiDocumentTransaction(); - - MongoDOperationContextSession ocs(newOpCtx); - auto txnParticipant = TransactionParticipant::get(newOpCtx); - ASSERT_THROWS_CODE( - txnParticipant.beginOrContinue(newOpCtx, txnNumberToStart, false, true), - AssertionException, - ErrorCodes::PreparedTransactionInProgress); - }); + runFunctionFromDifferentOpCtx([lsid = *opCtx()->getLogicalSessionId(), + txnNumberToStart = *opCtx()->getTxnNumber() + + 1](OperationContext* newOpCtx) { + newOpCtx->setLogicalSessionId(lsid); + newOpCtx->setTxnNumber(txnNumberToStart); + newOpCtx->setInMultiDocumentTransaction(); + + MongoDOperationContextSession ocs(newOpCtx); + auto txnParticipant = TransactionParticipant::get(newOpCtx); + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(newOpCtx, + txnNumberToStart, + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */), + AssertionException, + ErrorCodes::PreparedTransactionInProgress); + }); } ASSERT_FALSE(txnParticipant.transactionIsAborted()); @@ -1299,10 +1358,13 @@ TEST_F(TxnParticipantTest, CannotInsertInPreparedTransaction) { TEST_F(TxnParticipantTest, CannotContinueNonExistentTransaction) { MongoDOperationContextSession opCtxSession(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); - ASSERT_THROWS_CODE( - txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, boost::none), - AssertionException, - ErrorCodes::NoSuchTransaction); + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + 
*opCtx()->getTxnNumber(), + false /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */), + AssertionException, + ErrorCodes::NoSuchTransaction); } // Tests that a transaction aborts if it becomes too large based on the server parameter @@ -1420,8 +1482,11 @@ protected: auto txnParticipant = TransactionParticipant::get(opCtx()); ASSERT(txnParticipant.transactionIsOpen()); - ASSERT_THROWS_CODE(txnParticipant.beginOrContinue( - opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction), + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + autocommit, + startTransaction, + boost::none /* txnRetryCounter */), AssertionException, 50911); } @@ -1437,8 +1502,11 @@ protected: txnParticipant.abortTransaction(opCtx()); ASSERT(txnParticipant.transactionIsAborted()); - txnParticipant.beginOrContinue( - opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + autocommit, + startTransaction, + boost::none /* txnRetryCounter */); ASSERT(txnParticipant.transactionIsOpen()); } @@ -1453,8 +1521,11 @@ protected: txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); txnParticipant.commitUnpreparedTransaction(opCtx()); - ASSERT_THROWS_CODE(txnParticipant.beginOrContinue( - opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction), + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + autocommit, + startTransaction, + boost::none /* txnRetryCounter */), AssertionException, 50911); } @@ -1473,8 +1544,11 @@ protected: txnParticipant.addTransactionOperation(opCtx(), operation); txnParticipant.prepareTransaction(opCtx(), {}); - ASSERT_THROWS_CODE(txnParticipant.beginOrContinue( - opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction), + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + autocommit, + startTransaction, + 
boost::none /* txnRetryCounter */), AssertionException, 50911); } @@ -1495,8 +1569,11 @@ protected: ASSERT(txnParticipant.transactionIsAborted()); startTransaction = true; - ASSERT_THROWS_CODE(txnParticipant.beginOrContinue( - opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction), + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + autocommit, + startTransaction, + boost::none /* txnRetryCounter */), AssertionException, 50911); } @@ -1505,14 +1582,21 @@ protected: MongoDOperationContextSession opCtxSession(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); - txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); ASSERT_FALSE(txnParticipant.transactionIsOpen()); auto autocommit = false; auto startTransaction = true; - txnParticipant.beginOrContinue( - opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + autocommit, + startTransaction, + boost::none /* txnRetryCounter */); ASSERT(txnParticipant.transactionIsOpen()); } @@ -2129,8 +2213,11 @@ TEST_F(TransactionsMetricsTest, TransactionErrorsBeforeUnstash) { auto txnParticipant = TransactionParticipant::get(opCtx()); const bool autocommit = false; const boost::optional<bool> startTransaction = boost::none; - ASSERT_THROWS_CODE(txnParticipant.beginOrContinue( - opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction), + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + autocommit, + startTransaction, + boost::none /* txnRetryCounter */), AssertionException, ErrorCodes::NoSuchTransaction); @@ -2389,7 +2476,11 @@ TEST_F(TransactionsMetricsTest, TimeActiveMicrosShouldBeSetUponUnstashAndStash) // Start a new 
transaction. const auto higherTxnNum = *opCtx()->getTxnNumber() + 1; - txnParticipant.beginOrContinue(opCtx(), higherTxnNum, false, true); + txnParticipant.beginOrContinue(opCtx(), + higherTxnNum, + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */); // Time active should be zero for a new transaction. ASSERT_EQ(txnParticipant.getSingleTransactionStatsForTest().getTimeActiveMicros( @@ -2918,7 +3009,11 @@ TEST_F(TransactionsMetricsTest, ReportUnstashedResourcesForARetryableWrite) { MongoDOperationContextSession opCtxSession(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); - txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); txnParticipant.unstashTransactionResources(opCtx(), "find"); // Build a BSONObj containing the details which we expect to see reported when we invoke @@ -2948,7 +3043,11 @@ TEST_F(TransactionsMetricsTest, UseAPIParametersOnOpCtxForARetryableWrite) { MongoDOperationContextSession opCtxSession(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); - txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), boost::none, boost::none); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); APIParameters secondAPIParameters = APIParameters(); secondAPIParameters.setAPIVersion("3"); @@ -4210,7 +4309,8 @@ TEST_F(TxnParticipantTest, ResponseMetadataHasReadOnlyFalseIfInRetryableWrite) { txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), boost::none /* autocommit */, - boost::none /* startTransaction */); + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); 
ASSERT_FALSE(txnParticipant.getResponseMetadata().getReadOnly()); } @@ -4220,8 +4320,11 @@ TEST_F(TxnParticipantTest, ResponseMetadataHasReadOnlyTrueIfInProgressAndOperati ASSERT_FALSE(txnParticipant.getResponseMetadata().getReadOnly()); // Start a transaction. - txnParticipant.beginOrContinue( - opCtx(), *opCtx()->getTxnNumber(), false /* autocommit */, true /* startTransaction */); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */); ASSERT_TRUE(txnParticipant.getResponseMetadata().getReadOnly()); txnParticipant.unstashTransactionResources(opCtx(), "find"); @@ -4235,8 +4338,11 @@ TEST_F(TxnParticipantTest, ASSERT_FALSE(txnParticipant.getResponseMetadata().getReadOnly()); // Start a transaction. - txnParticipant.beginOrContinue( - opCtx(), *opCtx()->getTxnNumber(), false /* autocommit */, true /* startTransaction */); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */); ASSERT_TRUE(txnParticipant.getResponseMetadata().getReadOnly()); txnParticipant.unstashTransactionResources(opCtx(), "insert"); @@ -4255,8 +4361,11 @@ TEST_F(TxnParticipantTest, ResponseMetadataHasReadOnlyFalseIfAborted) { ASSERT_FALSE(txnParticipant.getResponseMetadata().getReadOnly()); // Start a transaction. 
- txnParticipant.beginOrContinue( - opCtx(), *opCtx()->getTxnNumber(), false /* autocommit */, true /* startTransaction */); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */); ASSERT_TRUE(txnParticipant.getResponseMetadata().getReadOnly()); txnParticipant.unstashTransactionResources(opCtx(), "find"); @@ -4340,8 +4449,11 @@ TEST_F(TxnParticipantTest, ExitPreparePromiseIsFulfilledOnAbortAfterPrepare) { MongoDOperationContextSession opCtxSession(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); - txnParticipant.beginOrContinue( - opCtx(), *opCtx()->getTxnNumber(), false /* autocommit */, true /* startTransaction */); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */); ASSERT_TRUE(txnParticipant.onExitPrepare().isReady()); txnParticipant.unstashTransactionResources(opCtx(), "find"); @@ -4367,8 +4479,11 @@ TEST_F(TxnParticipantTest, ExitPreparePromiseIsFulfilledOnCommitAfterPrepare) { MongoDOperationContextSession opCtxSession(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); - txnParticipant.beginOrContinue( - opCtx(), *opCtx()->getTxnNumber(), false /* autocommit */, true /* startTransaction */); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */); ASSERT_TRUE(txnParticipant.onExitPrepare().isReady()); txnParticipant.unstashTransactionResources(opCtx(), "find"); @@ -4389,5 +4504,287 @@ TEST_F(TxnParticipantTest, ExitPreparePromiseIsFulfilledOnCommitAfterPrepare) { ASSERT_TRUE(txnParticipant.onExitPrepare().isReady()); } +TEST_F(ShardTxnParticipantTest, CanSpecifyTxnRetryCounterOnShardSvr) { + MongoDOperationContextSession opCtxSession(opCtx()); + auto txnParticipant = 
TransactionParticipant::get(opCtx()); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + 0 /* txnRetryCounter */); +} + +TEST_F(ConfigTxnParticipantTest, CanSpecifyTxnRetryCounterOnConfigSvr) { + MongoDOperationContextSession opCtxSession(opCtx()); + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + 0 /* txnRetryCounter */); +} + +TEST_F(TxnParticipantTest, CanOnlySpecifyTxnRetryCounterInShardedClusters) { + MongoDOperationContextSession opCtxSession(opCtx()); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + 0 /* txnRetryCounter */), + AssertionException, + ErrorCodes::InvalidOptions); +} + +DEATH_TEST_F(ShardTxnParticipantTest, + CannotSpecifyNegativeTxnRetryCounter, + "Cannot specify a negative txnRetryCounter") { + MongoDOperationContextSession opCtxSession(opCtx()); + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + -1 /* txnRetryCounter */); +} + +DEATH_TEST_F(ShardTxnParticipantTest, + CannotSpecifyTxnRetryCounterForRetryableWrite, + "Cannot specify a txnRetryCounter for retryable write") { + MongoDOperationContextSession opCtxSession(opCtx()); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + boost::none /* autocommit */, + boost::none /* startTransaction */, + 0 /* txnRetryCounter */), + AssertionException, + ErrorCodes::InvalidOptions); +} + +TEST_F(ShardTxnParticipantTest, + CanRestartInProgressTransactionUsingTxnRetryCounterGreaterThanLastUsed) 
{ + auto sessionCheckout = checkOutSession(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT_TRUE(txnParticipant.transactionIsInProgress()); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + 1 /* txnRetryCounter */); + ASSERT_TRUE(txnParticipant.transactionIsInProgress()); +} + +TEST_F(ShardTxnParticipantTest, + CanRestartAbortedUnpreparedTransactionUsingTxnRetryCounterGreaterThanLastUsed) { + auto sessionCheckout = checkOutSession(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT(txnParticipant.transactionIsInProgress()); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0 /* txnRetryCounter */); + + txnParticipant.abortTransaction(opCtx()); + ASSERT(txnParticipant.transactionIsAborted()); + + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + 1 /* txnRetryCounter */); + ASSERT(txnParticipant.transactionIsInProgress()); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1); +} + +TEST_F(ShardTxnParticipantTest, + CanRestartAbortedPreparedTransactionUsingTxnRetryCounterGreaterThanLastUsed) { + auto sessionCheckout = checkOutSession(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT(txnParticipant.transactionIsInProgress()); + txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); + txnParticipant.prepareTransaction(opCtx(), {}); + txnParticipant.abortTransaction(opCtx()); + ASSERT(txnParticipant.transactionIsAborted()); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + 1 /* txnRetryCounter */); + ASSERT(txnParticipant.transactionIsInProgress()); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1); +} + 
+TEST_F(ShardTxnParticipantTest, + CannotRestartCommittedUnpreparedTransactionUsingTxnRetryCounterGreaterThanLastUsed) { + auto sessionCheckout = checkOutSession(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT(txnParticipant.transactionIsInProgress()); + txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); + txnParticipant.commitUnpreparedTransaction(opCtx()); + ASSERT(txnParticipant.transactionIsCommitted()); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + 1 /* txnRetryCounter */), + AssertionException, + ErrorCodes::IllegalOperation); + ASSERT(txnParticipant.transactionIsCommitted()); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); +} + +TEST_F(ShardTxnParticipantTest, + CannotRestartCommittedPreparedTransactionUsingTxnRetryCounterGreaterThanLastUsed) { + auto sessionCheckout = checkOutSession(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT(txnParticipant.transactionIsInProgress()); + txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); + const auto prepareTimestamp = txnParticipant.prepareTransaction(opCtx(), {}); + const auto commitTS = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1); + txnParticipant.commitPreparedTransaction(opCtx(), commitTS, {}); + ASSERT_TRUE(txnParticipant.transactionIsCommitted()); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + 1 /* txnRetryCounter */), + AssertionException, + ErrorCodes::IllegalOperation); + ASSERT(txnParticipant.transactionIsCommitted()); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); +} + +TEST_F(ShardTxnParticipantTest, + 
CannotRestartPreparedTransactionUsingTxnRetryCounterGreaterThanLastUsed) { + auto sessionCheckout = checkOutSession(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT(txnParticipant.transactionIsInProgress()); + txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction"); + txnParticipant.prepareTransaction(opCtx(), {}); + ASSERT_TRUE(txnParticipant.transactionIsPrepared()); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + 1 /* txnRetryCounter */), + AssertionException, + ErrorCodes::IllegalOperation); + ASSERT(txnParticipant.transactionIsPrepared()); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); +} + +TEST_F(ShardTxnParticipantTest, CannotRestartTransactionUsingTxnRetryCounterLessThanLastUsed) { + auto sessionCheckout = checkOutSession(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT_TRUE(txnParticipant.transactionIsInProgress()); + + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + 1 /* txnRetryCounter */); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1); + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + 0 /* txnRetryCounter */), + AssertionException, + ErrorCodes::TxnRetryCounterTooOld); + try { + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + 0 /* txnRetryCounter */); + ASSERT(false); + } catch (const TxnRetryCounterTooOldException& ex) { + auto info = ex.extraInfo<TxnRetryCounterTooOldInfo>(); + ASSERT_EQ(info->getTxnRetryCounter(), 1); + } + ASSERT_TRUE(txnParticipant.transactionIsInProgress()); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1); +} + 
+TEST_F(ShardTxnParticipantTest, CanContinueTransactionUsingTxnRetryCounterEqualToLastUsed) { + auto sessionCheckout = checkOutSession(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT_TRUE(txnParticipant.transactionIsInProgress()); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + txnParticipant.unstashTransactionResources(opCtx(), "insert"); + txnParticipant.stashTransactionResources(opCtx()); + + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + boost::none /* startTransaction */, + 0 /* txnRetryCounter */); + ASSERT_TRUE(txnParticipant.transactionIsInProgress()); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); +} + +TEST_F(ShardTxnParticipantTest, CannotContinueTransactionUsingTxnRetryCounterGreaterThanLastUsed) { + auto sessionCheckout = checkOutSession(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT_TRUE(txnParticipant.transactionIsInProgress()); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); + txnParticipant.unstashTransactionResources(opCtx(), "insert"); + txnParticipant.stashTransactionResources(opCtx()); + + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + boost::none /* startTransaction */, + 1 /* txnRetryCounter */), + AssertionException, + ErrorCodes::IllegalOperation); + ASSERT_TRUE(txnParticipant.transactionIsInProgress()); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0); +} + +TEST_F(ShardTxnParticipantTest, CannotContinueTransactionUsingTxnRetryCounterLessThanLastUsed) { + auto sessionCheckout = checkOutSession(); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT_TRUE(txnParticipant.transactionIsInProgress()); + + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + 1 /* txnRetryCounter */); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1); + + 
txnParticipant.unstashTransactionResources(opCtx(), "insert"); + txnParticipant.stashTransactionResources(opCtx()); + ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + boost::none /* startTransaction */, + 0 /* txnRetryCounter */), + AssertionException, + ErrorCodes::TxnRetryCounterTooOld); + try { + txnParticipant.beginOrContinue(opCtx(), + *opCtx()->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + 0 /* txnRetryCounter */); + ASSERT(false); + } catch (const TxnRetryCounterTooOldException& ex) { + auto info = ex.extraInfo<TxnRetryCounterTooOldInfo>(); + ASSERT_EQ(info->getTxnRetryCounter(), 1); + } + ASSERT_TRUE(txnParticipant.transactionIsInProgress()); + ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1); +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/txn_retry_counter_too_old_info.cpp b/src/mongo/db/txn_retry_counter_too_old_info.cpp new file mode 100644 index 00000000000..ba5f01429d9 --- /dev/null +++ b/src/mongo/db/txn_retry_counter_too_old_info.cpp @@ -0,0 +1,54 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. 
+ * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/txn_retry_counter_too_old_info.h" + +#include "mongo/base/init.h" + +namespace mongo { + +namespace { + +MONGO_INIT_REGISTER_ERROR_EXTRA_INFO(TxnRetryCounterTooOldInfo); + +constexpr StringData kTxnRetryCounterFieldName = "txnRetryCounter"_sd; + +} // namespace + +void TxnRetryCounterTooOldInfo::serialize(BSONObjBuilder* bob) const { + bob->append(kTxnRetryCounterFieldName, _txnRetryCounter); +} + +std::shared_ptr<const ErrorExtraInfo> TxnRetryCounterTooOldInfo::parse(const BSONObj& obj) { + return std::make_shared<TxnRetryCounterTooOldInfo>(obj[kTxnRetryCounterFieldName].Int()); +} + +} // namespace mongo diff --git a/src/mongo/db/txn_retry_counter_too_old_info.h b/src/mongo/db/txn_retry_counter_too_old_info.h new file mode 100644 index 00000000000..f126af7ede1 --- /dev/null +++ b/src/mongo/db/txn_retry_counter_too_old_info.h @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2021-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. 
+ * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include "mongo/base/error_extra_info.h" +#include "mongo/bson/bsonobj.h" +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/logical_session_id.h" + +namespace mongo { + +class TxnRetryCounterTooOldInfo final : public ErrorExtraInfo { +public: + static constexpr auto code = ErrorCodes::TxnRetryCounterTooOld; + + explicit TxnRetryCounterTooOldInfo(const TxnRetryCounter txnRetryCounter) + : _txnRetryCounter(txnRetryCounter){}; + + const auto& getTxnRetryCounter() const { + return _txnRetryCounter; + } + + void serialize(BSONObjBuilder* bob) const override; + static std::shared_ptr<const ErrorExtraInfo> parse(const BSONObj&); + +private: + TxnRetryCounter _txnRetryCounter; +}; + +using TxnRetryCounterTooOldException = ExceptionFor<ErrorCodes::TxnRetryCounterTooOld>; + +} // namespace mongo diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp index c0f0a8551a3..067ab52d4cd 100644 --- a/src/mongo/dbtests/storage_timestamp_tests.cpp +++ b/src/mongo/dbtests/storage_timestamp_tests.cpp @@ -1767,8 +1767,11 @@ public: auto txnParticipant = TransactionParticipant::get(_opCtx); ASSERT(txnParticipant); - txnParticipant.beginOrContinue( - _opCtx, *_opCtx->getTxnNumber(), false /* autocommit */, true /* startTransaction */); + txnParticipant.beginOrContinue(_opCtx, + *_opCtx->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */); txnParticipant.unstashTransactionResources(_opCtx, "insert"); { // Insert a document that will set the index as multikey. @@ -3464,8 +3467,11 @@ public: auto txnParticipant = TransactionParticipant::get(_opCtx); ASSERT(txnParticipant); // Start a retryable write. 
- txnParticipant.beginOrContinue( - _opCtx, txnNumber, boost::none /* autocommit */, boost::none /* startTransaction */); + txnParticipant.beginOrContinue(_opCtx, + txnNumber, + boost::none /* autocommit */, + boost::none /* startTransaction */, + boost::none /* txnRetryCounter */); } protected: @@ -3670,8 +3676,11 @@ public: auto txnParticipant = TransactionParticipant::get(_opCtx); ASSERT(txnParticipant); - txnParticipant.beginOrContinue( - _opCtx, *_opCtx->getTxnNumber(), false /* autocommit */, true /* startTransaction */); + txnParticipant.beginOrContinue(_opCtx, + *_opCtx->getTxnNumber(), + false /* autocommit */, + true /* startTransaction */, + boost::none /* txnRetryCounter */); txnParticipant.unstashTransactionResources(_opCtx, "insert"); { AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IX); diff --git a/src/mongo/idl/generic_argument.idl b/src/mongo/idl/generic_argument.idl index c384e52f34d..016fe8d043b 100644 --- a/src/mongo/idl/generic_argument.idl +++ b/src/mongo/idl/generic_argument.idl @@ -99,6 +99,8 @@ generic_argument_lists: forward_to_shards: false $topologyTime: forward_to_shards: false + txnRetryCounter: + forward_to_shards: true generic_reply_field_lists: generic_reply_fields_api_v1: diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index ea3efda12ea..93ba0651bf6 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -64,6 +64,7 @@ env.Library( 'grid', ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/internal_transactions_feature_flag', '$BUILD_DIR/mongo/db/sessions_collection', ], ) diff --git a/src/mongo/s/commands/document_shard_key_update_util.cpp b/src/mongo/s/commands/document_shard_key_update_util.cpp index d4dda8ae836..99324ffcd86 100644 --- a/src/mongo/s/commands/document_shard_key_update_util.cpp +++ b/src/mongo/s/commands/document_shard_key_update_util.cpp @@ -148,7 +148,8 @@ void startTransactionForShardKeyUpdate(OperationContext* opCtx) { auto txnNumber = opCtx->getTxnNumber(); invariant(txnNumber); - 
txnRouter.beginOrContinueTxn(opCtx, *txnNumber, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn( + opCtx, *txnNumber, TransactionRouter::TransactionActions::kStart, 0 /* txnRetryCounter */); } BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx) { diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index c90ac6bf405..c512ccc9867 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -627,6 +627,9 @@ Status ParseAndRunCommand::RunInvocation::_setup() { auto txnNumber = opCtx->getTxnNumber(); invariant(txnNumber); + auto txnRetryCounter = opCtx->getTxnRetryCounter(); + invariant(txnRetryCounter); + auto transactionAction = ([&] { auto startTxnSetting = _parc->_osi->getStartTransaction(); if (startTxnSetting && *startTxnSetting) { @@ -641,7 +644,7 @@ Status ParseAndRunCommand::RunInvocation::_setup() { })(); startTransaction = (transactionAction == TransactionRouter::TransactionActions::kStart); - txnRouter.beginOrContinueTxn(opCtx, *txnNumber, transactionAction); + txnRouter.beginOrContinueTxn(opCtx, *txnNumber, transactionAction, *txnRetryCounter); } bool supportsWriteConcern = invocation->supportsWriteConcern(); diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 160bce00191..64742e972c7 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -40,10 +40,12 @@ #include "mongo/db/commands.h" #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" +#include "mongo/db/internal_transactions_feature_flag_gen.h" #include "mongo/db/jsobj.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/transaction_validation.h" +#include "mongo/db/txn_retry_counter_too_old_info.h" #include "mongo/db/vector_clock.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/logv2/log.h" @@ -244,6 
+246,18 @@ std::string commitTypeToString(TransactionRouter::CommitType state) { MONGO_UNREACHABLE; } +std::string actionTypeToString(TransactionRouter::TransactionActions action) { + switch (action) { + case TransactionRouter::TransactionActions::kStart: + return "start"; + case TransactionRouter::TransactionActions::kContinue: + return "continue"; + case TransactionRouter::TransactionActions::kCommit: + return "commit"; + } + MONGO_UNREACHABLE; +} + } // unnamed namespace TransactionRouter::TransactionRouter() = default; @@ -453,6 +467,12 @@ BSONObj TransactionRouter::Participant::attachTxnFieldsIfNeeded( invariant(sharedOptions.txnNumber == *osi.getTxnNumber()); } + if (feature_flags::gFeatureFlagInternalTransactions.isEnabled( + serverGlobalParams.featureCompatibility)) { + newCmd.append(OperationSessionInfoFromClient::kTxnRetryCounterFieldName, + sharedOptions.txnRetryCounter); + } + return newCmd.obj(); } @@ -634,6 +654,7 @@ TransactionRouter::Participant& TransactionRouter::Router::_createParticipant( SharedTransactionOptions sharedOptions = { o().txnNumber, + o().txnRetryCounter, o().apiParameters, o().readConcernArgs, o().atClusterTime ? boost::optional<LogicalTime>(o().atClusterTime->getTime()) @@ -893,31 +914,26 @@ void TransactionRouter::Router::_setAtClusterTime( o(lk).atClusterTime->setTime(candidateTime, p().latestStmtId); } -void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx, - TxnNumber txnNumber, - TransactionActions action) { - if (txnNumber < o().txnNumber) { - // This transaction is older than the transaction currently in progress, so throw an error. - uasserted(ErrorCodes::TransactionTooOld, - str::stream() << "txnNumber " << txnNumber << " is less than last txnNumber " - << o().txnNumber << " seen in session " << _sessionId()); - } else if (txnNumber == o().txnNumber) { - // This is the same transaction as the one in progress. 
- auto apiParamsFromClient = APIParameters::get(opCtx); - if (action == TransactionActions::kContinue || action == TransactionActions::kCommit) { - uassert( - ErrorCodes::APIMismatchError, - "API parameter mismatch: transaction-continuing command used {}, the transaction's" - " first command used {}"_format(apiParamsFromClient.toBSON().toString(), - o().apiParameters.toBSON().toString()), - apiParamsFromClient == o().apiParameters); - } - +void TransactionRouter::Router::_beginOrContinueActiveTxnNumber(OperationContext* opCtx, + TxnNumber txnNumber, + TransactionActions action, + TxnRetryCounter txnRetryCounter) { + invariant(txnNumber == o().txnNumber); + + if (txnRetryCounter < o().txnRetryCounter) { + uasserted(TxnRetryCounterTooOldInfo(o().txnRetryCounter), + str::stream() << "Cannot " << actionTypeToString(action) << " transaction " + << txnNumber << " on session " << _sessionId() + << " using txnRetryCounter " << txnRetryCounter + << " because the transaction has already been restarted using" + << " a higher txnRetryCounter " << o().txnRetryCounter); + } else if (txnRetryCounter == o().txnRetryCounter) { switch (action) { case TransactionActions::kStart: { uasserted(ErrorCodes::ConflictingOperationInProgress, str::stream() << "txnNumber " << o().txnNumber << " for session " << _sessionId() << " already started"); + break; } case TransactionActions::kContinue: { uassert(ErrorCodes::InvalidOptions, @@ -936,62 +952,83 @@ void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx, _onContinue(opCtx); break; } - } else if (txnNumber > o().txnNumber) { - // This is a newer transaction. 
- switch (action) { - case TransactionActions::kStart: { - auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); - uassert( - ErrorCodes::InvalidOptions, - "The first command in a transaction cannot specify a readConcern level other " - "than local, majority, or snapshot", - !readConcernArgs.hasLevel() || - isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel())); - - _resetRouterState(opCtx, txnNumber); - - { - stdx::lock_guard<Client> lk(*opCtx->getClient()); - o(lk).apiParameters = APIParameters::get(opCtx); - o(lk).readConcernArgs = readConcernArgs; - } + } else { + uassert(ErrorCodes::IllegalOperation, + str::stream() << "Cannot " << actionTypeToString(action) << " transaction " + << txnNumber << " on session " << _sessionId() + << " using txnRetryCounter " << txnRetryCounter + << " because it is using a lower txnRetryCounter " + << o().txnRetryCounter, + action == TransactionActions::kStart); + uassert(ErrorCodes::IllegalOperation, + str::stream() << "Cannot restart transaction " << txnNumber << " on session " + << _sessionId() << " using txnRetryCounter " << txnRetryCounter + << " because it has already started to commit", + o().commitType == CommitType::kNotInitiated || !o().abortCause.empty()); + _resetRouterStateForStartTransaction(opCtx, txnNumber, txnRetryCounter); + } +} - if (o().readConcernArgs.getLevel() == - repl::ReadConcernLevel::kSnapshotReadConcern) { - stdx::lock_guard<Client> lk(*opCtx->getClient()); - o(lk).atClusterTime.emplace(); - } +void TransactionRouter::Router::_beginNewTxnNumber(OperationContext* opCtx, + TxnNumber txnNumber, + TransactionActions action, + TxnRetryCounter txnRetryCounter) { + invariant(txnNumber > o().txnNumber); + + switch (action) { + case TransactionActions::kStart: { + _resetRouterStateForStartTransaction(opCtx, txnNumber, txnRetryCounter); + break; + } + case TransactionActions::kContinue: { + uasserted(ErrorCodes::NoSuchTransaction, + str::stream() << "cannot continue txnId " << o().txnNumber << 
" for session " + << _sessionId() << " with txnId " << txnNumber); + } + case TransactionActions::kCommit: { + _resetRouterState(opCtx, txnNumber, txnRetryCounter); + // If the first action seen by the router for this transaction is to commit, that + // means that the client is attempting to recover a commit decision. + p().isRecoveringCommit = true; - LOGV2_DEBUG(22889, - 3, - "{sessionId}:{txnNumber} New transaction started", - "New transaction started", - "sessionId"_attr = _sessionId().getId(), - "txnNumber"_attr = o().txnNumber); - break; - } - case TransactionActions::kContinue: { - uasserted(ErrorCodes::NoSuchTransaction, - str::stream() - << "cannot continue txnId " << o().txnNumber << " for session " - << _sessionId() << " with txnId " << txnNumber); - } - case TransactionActions::kCommit: { - _resetRouterState(opCtx, txnNumber); - // If the first action seen by the router for this transaction is to commit, that - // means that the client is attempting to recover a commit decision. - p().isRecoveringCommit = true; - - LOGV2_DEBUG(22890, - 3, - "{sessionId}:{txnNumber} Commit recovery started", - "Commit recovery started", - "sessionId"_attr = _sessionId().getId(), - "txnNumber"_attr = o().txnNumber); + LOGV2_DEBUG(22890, + 3, + "{sessionId}:{txnNumber} Commit recovery started", + "Commit recovery started", + "sessionId"_attr = _sessionId().getId(), + "txnNumber"_attr = o().txnNumber); - break; - } - }; + break; + } + }; +} + +void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx, + TxnNumber txnNumber, + TransactionActions action, + TxnRetryCounter txnRetryCounter) { + invariant(txnRetryCounter >= 0, "Cannot specify a negative txnRetryCounter"); + + if (txnNumber < o().txnNumber) { + // This transaction is older than the transaction currently in progress, so throw an error. 
+ uasserted(ErrorCodes::TransactionTooOld, + str::stream() << "txnNumber " << txnNumber << " is less than last txnNumber " + << o().txnNumber << " seen in session " << _sessionId()); + } else if (txnNumber == o().txnNumber) { + // This is the same transaction as the one in progress. + auto apiParamsFromClient = APIParameters::get(opCtx); + if (action == TransactionActions::kContinue || action == TransactionActions::kCommit) { + uassert( + ErrorCodes::APIMismatchError, + "API parameter mismatch: transaction-continuing command used {}, the transaction's" + " first command used {}"_format(apiParamsFromClient.toBSON().toString(), + o().apiParameters.toBSON().toString()), + apiParamsFromClient == o().apiParameters); + } + _beginOrContinueActiveTxnNumber(opCtx, txnNumber, action, txnRetryCounter); + } else { + // This is a newer transaction. + _beginNewTxnNumber(opCtx, txnNumber, action, txnRetryCounter); } _updateLastClientInfo(opCtx->getClient()); @@ -1326,9 +1363,11 @@ void TransactionRouter::Router::appendRecoveryToken(BSONObjBuilder* builder) con } void TransactionRouter::Router::_resetRouterState(OperationContext* opCtx, - const TxnNumber& txnNumber) { + const TxnNumber& txnNumber, + const TxnRetryCounter& txnRetryCounter) { stdx::lock_guard<Client> lk(*opCtx->getClient()); o(lk).txnNumber = txnNumber; + o(lk).txnRetryCounter = txnRetryCounter; o(lk).commitType = CommitType::kNotInitiated; p().isRecoveringCommit = false; o(lk).participants.clear(); @@ -1350,6 +1389,37 @@ void TransactionRouter::Router::_resetRouterState(OperationContext* opCtx, p().firstStmtId = kDefaultFirstStmtId; }; +void TransactionRouter::Router::_resetRouterStateForStartTransaction( + OperationContext* opCtx, const TxnNumber& txnNumber, const TxnRetryCounter& txnRetryCounter) { + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); + uassert(ErrorCodes::InvalidOptions, + "The first command in a transaction cannot specify a readConcern level " + "other than local, majority, or snapshot", 
+ !readConcernArgs.hasLevel() || + isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel())); + + _resetRouterState(opCtx, txnNumber, txnRetryCounter); + + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).apiParameters = APIParameters::get(opCtx); + o(lk).readConcernArgs = readConcernArgs; + } + + if (o().readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).atClusterTime.emplace(); + } + + LOGV2_DEBUG(22889, + 3, + "{sessionId}:{txnNumber} New transaction started", + "New transaction started", + "sessionId"_attr = _sessionId().getId(), + "txnNumber"_attr = o().txnNumber, + "txnRetryCounter"_attr = txnRetryCounter); +}; + BSONObj TransactionRouter::Router::_commitWithRecoveryToken(OperationContext* opCtx, const TxnRecoveryToken& recoveryToken) { uassert(ErrorCodes::NoSuchTransaction, diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h index b31be66cc5d..3b360b1eb6f 100644 --- a/src/mongo/s/transaction_router.h +++ b/src/mongo/s/transaction_router.h @@ -83,6 +83,7 @@ public: struct SharedTransactionOptions { // Set for all distributed transactions. TxnNumber txnNumber; + TxnRetryCounter txnRetryCounter; APIParameters apiParameters; repl::ReadConcernArgs readConcernArgs; @@ -367,7 +368,8 @@ public: */ void beginOrContinueTxn(OperationContext* opCtx, TxnNumber txnNumber, - TransactionActions action); + TransactionActions action, + TxnRetryCounter txnRetryCounter); /** * Updates transaction diagnostics when the transaction's session is checked in. @@ -534,7 +536,35 @@ public: * time. This is required because we don't create a new router object for each transaction, * but instead reuse the same object across different transactions. 
*/ - void _resetRouterState(OperationContext* opCtx, const TxnNumber& txnNumber); + void _resetRouterState(OperationContext* opCtx, + const TxnNumber& txnNumber, + const TxnRetryCounter& txnRetryCounter); + + /** + * Calls _resetRouterState and then resets the read concern and the cluster time of the + * timestamp that all participant shards in the current transaction with snapshot level read + * concern must read from. + */ + void _resetRouterStateForStartTransaction(OperationContext* opCtx, + const TxnNumber& txnNumber, + const TxnRetryCounter& txnRetryCounter); + + /** + * Continues or restarts the currently active transaction. + */ + void _beginOrContinueActiveTxnNumber(OperationContext* opCtx, + TxnNumber txnNumber, + TransactionActions action, + TxnRetryCounter txnRetryCounter); + + /** + * Starts a new transaction or continues a transaction started by a different router to + * recover the commit decision. + */ + void _beginNewTxnNumber(OperationContext* opCtx, + TxnNumber txnNumber, + TransactionActions action, + TxnRetryCounter txnRetryCounter); /** * Internal method for committing a transaction. Should only throw on failure to send @@ -696,6 +726,10 @@ private: // called. Otherwise set to kUninitializedTxnNumber. TxnNumber txnNumber{kUninitializedTxnNumber}; + // The last seen txnRetryCounter for the currently active transaction, if beginOrContinueTxn + // has been called. Otherwise set to kUninitializedTxnRetryCounter. + TxnRetryCounter txnRetryCounter{kUninitializedTxnRetryCounter}; + // Is updated at commit time to reflect which commit path was taken. 
CommitType commitType{CommitType::kNotInitiated}; diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index 4e286306a4d..447cfadc389 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -38,7 +38,10 @@ #include "mongo/db/commands.h" #include "mongo/db/namespace_string.h" #include "mongo/db/repl/read_concern_args.h" +#include "mongo/db/txn_retry_counter_too_old_info.h" #include "mongo/db/vector_clock.h" +#include "mongo/idl/server_parameter_test_util.h" +#include "mongo/logv2/log.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/catalog/type_shard.h" #include "mongo/s/router_transactions_metrics.h" @@ -110,6 +113,8 @@ protected: StaleConfigInfo(kViewNss, ChunkVersion::UNSHARDED(), boost::none, shard1), "The metadata for the collection is not loaded"}; + RAIIServerParameterControllerForTest _controller{"featureFlagInternalTransactions", true}; + void setUp() override { ShardingTestFixture::setUp(); configTargeter()->setFindHostReturnValue(kTestConfigShardHost); @@ -230,8 +235,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedNewObj = BSON("insert" @@ -241,7 +248,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, << "snapshot" << "atClusterTime" << kInMemoryLogicalTime.asTimestamp()) << "startTransaction" << true << "coordinator" << true - << "autocommit" << false << "txnNumber" << txnNum); + << "autocommit" << false << "txnNumber" << txnNum + << "txnRetryCounter" << 0); { auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(), @@ -259,7 
+267,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, ASSERT_BSONOBJ_EQ(BSON("update" << "test" << "coordinator" << true << "autocommit" << false << "txnNumber" - << txnNum), + << txnNum << "txnRetryCounter" << 0), newCmd); } } @@ -268,8 +276,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, BasicStartTxnWithAtClusterTime) TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedNewObj = BSON("insert" @@ -279,7 +289,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, BasicStartTxnWithAtClusterTime) << "snapshot" << "atClusterTime" << kInMemoryLogicalTime.asTimestamp()) << "startTransaction" << true << "coordinator" << true - << "autocommit" << false << "txnNumber" << txnNum); + << "autocommit" << false << "txnNumber" << txnNum + << "txnRetryCounter" << 0); { auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(), @@ -297,7 +308,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, BasicStartTxnWithAtClusterTime) ASSERT_BSONOBJ_EQ(BSON("update" << "test" << "coordinator" << true << "autocommit" << false << "txnNumber" - << txnNum), + << txnNum << "txnRetryCounter" << 0), newCmd); } } @@ -307,8 +318,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotContiueTxnWithoutStarting) auto txnRouter = TransactionRouter::get(operationContext()); ASSERT_THROWS_CODE( - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue), + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */), AssertionException, ErrorCodes::NoSuchTransaction); } @@ -317,8 +330,10 @@ 
TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndRe TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedNewObj = BSON("insert" @@ -328,7 +343,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndRe << "snapshot" << "atClusterTime" << kInMemoryLogicalTime.asTimestamp()) << "startTransaction" << true << "coordinator" << true - << "autocommit" << false << "txnNumber" << txnNum); + << "autocommit" << false << "txnNumber" << txnNum + << "txnRetryCounter" << 0); { auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(), @@ -346,7 +362,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndRe ASSERT_BSONOBJ_EQ(BSON("update" << "test" << "coordinator" << true << "autocommit" << false << "txnNumber" - << txnNum), + << txnNum << "txnRetryCounter" << 0), newCmd); } @@ -357,7 +373,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndRe << "snapshot" << "atClusterTime" << kInMemoryLogicalTime.asTimestamp()) << "startTransaction" << true << "autocommit" << false << "txnNumber" - << txnNum); + << txnNum << "txnRetryCounter" << 0); { auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(), @@ -374,7 +390,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndRe << "test")); ASSERT_BSONOBJ_EQ(BSON("update" << "test" - << "autocommit" << false << "txnNumber" << txnNum), + << "autocommit" << false << "txnNumber" << txnNum + << "txnRetryCounter" << 0), newCmd); } } @@ -383,8 +400,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState) TxnNumber 
txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); { @@ -399,13 +418,16 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState) << "snapshot" << "atClusterTime" << kInMemoryLogicalTime.asTimestamp()) << "startTransaction" << true << "coordinator" << true - << "autocommit" << false << "txnNumber" << txnNum), + << "autocommit" << false << "txnNumber" << txnNum + << "txnRetryCounter" << 0), newCmd); } TxnNumber txnNum2{5}; - txnRouter.beginOrContinueTxn( - operationContext(), txnNum2, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum2, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedNewObj = BSON("insert" @@ -415,7 +437,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState) << "snapshot" << "atClusterTime" << kInMemoryLogicalTime.asTimestamp()) << "startTransaction" << true << "coordinator" << true - << "autocommit" << false << "txnNumber" << txnNum2); + << "autocommit" << false << "txnNumber" << txnNum2 + << "txnRetryCounter" << 0); { auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(), @@ -426,12 +449,329 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState) } } -TEST_F(TransactionRouterTestWithDefaultSession, FirstParticipantIsCoordinator) { +DEATH_TEST_F(TransactionRouterTestWithDefaultSession, + CannotSpecifyNegativeTxnRetryCounter, + "Cannot specify a negative txnRetryCounter") { + TxnNumber txnNum{3}; + + auto txnRouter = TransactionRouter::get(operationContext()); + 
txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, -1); +} + +TEST_F(TransactionRouterTestWithDefaultSession, NotAttachTxnRetryCounterIfFeatureFlagIsNotEnabled) { + RAIIServerParameterControllerForTest controller{"featureFlagInternalTransactions", false}; + TxnNumber txnNum{3}; + BSONObj expectedCmd = BSON("insert" + << "test" + << "readConcern" + << BSON("level" + << "snapshot" + << "atClusterTime" << kInMemoryLogicalTime.asTimestamp()) + << "startTransaction" << true << "coordinator" << true + << "autocommit" << false << "txnNumber" << txnNum); auto txnRouter = TransactionRouter::get(operationContext()); + txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 0); + txnRouter.setDefaultAtClusterTime(operationContext()); + ASSERT_BSONOBJ_EQ(expectedCmd, + txnRouter.attachTxnFieldsIfNeeded(operationContext(), + shard1, + BSON("insert" + << "test"))); +} + +TEST_F(TransactionRouterTestWithDefaultSession, + CanRestartInProgressTransactionUsingTxnRetryCounterGreaterThanLastUsed) { + TxnNumber txnNum{3}; + BSONObj expectedCmd = BSON("insert" + << "test" + << "readConcern" + << BSON("level" + << "snapshot" + << "atClusterTime" << kInMemoryLogicalTime.asTimestamp()) + << "startTransaction" << true << "coordinator" << true + << "autocommit" << false << "txnNumber" << txnNum); + + auto txnRouter = TransactionRouter::get(operationContext()); + + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 0); + txnRouter.setDefaultAtClusterTime(operationContext()); + ASSERT_BSONOBJ_EQ(expectedCmd.addFields(BSON("txnRetryCounter" << 0)), + txnRouter.attachTxnFieldsIfNeeded(operationContext(), + shard1, + BSON("insert" + << "test"))); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1); + 
txnRouter.setDefaultAtClusterTime(operationContext()); + ASSERT_BSONOBJ_EQ(expectedCmd.addFields(BSON("txnRetryCounter" << 1)), + txnRouter.attachTxnFieldsIfNeeded(operationContext(), + shard1, + BSON("insert" + << "test"))); +} + +TEST_F(TransactionRouterTestWithDefaultSession, + CanRestartAbortedTransactionUsingTxnRetryCounterGreaterThanLastUsed) { + TxnNumber txnNum{3}; + BSONObj expectedCmd = BSON("insert" + << "test" + << "readConcern" + << BSON("level" + << "snapshot" + << "atClusterTime" << kInMemoryLogicalTime.asTimestamp()) + << "startTransaction" << true << "coordinator" << true + << "autocommit" << false << "txnNumber" << txnNum); + + auto txnRouter = TransactionRouter::get(operationContext()); + + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 0); + txnRouter.setDefaultAtClusterTime(operationContext()); + ASSERT_BSONOBJ_EQ(expectedCmd.addFields(BSON("txnRetryCounter" << 0)), + txnRouter.attachTxnFieldsIfNeeded(operationContext(), + shard1, + BSON("insert" + << "test"))); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); + startCapturingLogMessages(); + auto future = launchAsync([&] { txnRouter.abortTransaction(operationContext()); }); + expectAbortTransactions({hostAndPort1}, *operationContext()->getLogicalSessionId(), txnNum); + future.default_timed_get(); + stopCapturingLogMessages(); + + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1); + txnRouter.setDefaultAtClusterTime(operationContext()); + ASSERT_BSONOBJ_EQ(expectedCmd.addFields(BSON("txnRetryCounter" << 1)), + txnRouter.attachTxnFieldsIfNeeded(operationContext(), + shard1, + BSON("insert" + << "test"))); +} + +TEST_F(TransactionRouterTestWithDefaultSession, + CanRestartTransactionWithFailedCommitUsingTxnRetryCounterGreaterThanLastUsed) { + TxnNumber txnNum{3}; + + auto txnRouter = TransactionRouter::get(operationContext()); + + 
txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 0); + txnRouter.setDefaultAtClusterTime(operationContext()); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); + ASSERT(TransactionRouter::Participant::ReadOnly::kNotReadOnly == + txnRouter.getParticipant(shard1)->readOnly); + + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit, 0); + auto future = + launchAsync([&] { txnRouter.commitTransaction(operationContext(), boost::none); }); + expectCommitTransaction(kDummyErrorRes); + future.default_timed_get(); + + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit, 1); + auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none); + ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1)); +} + +TEST_F(TransactionRouterTestWithDefaultSession, + CannotRestartCommittedTransactionUsingTxnRetryCounterGreaterThanLastUsed) { + TxnNumber txnNum{3}; + + auto txnRouter = TransactionRouter::get(operationContext()); + + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 0); + txnRouter.setDefaultAtClusterTime(operationContext()); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit, 0); + auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none); + ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1)); + + ASSERT_THROWS_CODE( + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1), + AssertionException, + ErrorCodes::IllegalOperation); + + commitResult = 
txnRouter.commitTransaction(operationContext(), boost::none); + ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1)); +} + +TEST_F(TransactionRouterTestWithDefaultSession, + CannotRestartTransactionUsingTxnRetryCounterLessThanLastUsed) { + TxnNumber txnNum{3}; + + auto txnRouter = TransactionRouter::get(operationContext()); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1); + + ASSERT_THROWS_CODE( + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 0), + AssertionException, + ErrorCodes::TxnRetryCounterTooOld); + try { + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue, 0); + } catch (const TxnRetryCounterTooOldException& ex) { + auto info = ex.extraInfo<TxnRetryCounterTooOldInfo>(); + ASSERT_EQ(info->getTxnRetryCounter(), 1); + } + + auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none); + ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1)); +} + +TEST_F(TransactionRouterTestWithDefaultSession, + CanContinueTransactionUsingTxnRetryCounterEqualToLastUsed) { + TxnNumber txnNum{3}; + + auto txnRouter = TransactionRouter::get(operationContext()); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1); + txnRouter.setDefaultAtClusterTime(operationContext()); + + // (Must reset readConcern from "snapshot".) 
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue, 1); +} + +TEST_F(TransactionRouterTestWithDefaultSession, + CannotContinueTransactionUsingTxnRetryCounterGreaterThanLastUsed) { + TxnNumber txnNum{3}; + + auto txnRouter = TransactionRouter::get(operationContext()); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 0); + txnRouter.setDefaultAtClusterTime(operationContext()); + + // (Must reset readConcern from "snapshot".) + repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); + ASSERT_THROWS_CODE( + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue, 1), + AssertionException, + ErrorCodes::IllegalOperation); + + auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none); + ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1)); +} + +TEST_F(TransactionRouterTestWithDefaultSession, + CannotContinueTransactionUsingTxnRetryCounterLessThanLastUsed) { + TxnNumber txnNum{3}; + + auto txnRouter = TransactionRouter::get(operationContext()); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1); + txnRouter.setDefaultAtClusterTime(operationContext()); + + // (Must reset readConcern from "snapshot".) 
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); + ASSERT_THROWS_CODE( + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue, 0), + AssertionException, + ErrorCodes::TxnRetryCounterTooOld); + try { + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue, 0); + } catch (const TxnRetryCounterTooOldException& ex) { + auto info = ex.extraInfo<TxnRetryCounterTooOldInfo>(); + ASSERT_EQ(info->getTxnRetryCounter(), 1); + } + + auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none); + ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1)); +} + +TEST_F(TransactionRouterTestWithDefaultSession, + CanCommitTransactionUsingTxnRetryCounterEqualToLastUsed) { + TxnNumber txnNum{3}; + + auto txnRouter = TransactionRouter::get(operationContext()); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1); + txnRouter.setDefaultAtClusterTime(operationContext()); + + // (Must reset readConcern from "snapshot".) + repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit, 1); + + auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none); + ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1)); +} + +TEST_F(TransactionRouterTestWithDefaultSession, + CannotCommitTransactionUsingTxnRetryCounterGreaterThanLastUsed) { + TxnNumber txnNum{3}; + + auto txnRouter = TransactionRouter::get(operationContext()); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 0); + txnRouter.setDefaultAtClusterTime(operationContext()); + + // (Must reset readConcern from "snapshot".) 
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); + ASSERT_THROWS_CODE( + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit, 1), + AssertionException, + ErrorCodes::IllegalOperation); + + auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none); + ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1)); +} + +TEST_F(TransactionRouterTestWithDefaultSession, + CannotCommitTransactionUsingTxnRetryCounterLessThanLastUsed) { + TxnNumber txnNum{3}; + + auto txnRouter = TransactionRouter::get(operationContext()); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1); + txnRouter.setDefaultAtClusterTime(operationContext()); + + // (Must reset readConcern from "snapshot".) + repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); + ASSERT_THROWS_CODE( + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit, 0), + AssertionException, + ErrorCodes::TxnRetryCounterTooOld); + try { + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit, 0); + } catch (const TxnRetryCounterTooOldException& ex) { + auto info = ex.extraInfo<TxnRetryCounterTooOldInfo>(); + ASSERT_EQ(info->getTxnRetryCounter(), 1); + } + + auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none); + ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1)); +} + +TEST_F(TransactionRouterTestWithDefaultSession, FirstParticipantIsCoordinator) { + TxnNumber txnNum{3}; + + auto txnRouter = TransactionRouter::get(operationContext()); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); ASSERT_FALSE(txnRouter.getCoordinatorId()); @@ -453,8 +793,10 @@ 
TEST_F(TransactionRouterTestWithDefaultSession, FirstParticipantIsCoordinator) { } TxnNumber txnNum2{5}; - txnRouter.beginOrContinueTxn( - operationContext(), txnNum2, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum2, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); ASSERT_FALSE(txnRouter.getCoordinatorId()); @@ -472,8 +814,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardDoesNotGetSetForRea TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); // The recovery shard is unset initially. @@ -492,8 +836,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardDoesNotGetSetForRea txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyTrueResponse); ASSERT_FALSE(txnRouter.getRecoveryShardId()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kCommit, + 0 /* txnRetryCounter */); // The recovery shard is not set even if the participants say they did a write for commit. 
auto future = @@ -514,8 +860,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -529,8 +877,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -550,8 +900,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); // Shard1's response says read-only. 
@@ -571,8 +923,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); // Shard1's response says not read-only. @@ -595,8 +949,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); // Shard1's response says not read-only. @@ -607,8 +963,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, // New statement. repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); // Shard2 responds, it doesn't matter whether it's read-only, just that it's a pending // participant. 
@@ -631,8 +989,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsResetOnStartingNe TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); // Shard1's response says not read-only. @@ -643,8 +1003,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsResetOnStartingNe // Start new transaction on session. TxnNumber newTxnNum{4}; - txnRouter.beginOrContinueTxn( - operationContext(), newTxnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + newTxnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); ASSERT_FALSE(txnRouter.getRecoveryShardId()); } @@ -653,8 +1015,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, DoesNotAttachTxnNumIfAlreadyTher TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedNewObj = BSON("insert" @@ -664,7 +1028,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, DoesNotAttachTxnNumIfAlreadyTher << "snapshot" << "atClusterTime" << kInMemoryLogicalTime.asTimestamp()) << "startTransaction" << true << "coordinator" << true - << "autocommit" << false); + << "autocommit" << false << "txnRetryCounter" << 0); auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, @@ -680,8 +1044,10 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber 
txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), @@ -695,8 +1061,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, AttachTxnValidatesReadConcernIfA TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); { @@ -714,7 +1082,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, AttachTxnValidatesReadConcernIfA << "snapshot" << "atClusterTime" << kInMemoryLogicalTime.asTimestamp()) << "startTransaction" << true << "coordinator" << true - << "autocommit" << false << "txnNumber" << txnNum), + << "autocommit" << false << "txnNumber" << txnNum + << "txnRetryCounter" << 0), newCmd); } } @@ -726,14 +1095,18 @@ TEST_F(TransactionRouterTestWithDefaultSession, SameAPIParametersAfterFirstState TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); // Continuing with the same API params succeeds. (Must reset readConcern from "snapshot".) 
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); } TEST_F(TransactionRouterTestWithDefaultSession, DifferentAPIParametersAfterFirstStatement) { @@ -743,16 +1116,20 @@ TEST_F(TransactionRouterTestWithDefaultSession, DifferentAPIParametersAfterFirst TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); // Can't continue with different params. (Must reset readConcern from "snapshot".) APIParameters::get(operationContext()).setAPIStrict(true); repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); ASSERT_THROWS_CODE( - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue), + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */), DBException, ErrorCodes::APIMismatchError); } @@ -764,16 +1141,20 @@ TEST_F(TransactionRouterTestWithDefaultSession, NoAPIParametersAfterFirstStateme TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); // Can't continue without params. 
(Must reset readConcern from "snapshot".) APIParameters::get(operationContext()) = APIParameters(); repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); ASSERT_THROWS_CODE( - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue), + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */), DBException, ErrorCodes::APIMismatchError); } @@ -782,13 +1163,17 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotSpecifyReadConcernAfterFir TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); ASSERT_THROWS_CODE( - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue), + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */), DBException, ErrorCodes::InvalidOptions); } @@ -799,15 +1184,17 @@ TEST_F(TransactionRouterTestWithDefaultSession, PassesThroughEmptyReadConcernToP TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedNewObj = BSON("insert" << "test" << "readConcern" << BSONObj() << "startTransaction" << true << "coordinator" << true << "autocommit" << false << "txnNumber" - << txnNum); + << txnNum << 
"txnRetryCounter" << 0); auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, @@ -825,8 +1212,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedNewObj = BSON("insert" @@ -834,7 +1223,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, << "readConcern" << BSON("afterClusterTime" << kAfterClusterTime.asTimestamp()) << "startTransaction" << true << "coordinator" << true - << "autocommit" << false << "txnNumber" << txnNum); + << "autocommit" << false << "txnNumber" << txnNum + << "txnRetryCounter" << 0); auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, @@ -850,8 +1240,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedReadConcernLeve TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); ASSERT_THROWS_CODE( - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart), + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */), DBException, ErrorCodes::InvalidOptions); } @@ -865,8 +1257,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); ASSERT_THROWS_CODE( - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart), + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */), DBException, 
ErrorCodes::InvalidOptions); } @@ -880,8 +1274,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); ASSERT_THROWS_CODE( - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart), + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */), DBException, ErrorCodes::InvalidOptions); } @@ -891,8 +1287,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotCommitWithoutParticipantsO TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kCommit, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); ASSERT_THROWS(txnRouter.commitTransaction(operationContext(), boost::none), AssertionException); @@ -901,6 +1299,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotCommitWithoutParticipantsO void checkSessionDetails(const BSONObj& cmdObj, const LogicalSessionId& lsid, const TxnNumber txnNum, + const TxnRetryCounter txnRetryCounter, boost::optional<bool> isCoordinator) { auto osi = OperationSessionInfoFromClient::parse("testTxnRouter"_sd, cmdObj); @@ -913,6 +1312,9 @@ void checkSessionDetails(const BSONObj& cmdObj, ASSERT(osi.getAutocommit()); ASSERT_FALSE(*osi.getAutocommit()); + ASSERT(osi.getTxnRetryCounter()); + ASSERT_EQ(txnRetryCounter, *osi.getTxnRetryCounter()); + if (isCoordinator) { ASSERT_EQ(*isCoordinator, cmdObj["coordinator"].trueValue()); } else { @@ -931,12 +1333,16 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - 
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kCommit, + 0 /* txnRetryCounter */); auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none); ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1)); @@ -950,10 +1356,11 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, SendCommitDirectlyForSingleParticipantThatIsReadOnly) { TxnNumber txnNum{3}; + TxnRetryCounter txnRetryCounter{0}; auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -962,8 +1369,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kCommit, + txnRetryCounter); auto future = launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); }); @@ -975,7 +1384,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); ASSERT_EQ(cmdName, "commitTransaction"); - checkSessionDetails(request.cmdObj, getSessionId(), txnNum, 
true); + checkSessionDetails( + request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */); return BSON("ok" << 1); }); @@ -986,10 +1396,11 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, SendCommitDirectlyForSingleParticipantThatDidAWrite) { TxnNumber txnNum{3}; + TxnRetryCounter txnRetryCounter{0}; auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -998,8 +1409,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kCommit, + txnRetryCounter); auto future = launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); }); @@ -1011,7 +1424,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); ASSERT_EQ(cmdName, "commitTransaction"); - checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true); + checkSessionDetails( + request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */); return BSON("ok" << 1); }); @@ -1022,10 +1436,11 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, SendCommitDirectlyForMultipleParticipantsThatAreAllReadOnly) { TxnNumber txnNum{3}; + TxnRetryCounter txnRetryCounter{0}; auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( - operationContext(), txnNum, 
TransactionRouter::TransactionActions::kStart); + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -1036,8 +1451,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kCommit, + txnRetryCounter); auto future = launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); }); @@ -1057,8 +1474,11 @@ TEST_F(TransactionRouterTestWithDefaultSession, ASSERT_EQ(cmdName, "commitTransaction"); // The shard with hostAndPort1 is expected to be the coordinator. - checkSessionDetails( - request.cmdObj, getSessionId(), txnNum, (request.target == hostAndPort1)); + checkSessionDetails(request.cmdObj, + getSessionId(), + txnNum, + txnRetryCounter, + (request.target == hostAndPort1)); return kOkReadOnlyTrueResponse; }); @@ -1072,10 +1492,11 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, SendCoordinateCommitForMultipleParticipantsOnlyOneDidAWrite) { TxnNumber txnNum{3}; + TxnRetryCounter txnRetryCounter{0}; auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -1083,8 +1504,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse); 
txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kCommit, + txnRetryCounter); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); @@ -1109,7 +1532,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, expectedParticipants.erase(shardId); } - checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true); + checkSessionDetails( + request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */); return BSON("ok" << 1); }); @@ -1120,10 +1544,11 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, SendCoordinateCommitForMultipleParticipantsMoreThanOneDidAWrite) { TxnNumber txnNum{3}; + TxnRetryCounter txnRetryCounter{0}; auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -1131,8 +1556,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kCommit, + txnRetryCounter); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); @@ -1157,7 +1584,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, 
expectedParticipants.erase(shardId); } - checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true); + checkSessionDetails( + request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */); return BSON("ok" << 1); }); @@ -1168,6 +1596,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) { LogicalSessionId lsid(makeLogicalSessionIdForTest()); TxnNumber txnNum{3}; + TxnRetryCounter txnRetryCounter{0}; auto opCtx = operationContext(); opCtx->setLogicalSessionId(lsid); @@ -1178,7 +1607,8 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kCommit, txnRetryCounter); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); @@ -1196,7 +1626,8 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) { auto participantElements = request.cmdObj["participants"].Array(); ASSERT_TRUE(participantElements.empty()); - checkSessionDetails(request.cmdObj, lsid, txnNum, true); + checkSessionDetails( + request.cmdObj, lsid, txnNum, txnRetryCounter, true /* isCoordinator */); checkWriteConcern(request.cmdObj, writeConcern); return BSON("ok" << 1); @@ -1207,8 +1638,10 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) { // Sending commit with a recovery token again should cause the router to use the recovery path // again. 
- txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kCommit, + txnRetryCounter); future = launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); }); @@ -1222,7 +1655,8 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) { auto participantElements = request.cmdObj["participants"].Array(); ASSERT_TRUE(participantElements.empty()); - checkSessionDetails(request.cmdObj, lsid, txnNum, true); + checkSessionDetails( + request.cmdObj, lsid, txnNum, txnRetryCounter, true /* isCoordinator */); checkWriteConcern(request.cmdObj, writeConcern); return BSON("ok" << 1); @@ -1234,6 +1668,7 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) { TEST_F(TransactionRouterTestWithDefaultSession, CrossShardTxnCommitWorksAfterRecoveryCommitForPreviousTransaction) { TxnNumber txnNum{3}; + TxnRetryCounter txnRetryCounter{0}; auto opCtx = operationContext(); opCtx->setTxnNumber(txnNum); @@ -1244,7 +1679,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, auto txnRouter = TransactionRouter::get(opCtx); // Simulate recovering a commit with a recovery token and no participants. 
{ - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kCommit, txnRetryCounter); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); @@ -1262,7 +1698,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, auto participantElements = request.cmdObj["participants"].Array(); ASSERT_TRUE(participantElements.empty()); - checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true); + checkSessionDetails( + request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */); checkWriteConcern(request.cmdObj, writeConcern); return BSON("ok" << 1); @@ -1275,8 +1712,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, // should be sent with the correct participant list. { ++txnNum; - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + txnRetryCounter); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -1284,8 +1723,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kCommit, + txnRetryCounter); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); @@ -1310,7 +1751,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, expectedParticipants.erase(shardId); } - checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true); + checkSessionDetails( + 
request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */); return BSON("ok" << 1); }); @@ -1322,6 +1764,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, RouterShouldWorkAsRecoveryRouterEvenIfItHasSeenPreviousTransactions) { TxnNumber txnNum{3}; + TxnRetryCounter txnRetryCounter{0}; auto opCtx = operationContext(); opCtx->setTxnNumber(txnNum); @@ -1333,8 +1776,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, // Run a cross-shard transaction with two-phase commit. The commit should be sent with the // correct participant list. { - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + txnRetryCounter); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -1342,8 +1787,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kCommit, + txnRetryCounter); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); @@ -1368,7 +1815,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, expectedParticipants.erase(shardId); } - checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true); + checkSessionDetails( + request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */); return BSON("ok" << 1); }); @@ -1381,7 +1829,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, { ++txnNum; - txnRouter.beginOrContinueTxn(opCtx, txnNum, 
TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kCommit, txnRetryCounter); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); @@ -1399,7 +1848,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, auto participantElements = request.cmdObj["participants"].Array(); ASSERT_TRUE(participantElements.empty()); - checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true); + checkSessionDetails( + request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */); checkWriteConcern(request.cmdObj, writeConcern); return BSON("ok" << 1); @@ -1422,7 +1872,8 @@ TEST_F(TransactionRouterTest, CommitWithEmptyRecoveryToken) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kCommit, 0 /* txnRetryCounter */); TxnRecoveryToken recoveryToken; ASSERT_THROWS_CODE(txnRouter.commitTransaction(operationContext(), recoveryToken), @@ -1443,7 +1894,8 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithUnknownShard) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kCommit, 0 /* txnRetryCounter */); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(ShardId("magicShard")); @@ -1461,12 +1913,56 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithUnknownShard) { ASSERT_THROWS_CODE(future.default_timed_get(), DBException, ErrorCodes::ShardNotFound); } +TEST_F(TransactionRouterTestWithDefaultSession, CommitWithRecoveryTokenAndTxnRetryCounter) { + TxnNumber txnNum{3}; + 
TxnRetryCounter txnRetryCounter{1}; + + operationContext()->setTxnNumber(txnNum); + + WriteConcernOptions writeConcern(10, WriteConcernOptions::SyncMode::NONE, 0); + operationContext()->setWriteConcern(writeConcern); + + auto txnRouter = TransactionRouter::get(operationContext()); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kCommit, + txnRetryCounter); + txnRouter.setDefaultAtClusterTime(operationContext()); + + TxnRecoveryToken recoveryToken; + recoveryToken.setRecoveryShardId(shard1); + + auto future = + launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); }); + + onCommand([&](const RemoteCommandRequest& request) { + ASSERT_EQ(hostAndPort1, request.target); + ASSERT_EQ("admin", request.dbname); + + auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); + ASSERT_EQ(cmdName, "coordinateCommitTransaction"); + + auto participantElements = request.cmdObj["participants"].Array(); + ASSERT_TRUE(participantElements.empty()); + + checkSessionDetails( + request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */); + checkWriteConcern(request.cmdObj, writeConcern); + + return BSON("ok" << 1); + }); + + future.default_timed_get(); +} + TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsResetAtClusterTime) { TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedReadConcern = BSON("level" @@ -1513,8 +2009,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, 
TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); BSONObj expectedReadConcern = BSON("level" @@ -1552,8 +2050,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, // Later statements cannot change atClusterTime. repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); LogicalTime laterTimeNewStmt(Timestamp(1000, 1)); ASSERT_GT(laterTimeNewStmt, laterTimeSameStmt); @@ -1574,8 +2074,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsClearsAllParticipa TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); // Successfully start a transaction on two shards, selecting one as the coordinator. 
@@ -1624,8 +2126,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotContinueOnSnapshotErrorAft TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); ASSERT(txnRouter.canContinueOnSnapshotError()); @@ -1633,21 +2137,27 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotContinueOnSnapshotErrorAft txnRouter.setDefaultAtClusterTime(operationContext()); repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); ASSERT_FALSE(txnRouter.canContinueOnSnapshotError()); repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); ASSERT_FALSE(txnRouter.canContinueOnSnapshotError()); } TEST_F(TransactionRouterTestWithDefaultSession, ParticipantsRememberStmtIdCreatedAt) { TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); // Transaction 1 contacts shard1 and shard2 during the first 
command, then shard3 in the second @@ -1661,8 +2171,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, ParticipantsRememberStmtIdCreate ASSERT_EQ(txnRouter.getParticipant(shard2)->stmtIdCreatedAt, initialStmtId); repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); ShardId shard3("shard3"); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard3, {}); @@ -1677,8 +2189,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, ParticipantsRememberStmtIdCreate repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(repl::ReadConcernLevel::kSnapshotReadConcern); TxnNumber txnNum2{5}; - txnRouter.beginOrContinueTxn( - operationContext(), txnNum2, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum2, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard3, {}); @@ -1688,8 +2202,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, ParticipantsRememberStmtIdCreate ASSERT_EQ(txnRouter.getParticipant(shard2)->stmtIdCreatedAt, initialStmtId); repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum2, TransactionRouter::TransactionActions::kContinue); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum2, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); ASSERT_EQ(txnRouter.getParticipant(shard1)->stmtIdCreatedAt, initialStmtId + 1); @@ -1700,8 +2216,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber 
txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); // Start a transaction on two shards, selecting one as the coordinator, but simulate a @@ -1745,8 +2263,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, OnlyNewlyCreatedParticipantsClea TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); // First statement successfully targets one shard, selecing it as the coordinator. @@ -1760,8 +2280,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, OnlyNewlyCreatedParticipantsClea // least one of them. 
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard3, {}); @@ -1789,8 +2311,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); // First statement selects an atClusterTime. @@ -1800,8 +2324,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, // change the atClusterTime. 
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); LogicalTime laterTime(Timestamp(1000, 1)); ASSERT_GT(laterTime, kInMemoryLogicalTime); @@ -1842,8 +2368,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, WritesCanOnlyBeRetriedIfFirstOve TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); for (auto writeCmd : writeCmds) { @@ -1857,8 +2385,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, WritesCanOnlyBeRetriedIfFirstOve // Advance to the next command. 
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); for (auto writeCmd : writeCmds) { ASSERT_FALSE(txnRouter.canContinueOnStaleShardOrDbError(writeCmd, kDummyStatus)); @@ -1879,7 +2409,8 @@ TEST_F(TransactionRouterTest, AbortThrowsIfNoParticipants) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kStart, 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); ASSERT_THROWS_CODE( @@ -1889,6 +2420,7 @@ TEST_F(TransactionRouterTest, AbortThrowsIfNoParticipants) { TEST_F(TransactionRouterTest, AbortForSingleParticipant) { LogicalSessionId lsid(makeLogicalSessionIdForTest()); TxnNumber txnNum{3}; + TxnRetryCounter txnRetryCounter{0}; auto opCtx = operationContext(); opCtx->setLogicalSessionId(lsid); @@ -1897,7 +2429,8 @@ TEST_F(TransactionRouterTest, AbortForSingleParticipant) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -1910,7 +2443,8 @@ TEST_F(TransactionRouterTest, AbortForSingleParticipant) { auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); ASSERT_EQ(cmdName, "abortTransaction"); - checkSessionDetails(request.cmdObj, lsid, txnNum, true); 
+ checkSessionDetails( + request.cmdObj, lsid, txnNum, txnRetryCounter, true /* isCoordinator */); return kOkReadOnlyFalseResponse; }); @@ -1922,6 +2456,7 @@ TEST_F(TransactionRouterTest, AbortForSingleParticipant) { TEST_F(TransactionRouterTest, AbortForMultipleParticipantsAllReturnSuccess) { LogicalSessionId lsid(makeLogicalSessionIdForTest()); TxnNumber txnNum{3}; + TxnRetryCounter txnRetryCounter{0}; auto opCtx = operationContext(); opCtx->setLogicalSessionId(lsid); @@ -1930,7 +2465,8 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsAllReturnSuccess) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); @@ -1951,7 +2487,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsAllReturnSuccess) { auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); ASSERT_EQ(cmdName, "abortTransaction"); - checkSessionDetails(request.cmdObj, lsid, txnNum, target->second); + checkSessionDetails(request.cmdObj, lsid, txnNum, txnRetryCounter, target->second); targets.erase(request.target); return kOkReadOnlyFalseResponse; @@ -1965,6 +2501,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsAllReturnSuccess) { TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNoSuchTransaction) { LogicalSessionId lsid(makeLogicalSessionIdForTest()); TxnNumber txnNum{3}; + TxnRetryCounter txnRetryCounter{0}; auto opCtx = operationContext(); opCtx->setLogicalSessionId(lsid); @@ -1973,7 +2510,8 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNoSuchTransa 
RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); @@ -1997,7 +2535,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNoSuchTransa auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); ASSERT_EQ(cmdName, "abortTransaction"); - checkSessionDetails(request.cmdObj, lsid, txnNum, target->second); + checkSessionDetails(request.cmdObj, lsid, txnNum, txnRetryCounter, target->second); targets.erase(request.target); @@ -2014,6 +2552,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNoSuchTransa TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNetworkError) { LogicalSessionId lsid(makeLogicalSessionIdForTest()); TxnNumber txnNum{3}; + TxnRetryCounter txnRetryCounter{0}; auto opCtx = operationContext(); opCtx->setLogicalSessionId(lsid); @@ -2022,7 +2561,8 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNetworkError RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); @@ -2046,7 +2586,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNetworkError auto cmdName = 
request.cmdObj.firstElement().fieldNameStringData(); ASSERT_EQ(cmdName, "abortTransaction"); - checkSessionDetails(request.cmdObj, lsid, txnNum, target->second); + checkSessionDetails(request.cmdObj, lsid, txnNum, txnRetryCounter, target->second); targets.erase(request.target); @@ -2068,8 +2608,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, OnViewResolutionErrorClearsAllNe TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); // One shard is targeted by the first statement. @@ -2097,8 +2639,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, OnViewResolutionErrorClearsAllNe // Advance to a later client statement that targets a new shard. repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); auto secondShardCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); ASSERT_TRUE(secondShardCmd["startTransaction"].trueValue()); @@ -2126,7 +2670,8 @@ TEST_F(TransactionRouterTest, ImplicitAbortIsNoopWithNoParticipants) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kStart, 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); // Should not throw. 
@@ -2136,6 +2681,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortIsNoopWithNoParticipants) { TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) { LogicalSessionId lsid(makeLogicalSessionIdForTest()); TxnNumber txnNum{3}; + TxnRetryCounter txnRetryCounter{0}; auto opCtx = operationContext(); opCtx->setLogicalSessionId(lsid); @@ -2144,7 +2690,8 @@ TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -2158,7 +2705,8 @@ TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) { auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); ASSERT_EQ(cmdName, "abortTransaction"); - checkSessionDetails(request.cmdObj, lsid, txnNum, true); + checkSessionDetails( + request.cmdObj, lsid, txnNum, txnRetryCounter, true /* isCoordinator */); return kOkReadOnlyFalseResponse; }); @@ -2169,6 +2717,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) { TEST_F(TransactionRouterTest, ImplicitAbortForMultipleParticipants) { LogicalSessionId lsid(makeLogicalSessionIdForTest()); TxnNumber txnNum{3}; + TxnRetryCounter txnRetryCounter{0}; auto opCtx = operationContext(); opCtx->setLogicalSessionId(lsid); @@ -2177,7 +2726,8 @@ TEST_F(TransactionRouterTest, ImplicitAbortForMultipleParticipants) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter); 
txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); @@ -2197,7 +2747,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortForMultipleParticipants) { auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); ASSERT_EQ(cmdName, "abortTransaction"); - checkSessionDetails(request.cmdObj, lsid, txnNum, target->second); + checkSessionDetails(request.cmdObj, lsid, txnNum, txnRetryCounter, target->second); targets.erase(request.target); return kOkReadOnlyFalseResponse; @@ -2210,6 +2760,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortForMultipleParticipants) { TEST_F(TransactionRouterTest, ImplicitAbortIgnoresErrors) { LogicalSessionId lsid(makeLogicalSessionIdForTest()); TxnNumber txnNum{3}; + TxnRetryCounter txnRetryCounter{0}; auto opCtx = operationContext(); opCtx->setLogicalSessionId(lsid); @@ -2218,7 +2769,8 @@ TEST_F(TransactionRouterTest, ImplicitAbortIgnoresErrors) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -2232,7 +2784,8 @@ TEST_F(TransactionRouterTest, ImplicitAbortIgnoresErrors) { auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); ASSERT_EQ(cmdName, "abortTransaction"); - checkSessionDetails(request.cmdObj, lsid, txnNum, true); + checkSessionDetails( + request.cmdObj, lsid, txnNum, txnRetryCounter, true /* isCoordinator */); return BSON("ok" << 0); }); @@ -2249,7 +2802,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, AbortPropagatesWriteConcern) { WriteConcernOptions writeConcern(10, 
WriteConcernOptions::SyncMode::NONE, 0); opCtx->setWriteConcern(writeConcern); - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kStart, 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(opCtx); txnRouter.attachTxnFieldsIfNeeded(opCtx, shard1, {}); @@ -2272,8 +2826,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, ContinueOnlyOnStaleVersionOnFirs TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -2311,8 +2867,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, ContinueOnlyOnStaleVersionOnFirs // Start a new transaction statement. repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); // Cannot retry on a stale config error with one participant after the first statement. 
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -2324,13 +2882,17 @@ TEST_F(TransactionRouterTestWithDefaultSession, ContinuingTransactionPlacesItsRe TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); ASSERT(repl::ReadConcernArgs::get(operationContext()).getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern); @@ -2341,14 +2903,18 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); // First statement does not select an atClusterTime, but does not target any participants. repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); // Subsequent statement does select an atClusterTime and does target a participant. 
txnRouter.setDefaultAtClusterTime(operationContext()); @@ -2366,8 +2932,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, // The next statement cannot change the atClusterTime. repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); LogicalTime laterTimeSameStmt(Timestamp(100, 1)); ASSERT_GT(laterTimeSameStmt, kInMemoryLogicalTime); @@ -2388,8 +2956,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, NonSnapshotReadConcernHasNoAtClu repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(rcIt.second); auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum++, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum++, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); // No atClusterTime is placed on the router by default. 
ASSERT_FALSE(txnRouter.mustUseAtClusterTime()); @@ -2410,8 +2980,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(rcIt.second); auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum++, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum++, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); const BSONObj expectedRC = BSON("level" << rcIt.first); @@ -2447,8 +3019,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, repl::ReadConcernArgs(clusterTime, rcIt.second); auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum++, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum++, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(), @@ -2468,8 +3042,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, NonSnapshotReadConcernLevelsPres repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(opTime, rcIt.second); auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum++, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum++, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); // Call setDefaultAtClusterTime to simulate real command execution. 
txnRouter.setDefaultAtClusterTime(operationContext()); @@ -2488,8 +3064,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); // // NoSuchTransaction is ignored when it is the top-level error code. @@ -2518,8 +3096,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); // // Retryable top-level error. @@ -2550,8 +3130,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); // // Non-retryable top-level error. @@ -2582,8 +3164,10 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); // Add some participants to the list. 
@@ -2599,8 +3183,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -2614,8 +3200,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -2639,8 +3227,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -2664,8 +3254,10 @@ TEST_F( TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); 
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -2690,8 +3282,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -2715,8 +3309,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -2733,8 +3329,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, repl::ReadConcernArgs secondRequestEmptyReadConcern; repl::ReadConcernArgs::get(operationContext()) = secondRequestEmptyReadConcern; - txnRouter.beginOrContinueTxn( - operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.beginOrContinueTxn(operationContext(), + txnNum, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); // The router should throw regardless of whether the response says readOnly true or false. 
ASSERT_THROWS_CODE( @@ -2753,14 +3351,16 @@ TEST_F(TransactionRouterTestWithDefaultSession, auto opCtx = operationContext(); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kStart, 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(opCtx); txnRouter.attachTxnFieldsIfNeeded(opCtx, shard1, {}); // Continue causes the _latestStmtId to be bumped. repl::ReadConcernArgs::get(opCtx) = repl::ReadConcernArgs(); - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kContinue, 0 /* txnRetryCounter */); // Aborting will set the termination initiation state. auto future = launchAsync([&] { txnRouter.abortTransaction(opCtx); }); @@ -2779,7 +3379,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, auto opCtx = operationContext(); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kStart, 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(opCtx); txnRouter.attachTxnFieldsIfNeeded(opCtx, shard1, {}); @@ -2801,7 +3402,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, auto opCtx = operationContext(); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kStart, 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(opCtx); txnRouter.attachTxnFieldsIfNeeded(opCtx, shard1, {}); @@ -2812,7 +3414,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, txnRouter.getParticipant(shard1)->readOnly); // Commit 
causes the _latestStmtId to be bumped. - txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn( + opCtx, txnNum, TransactionRouter::TransactionActions::kCommit, 0 /* txnRetryCounter */); // Committing will set the termination initiation state. auto future = launchAsync([&] { txnRouter.commitTransaction(opCtx, boost::none); }); @@ -2837,8 +3440,10 @@ protected: TransactionRouterTestWithDefaultSession::setUp(); auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); } }; @@ -2909,27 +3514,35 @@ protected: } void beginTxnWithDefaultTxnNumber() { - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter().setDefaultAtClusterTime(operationContext()); } void beginSlowTxnWithDefaultTxnNumber() { - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter().setDefaultAtClusterTime(operationContext()); tickSource()->advance(Milliseconds(serverGlobalParams.slowMS + 1)); } void beginRecoverCommitWithDefaultTxnNumber() { - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kCommit, + 0 /* txnRetryCounter */); 
txnRouter().setDefaultAtClusterTime(operationContext()); } void beginSlowRecoverCommitWithDefaultTxnNumber() { - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kCommit, + 0 /* txnRetryCounter */); txnRouter().setDefaultAtClusterTime(operationContext()); tickSource()->advance(Milliseconds(serverGlobalParams.slowMS + 1)); } @@ -3104,8 +3717,10 @@ protected: } void runRecoverWithTokenCommit(boost::optional<ShardId> recoveryShard) { - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kCommit, + 0 /* txnRetryCounter */); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(recoveryShard); @@ -3347,8 +3962,10 @@ TEST_F(TransactionRouterMetricsTest, SlowLoggingPrintsTimeActiveAndInactive) { tickSource()->advance(Microseconds(222222)); assertTimeInactiveIs(Microseconds(222222)); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kCommit, + 0 /* txnRetryCounter */); runCommit(kDummyOkRes); ASSERT_EQUALS( @@ -3786,8 +4403,10 @@ TEST_F(TransactionRouterMetricsTest, DurationResetByNewTransaction) { txnRouter().commitTransaction(operationContext(), kDummyRecoveryToken); // Start a new transaction and verify the duration was reset. 
- txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber + 1, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); assertDurationIs(Microseconds(0)); tickSource()->advance(Microseconds(50)); @@ -3865,8 +4484,10 @@ TEST_F(TransactionRouterMetricsTest, CommitDurationResetByNewTransaction) { future.default_timed_get(); // Start a new transaction and verify the commit duration was reset. - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber + 1, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); future = beginAndPauseCommit(); @@ -3997,8 +4618,10 @@ TEST_F(TransactionRouterMetricsTest, TimeActiveAndInactiveAdvanceSeparatelyAndSu // Will not advance after commit. // Neither can advance after a successful commit. 
- txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kCommit, + 0 /* txnRetryCounter */); runCommit(kDummyOkRes); tickSource()->advance(Microseconds(150)); @@ -4065,8 +4688,10 @@ TEST_F(TransactionRouterMetricsTest, DurationsForImplicitlyAbortedActiveTxn) { assertTimeInactiveIs(kDefaultTimeInactive); assertDurationIs(kDefaultTimeActive + kDefaultTimeInactive); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); tickSource()->advance(Microseconds(100)); assertTimeActiveIs(kDefaultTimeActive + Microseconds(100)); @@ -4089,8 +4714,10 @@ TEST_F(TransactionRouterMetricsTest, DurationsForImplicitlyAbortedEndedTxn) { assertTimeInactiveIs(kDefaultTimeInactive); assertDurationIs(kDefaultTimeActive + kDefaultTimeInactive); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kCommit, + 0 /* txnRetryCounter */); runCommit(kDummyOkRes); txnRouter().stash(operationContext()); @@ -4112,8 +4739,10 @@ TEST_F(TransactionRouterMetricsTest, NeitherTimeActiveNorInactiveAdvanceAfterSuc setUpDefaultTimeActiveAndInactive(); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kCommit, + 0 /* txnRetryCounter */); runCommit(kDummyOkRes); // Neither can advance. 
@@ -4126,8 +4755,10 @@ TEST_F(TransactionRouterMetricsTest, NeitherTimeActiveNorInactiveAdvanceAfterFai setUpDefaultTimeActiveAndInactive(); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); runCommit(kDummyErrorRes); // Neither can advance. @@ -4140,8 +4771,10 @@ TEST_F(TransactionRouterMetricsTest, TimeActiveAndInactiveAdvanceAfterUnknownCom setUpDefaultTimeActiveAndInactive(); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kCommit, + 0 /* txnRetryCounter */); runCommit(Status(ErrorCodes::HostUnreachable, "dummy"), true /* expectRetries */); // timeActive can advance. @@ -4155,8 +4788,10 @@ TEST_F(TransactionRouterMetricsTest, TimeActiveAndInactiveAdvanceAfterUnknownCom assertTimeActiveIs(kDefaultTimeActive + Microseconds(100)); assertTimeInactiveIs(kDefaultTimeInactive + Microseconds(100)); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kCommit, + 0 /* txnRetryCounter */); runCommit(kDummyRetryableErrorRes, true /* expectRetries */); // timeActive can advance. 
@@ -4170,8 +4805,10 @@ TEST_F(TransactionRouterMetricsTest, TimeActiveAndInactiveAdvanceAfterUnknownCom assertTimeActiveIs(kDefaultTimeActive + Microseconds(200)); assertTimeInactiveIs(kDefaultTimeInactive + Microseconds(200)); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kCommit, + 0 /* txnRetryCounter */); runCommit(kDummyOkRes); // The result is known, so neither can advance. @@ -4185,8 +4822,10 @@ TEST_F(TransactionRouterMetricsTest, NeitherTimeActiveNorInactiveAdvanceAfterAbo setUpDefaultTimeActiveAndInactive(); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); ASSERT_THROWS_CODE(txnRouter().abortTransaction(operationContext()), AssertionException, ErrorCodes::NoSuchTransaction); @@ -4201,8 +4840,10 @@ TEST_F(TransactionRouterMetricsTest, NeitherTimeActiveNorInactiveAdvanceAfterImp setUpDefaultTimeActiveAndInactive(); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); txnRouter().implicitlyAbortTransaction(operationContext(), kDummyStatus); // Neither can advance. 
@@ -4241,8 +4882,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_Stash) { TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_BeginAfterStash) { beginRecoverCommitWithDefaultTxnNumber(); txnRouter().stash(operationContext()); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen()); ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentActive()); @@ -4256,22 +4899,28 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_AreNotCumulative) { ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentActive()); ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive()); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber + 1, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen()); ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentActive()); ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive()); // Test inactive. 
- txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber + 2, TransactionRouter::TransactionActions::kStart); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber + 2, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter().stash(operationContext()); ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen()); ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive()); ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive()); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber + 3, TransactionRouter::TransactionActions::kStart); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber + 3, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter().stash(operationContext()); ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen()); ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive()); @@ -4328,8 +4977,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_BeginAndStashForEndedT ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive()); ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive()); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentOpen()); ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive()); @@ -4402,8 +5053,10 @@ TEST_F(TransactionRouterTest, RouterMetricsCurrent_ReapForInactiveTxn) { // Start a transaction on the session. 
auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), 5, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + 5, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); ASSERT_EQUALS(1L, routerTxnMetrics->getCurrentOpen()); @@ -4506,8 +5159,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalStarted_IsCumulative) { beginTxnWithDefaultTxnNumber(); ASSERT_EQUALS(1L, routerTxnMetrics()->getTotalStarted()); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber + 1, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); ASSERT_EQUALS(2L, routerTxnMetrics()->getTotalStarted()); // Shouldn't go down when a transaction ends. @@ -4558,8 +5213,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalCommitted_NotIncreasedByI TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalCommitted_NotIncreasedByAbandonedTransaction) { beginTxnWithDefaultTxnNumber(); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber + 1, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); ASSERT_EQUALS(0L, routerTxnMetrics()->getTotalCommitted()); } @@ -4574,8 +5231,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalCommitted_IsCumulative) { runCommit(kDummyOkRes); ASSERT_EQUALS(1L, routerTxnMetrics()->getTotalCommitted()); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber + 1, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); 
runCommit(kDummyOkRes); ASSERT_EQUALS(2L, routerTxnMetrics()->getTotalCommitted()); } @@ -4602,8 +5261,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalAborted_NotIncreasedByUnk TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalAborted_NotIncreasedByAbandonedTransaction) { beginTxnWithDefaultTxnNumber(); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber + 1, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); ASSERT_EQUALS(0L, routerTxnMetrics()->getTotalAborted()); } @@ -4640,8 +5301,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalAborted_IsCumulative) { ErrorCodes::NoSuchTransaction); ASSERT_EQUALS(1L, routerTxnMetrics()->getTotalAborted()); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber + 1, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); ASSERT_THROWS_CODE(txnRouter().abortTransaction(operationContext()), AssertionException, ErrorCodes::NoSuchTransaction); @@ -4667,8 +5330,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalContactedParticipants) { ASSERT_EQUALS(2L, routerTxnMetrics()->getTotalContactedParticipants()); // Is cumulative across transactions. 
- txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber + 1, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); ASSERT_EQUALS(2L, routerTxnMetrics()->getTotalContactedParticipants()); txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -4750,8 +5415,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalParticipantsAtCommit) { ASSERT_EQUALS(2L, routerTxnMetrics()->getTotalParticipantsAtCommit()); // Is cumulative across transactions. - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber + 1, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {}); runCommit(kDummyOkRes); ASSERT_EQUALS(3L, routerTxnMetrics()->getTotalParticipantsAtCommit()); @@ -4832,8 +5499,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCommitTypeStatsSuccessfulDurat .successfulDurationMicros.load()); // Start a new transaction and verify that successful commit duration is cumulative. 
- txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber + 1, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter().setDefaultAtClusterTime(operationContext()); future = beginAndPauseCommit(); tickSource()->advance(Microseconds(100)); @@ -4948,8 +5617,10 @@ TEST_F(TransactionRouterMetricsTest, ReportResourcesWithParticipantList) { txnRouter().processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); txnRouter().processParticipantResponse(operationContext(), shard2, kOkReadOnlyTrueResponse); - txnRouter().beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue); + txnRouter().beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kContinue, + 0 /* txnRetryCounter */); // Verify participants array has been updated with proper ReadOnly responses. 
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp index 6f8f077859d..8f09574e608 100644 --- a/src/mongo/s/write_ops/batch_write_exec_test.cpp +++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp @@ -1843,8 +1843,10 @@ public: _scopedSession.emplace(operationContext()); auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); } @@ -1994,8 +1996,10 @@ public: _scopedSession.emplace(operationContext()); auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); } @@ -2126,8 +2130,10 @@ public: _scopedSession.emplace(operationContext()); auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter.beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); txnRouter.setDefaultAtClusterTime(operationContext()); } diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp index 43fc32f13b2..7e53707d78d 100644 --- a/src/mongo/s/write_ops/batch_write_op_test.cpp +++ b/src/mongo/s/write_ops/batch_write_op_test.cpp @@ -1553,8 +1553,10 @@ public: _scopedSession.emplace(operationContext()); auto txnRouter = TransactionRouter::get(operationContext()); - 
txnRouter.beginOrContinueTxn( - operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn(operationContext(), + kTxnNumber, + TransactionRouter::TransactionActions::kStart, + 0 /* txnRetryCounter */); } void tearDown() override { diff --git a/src/mongo/s/write_ops/write_op_test.cpp b/src/mongo/s/write_ops/write_op_test.cpp index 68851b61cd4..4ba35fb5470 100644 --- a/src/mongo/s/write_ops/write_op_test.cpp +++ b/src/mongo/s/write_ops/write_op_test.cpp @@ -411,7 +411,8 @@ TEST_F(WriteOpTransactionTest, TargetMultiAllShardsAndErrorSingleChildOp) { _opCtx->setTxnNumber(kTxnNumber); auto txnRouter = TransactionRouter::get(_opCtx); - txnRouter.beginOrContinueTxn(_opCtx, kTxnNumber, TransactionRouter::TransactionActions::kStart); + txnRouter.beginOrContinueTxn( + _opCtx, kTxnNumber, TransactionRouter::TransactionActions::kStart, 0 /* txnRetryCounter */); // Do multi-target write op WriteOp writeOp(BatchItemRef(&request, 0), true); |