summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2021-08-19 05:37:25 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-09-01 22:02:16 +0000
commit22beeff23a26e44127a15587e8bfd84f1d1e916c (patch)
treebc88a80b18693021bdb60db0399d9ec20630e97c
parent9486a2779da1e8821b4b6d90ef3327a649c10b62 (diff)
downloadmongo-22beeff23a26e44127a15587e8bfd84f1d1e916c.tar.gz
SERVER-58752 Support retrying internal transactions on transient transaction errors
-rw-r--r--buildscripts/resmokeconfig/suites/multiversion_auth.yml5
-rw-r--r--jstests/multiVersion/internal_transactions_retry_on_transient_transaction_error.js66
-rw-r--r--jstests/sharding/internal_transactions_retry_on_transient_transaction_error_basic.js170
-rw-r--r--jstests/sharding/internal_transactions_retry_on_transient_transaction_error_validation.js100
-rw-r--r--src/mongo/base/error_codes.yml10
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/initialize_operation_session_info.cpp20
-rw-r--r--src/mongo/db/logical_session_id.h2
-rw-r--r--src/mongo/db/logical_session_id.idl16
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp36
-rw-r--r--src/mongo/db/operation_context.cpp7
-rw-r--r--src/mongo/db/operation_context.h19
-rw-r--r--src/mongo/db/repl/tenant_migration_recipient_service.cpp4
-rw-r--r--src/mongo/db/repl/tenant_oplog_applier.cpp11
-rw-r--r--src/mongo/db/s/migration_destination_manager.cpp6
-rw-r--r--src/mongo/db/s/resharding/resharding_data_copy_util.cpp6
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_application.cpp6
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp14
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp26
-rw-r--r--src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp26
-rw-r--r--src/mongo/db/s/resharding_destined_recipient_test.cpp6
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp6
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp19
-rw-r--r--src/mongo/db/s/txn_two_phase_commit_cmds.cpp6
-rw-r--r--src/mongo/db/service_entry_point_common.cpp3
-rw-r--r--src/mongo/db/session_catalog_mongod.cpp4
-rw-r--r--src/mongo/db/transaction_participant.cpp136
-rw-r--r--src/mongo/db/transaction_participant.h62
-rw-r--r--src/mongo/db/transaction_participant_retryable_writes_test.cpp114
-rw-r--r--src/mongo/db/transaction_participant_test.cpp561
-rw-r--r--src/mongo/db/txn_retry_counter_too_old_info.cpp54
-rw-r--r--src/mongo/db/txn_retry_counter_too_old_info.h59
-rw-r--r--src/mongo/dbtests/storage_timestamp_tests.cpp21
-rw-r--r--src/mongo/idl/generic_argument.idl2
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/commands/document_shard_key_update_util.cpp3
-rw-r--r--src/mongo/s/commands/strategy.cpp5
-rw-r--r--src/mongo/s/transaction_router.cpp218
-rw-r--r--src/mongo/s/transaction_router.h38
-rw-r--r--src/mongo/s/transaction_router_test.cpp1279
-rw-r--r--src/mongo/s/write_ops/batch_write_exec_test.cpp18
-rw-r--r--src/mongo/s/write_ops/batch_write_op_test.cpp6
-rw-r--r--src/mongo/s/write_ops/write_op_test.cpp3
43 files changed, 2567 insertions, 608 deletions
diff --git a/buildscripts/resmokeconfig/suites/multiversion_auth.yml b/buildscripts/resmokeconfig/suites/multiversion_auth.yml
index a5a386b0a10..a4ba52a8b83 100644
--- a/buildscripts/resmokeconfig/suites/multiversion_auth.yml
+++ b/buildscripts/resmokeconfig/suites/multiversion_auth.yml
@@ -25,9 +25,10 @@ selector:
# Skip any tests that run with auth explicitly.
- jstests/multiVersion/load_keys_on_upgrade.js
- # TODO (SERVER-59343): Investigate AuthorizationSession error after running setFCV in
- # internal_sessions.js in multiversion_auth suite.
+ # TODO (SERVER-59343): Investigate AuthorizationSession error in internal session tests in
+ # auth suites.
- jstests/multiVersion/internal_sessions.js
+ - jstests/multiVersion/internal_transactions_retry_on_transient_transaction_error.js
exclude_with_any_tags:
- featureFlagToaster
diff --git a/jstests/multiVersion/internal_transactions_retry_on_transient_transaction_error.js b/jstests/multiVersion/internal_transactions_retry_on_transient_transaction_error.js
new file mode 100644
index 00000000000..02613c2310e
--- /dev/null
+++ b/jstests/multiVersion/internal_transactions_retry_on_transient_transaction_error.js
@@ -0,0 +1,66 @@
+/*
+ * Tests that the client can only retry a transaction that failed with a transient transaction error
+ * by attaching a higher txnRetryCounter when the FCV is latest.
+ *
+ * @tags: [requires_fcv_51, featureFlagInternalTransactions]
+ */
+(function() {
+'use strict';
+
+function runTest(downgradeFCV) {
+ load("jstests/libs/fail_point_util.js");
+
+ const st = new ShardingTest({shards: 1, rs: {nodes: 3}});
+ const shard0Rst = st.rs0;
+ const shard0Primary = shard0Rst.getPrimary();
+
+ const kDbName = "testDb";
+ const kCollName = "testColl";
+ const kNs = kDbName + "." + kCollName;
+ const testDB = shard0Primary.getDB(kDbName);
+
+ jsTest.log("Verify that txnRetryCounter is only supported in FCV latest");
+
+ assert.commandWorked(st.s.adminCommand({setFeatureCompatibilityVersion: downgradeFCV}));
+
+ const sessionUUID = UUID();
+ const lsid = {id: sessionUUID};
+ const txnNumber = NumberLong(1);
+ configureFailPoint(shard0Primary,
+ "failCommand",
+ {
+ failInternalCommands: true,
+ failCommands: ["insert"],
+ errorCode: ErrorCodes.LockBusy,
+ namespace: kNs
+ },
+ {times: 1});
+ const insertCmdObj = {
+ insert: kCollName,
+ documents: [{x: 1}],
+ lsid: lsid,
+ txnNumber: txnNumber,
+ startTransaction: true,
+ autocommit: false,
+ txnRetryCounter: NumberInt(0)
+ };
+ assert.commandFailedWithCode(testDB.runCommand(insertCmdObj), ErrorCodes.InvalidOptions);
+
+ assert.commandWorked(st.s.adminCommand({setFeatureCompatibilityVersion: latestFCV}));
+
+ assert.commandFailedWithCode(testDB.runCommand(insertCmdObj), ErrorCodes.LockBusy);
+ insertCmdObj.txnRetryCounter = NumberInt(1);
+ assert.commandWorked(testDB.runCommand(insertCmdObj));
+ assert.commandWorked(testDB.adminCommand({
+ commitTransaction: 1,
+ lsid: lsid,
+ txnNumber: txnNumber,
+ autocommit: false,
+ txnRetryCounter: insertCmdObj.txnRetryCounter
+ }));
+
+ st.stop();
+}
+
+runFeatureFlagMultiversionTest('featureFlagInternalTransactions', runTest);
+})();
diff --git a/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_basic.js b/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_basic.js
new file mode 100644
index 00000000000..e727716666a
--- /dev/null
+++ b/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_basic.js
@@ -0,0 +1,170 @@
+/*
+ * Tests that a client can retry a transaction that failed with a transient transaction error by
+ * attaching a higher txnRetryCounter.
+ *
+ * @tags: [requires_fcv_51, featureFlagInternalTransactions]
+ */
+(function() {
+'use strict';
+
+load("jstests/libs/fail_point_util.js");
+
+const kDbName = "testDb";
+const kCollName = "testColl";
+const kNs = kDbName + "." + kCollName;
+
+const st = new ShardingTest({shards: 1, rs: {nodes: 3}});
+const shard0Rst = st.rs0;
+const shard0Primary = shard0Rst.getPrimary();
+
+const mongosTestDB = st.s.getDB(kDbName);
+const shard0TestDB = shard0Primary.getDB(kDbName);
+assert.commandWorked(mongosTestDB.createCollection(kCollName));
+
+function testCommitAfterRetry(db, lsid, txnNumber) {
+ const txnRetryCounter0 = NumberInt(0);
+ const txnRetryCounter1 = NumberInt(1);
+
+ jsTest.log(
+ "Verify that the client can retry a transaction that failed with a transient " +
+ "transaction error by attaching a higher txnRetryCounter and commit the transaction");
+ configureFailPoint(shard0Primary,
+ "failCommand",
+ {
+ failInternalCommands: true,
+ failCommands: ["insert"],
+ errorCode: ErrorCodes.LockBusy,
+ namespace: kNs
+ },
+ {times: 1});
+ const insertCmdObj0 = {
+ insert: kCollName,
+ documents: [{x: 0}],
+ lsid: lsid,
+ txnNumber: txnNumber,
+ startTransaction: true,
+ autocommit: false,
+ };
+ assert.commandFailedWithCode(
+ db.runCommand(Object.assign({}, insertCmdObj0, {txnRetryCounter: txnRetryCounter0})),
+ ErrorCodes.LockBusy);
+ assert.commandWorked(
+ db.runCommand(Object.assign({}, insertCmdObj0, {txnRetryCounter: txnRetryCounter1})));
+
+ jsTest.log("Verify that the client must attach the last used txnRetryCounter in all commands " +
+ "in the transaction");
+ const insertCmdObj1 = {
+ insert: kCollName,
+ documents: [{x: 1}],
+ lsid: lsid,
+ txnNumber: txnNumber,
+ autocommit: false,
+ };
+ const insertRes0 = assert.commandFailedWithCode(
+ db.runCommand(Object.assign({}, insertCmdObj1, {txnRetryCounter: txnRetryCounter0})),
+ ErrorCodes.TxnRetryCounterTooOld);
+ assert.eq(txnRetryCounter1, insertRes0.txnRetryCounter, insertRes0);
+ // txnRetryCounter defaults to 0.
+ const insertRes1 = assert.commandFailedWithCode(db.runCommand(insertCmdObj1),
+ ErrorCodes.TxnRetryCounterTooOld);
+ assert.eq(txnRetryCounter1, insertRes1.txnRetryCounter, insertRes1);
+ assert.commandWorked(
+ db.runCommand(Object.assign({}, insertCmdObj1, {txnRetryCounter: txnRetryCounter1})));
+
+ jsTest.log("Verify that the client must attach the last used txnRetryCounter in the " +
+ "commitTransaction command");
+ const commitCmdObj = {
+ commitTransaction: 1,
+ lsid: lsid,
+ txnNumber: txnNumber,
+ autocommit: false,
+ };
+ const commitRes = assert.commandFailedWithCode(
+ db.adminCommand(Object.assign({}, commitCmdObj, {txnRetryCounter: txnRetryCounter0})),
+ ErrorCodes.TxnRetryCounterTooOld);
+ assert.eq(txnRetryCounter1, commitRes.txnRetryCounter, commitRes);
+
+ assert.commandWorked(
+ db.adminCommand(Object.assign({}, commitCmdObj, {txnRetryCounter: txnRetryCounter1})));
+}
+
+function testAbortAfterRetry(db, lsid, txnNumber) {
+ const txnRetryCounter0 = NumberInt(0);
+ const txnRetryCounter1 = NumberInt(1);
+
+ jsTest.log("Verify that the client can retry a transaction that failed with a transient " +
+ "transaction error by attaching a higher txnRetryCounter and abort the transaction");
+ configureFailPoint(shard0Primary,
+ "failCommand",
+ {
+ failInternalCommands: true,
+ failCommands: ["insert"],
+ errorCode: ErrorCodes.LockBusy,
+ namespace: kNs
+ },
+ {times: 1});
+ const insertCmdObj0 = {
+ insert: kCollName,
+ documents: [{x: 0}],
+ lsid: lsid,
+ txnNumber: txnNumber,
+ startTransaction: true,
+ autocommit: false,
+ };
+ assert.commandFailedWithCode(
+ db.runCommand(Object.assign({}, insertCmdObj0, {txnRetryCounter: txnRetryCounter0})),
+ ErrorCodes.LockBusy);
+ assert.commandWorked(
+ db.runCommand(Object.assign({}, insertCmdObj0, {txnRetryCounter: txnRetryCounter1})));
+
+ jsTest.log("Verify that the client must attach the last used txnRetryCounter in the " +
+ "abortTransaction command");
+ const abortCmdObj = {
+ abortTransaction: 1,
+ lsid: lsid,
+ txnNumber: txnNumber,
+ autocommit: false,
+ };
+ const abortRes = assert.commandFailedWithCode(
+ db.adminCommand(Object.assign({}, abortCmdObj, {txnRetryCounter: txnRetryCounter0})),
+ ErrorCodes.TxnRetryCounterTooOld);
+ assert.eq(txnRetryCounter1, abortRes.txnRetryCounter, abortRes);
+
+ assert.commandWorked(
+ db.adminCommand(Object.assign({}, abortCmdObj, {txnRetryCounter: txnRetryCounter1})));
+}
+
+(() => {
+ jsTest.log("Test transactions in a sharded cluster");
+ const sessionUUID = UUID();
+ const lsid0 = {id: sessionUUID};
+ testCommitAfterRetry(mongosTestDB, lsid0, NumberLong(0));
+ testAbortAfterRetry(mongosTestDB, lsid0, NumberLong(1));
+
+ const lsid1 = {id: sessionUUID, txnNumber: NumberLong(1), stmtId: NumberInt(0)};
+ testCommitAfterRetry(mongosTestDB, lsid1, NumberLong(0));
+ testAbortAfterRetry(mongosTestDB, lsid1, NumberLong(1));
+
+ const lsid2 = {id: sessionUUID, txnUUID: UUID()};
+ testCommitAfterRetry(mongosTestDB, lsid2, NumberLong(0));
+ testAbortAfterRetry(mongosTestDB, lsid2, NumberLong(1));
+})();
+
+(() => {
+ jsTest.log("Test transactions in a replica set");
+ const sessionUUID = UUID();
+ const lsid0 = {id: sessionUUID};
+ testCommitAfterRetry(shard0TestDB, lsid0, NumberLong(0));
+ testAbortAfterRetry(shard0TestDB, lsid0, NumberLong(1));
+
+ const lsid1 = {id: sessionUUID, txnNumber: NumberLong(1), stmtId: NumberInt(0)};
+ testCommitAfterRetry(shard0TestDB, lsid1, NumberLong(0));
+ testAbortAfterRetry(shard0TestDB, lsid1, NumberLong(1));
+
+ const lsid2 = {id: sessionUUID, txnUUID: UUID()};
+ testCommitAfterRetry(shard0TestDB, lsid2, NumberLong(0));
+ testAbortAfterRetry(shard0TestDB, lsid2, NumberLong(1));
+})();
+
+st.stop();
+})();
diff --git a/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_validation.js b/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_validation.js
new file mode 100644
index 00000000000..0f3119e8679
--- /dev/null
+++ b/jstests/sharding/internal_transactions_retry_on_transient_transaction_error_validation.js
@@ -0,0 +1,100 @@
+/*
+ * Tests that txnRetryCounter is only supported in sharded clusters and in transactions.
+ *
+ * @tags: [requires_fcv_51, featureFlagInternalTransactions]
+ */
+(function() {
+'use strict';
+
+load("jstests/libs/fail_point_util.js");
+
+const kDbName = "testDb";
+const kCollName = "testColl";
+const kNs = kDbName + "." + kCollName;
+
+(() => {
+ const rst = new ReplSetTest({nodes: 1});
+
+ rst.startSet();
+ rst.initiate();
+
+ const primary = rst.getPrimary();
+ const testDB = primary.getDB(kDbName);
+
+ jsTest.log("Verify that txnRetryCounter is only supported in sharded clusters");
+ const insertCmdObj = {
+ insert: kCollName,
+ documents: [{x: 0}],
+ lsid: {id: UUID()},
+ txnNumber: NumberLong(1),
+ startTransaction: true,
+ autocommit: false,
+ txnRetryCounter: NumberInt(0)
+ };
+ assert.commandFailedWithCode(testDB.runCommand(insertCmdObj), ErrorCodes.InvalidOptions);
+ rst.stopSet();
+})();
+
+(() => {
+ const st = new ShardingTest({shards: 1});
+
+ jsTest.log("Verify that txnRetryCounter is supported on shardvr");
+ const shard0Primary = st.rs0.getPrimary();
+ const insertCmdObjShardSvr = {
+ insert: kCollName,
+ documents: [{x: 0}],
+ lsid: {id: UUID()},
+ txnNumber: NumberLong(0),
+ startTransaction: true,
+ autocommit: false,
+ txnRetryCounter: NumberInt(0)
+ };
+ assert.commandWorked(shard0Primary.getDB(kDbName).runCommand(insertCmdObjShardSvr));
+ assert.commandWorked(shard0Primary.adminCommand({
+ abortTransaction: 1,
+ lsid: insertCmdObjShardSvr.lsid,
+ txnNumber: insertCmdObjShardSvr.txnNumber,
+ autocommit: false,
+ txnRetryCounter: NumberInt(0)
+ }));
+
+ jsTest.log("Verify that txnRetryCounter is supported on configsvr");
+ const configRSPrimary = st.configRS.getPrimary();
+ const deleteCmdObjConfigSvr = {
+ delete: "chunks",
+ deletes: [{q: {}, limit: 1}],
+ lsid: {id: UUID()},
+ txnNumber: NumberLong(0),
+ startTransaction: true,
+ autocommit: false,
+ txnRetryCounter: NumberInt(0)
+ };
+ assert.commandWorked(configRSPrimary.getDB("config").runCommand(deleteCmdObjConfigSvr));
+ assert.commandWorked(configRSPrimary.adminCommand({
+ abortTransaction: 1,
+ lsid: deleteCmdObjConfigSvr.lsid,
+ txnNumber: deleteCmdObjConfigSvr.txnNumber,
+ autocommit: false,
+ }));
+
+ jsTest.log("Test that the client cannot specify txnRetryCounter in a retryable write command");
+ const mongosTestDB = st.s.getDB(kDbName);
+ const shard0TestDB = shard0Primary.getDB(kDbName);
+ const insertCmdObj = {
+ insert: kCollName,
+ documents: [{x: 0}],
+ lsid: {id: UUID()},
+ txnNumber: NumberLong(0),
+ txnRetryCounter: NumberInt(0)
+ };
+ assert.commandFailedWithCode(mongosTestDB.runCommand(insertCmdObj), ErrorCodes.InvalidOptions);
+ // TODO (SERVER-59343): Investigate AuthorizationSession error in internal session tests in
+ // auth suites.
+ if (!TestData.auth) {
+ assert.commandFailedWithCode(shard0TestDB.runCommand(insertCmdObj),
+ ErrorCodes.InvalidOptions);
+ }
+
+ st.stop();
+})();
+})();
diff --git a/src/mongo/base/error_codes.yml b/src/mongo/base/error_codes.yml
index 66319639eee..098402198ae 100644
--- a/src/mongo/base/error_codes.yml
+++ b/src/mongo/base/error_codes.yml
@@ -416,7 +416,7 @@ error_codes:
- {code: 336, name: TimeseriesBucketCleared, categories: [InternalOnly]}
- {code: 337, name: AuthenticationAbandoned, categories: [InternalOnly]}
-
+
- {code: 338, name: ReshardCollectionInProgress}
- {code: 339, name: NoSuchReshardCollection}
- {code: 340, name: ReshardCollectionCommitted}
@@ -428,14 +428,14 @@ error_codes:
# case where it would be possible for MongoS to retry where MongoD couldn't.
- {code: 343, name: ShardCannotRefreshDueToLocksHeld,
extra: ShardCannotRefreshDueToLocksHeldInfo}
-
+
- {code: 344, name: AuditingNotEnabled}
- {code: 345, name: RuntimeAuditConfigurationNotEnabled}
-
+
- {code: 346,name: ChangeStreamInvalidated, extra: ChangeStreamInvalidationInfo}
- {code: 347, name: APIMismatchError, categories: [VersionedAPIError,VoteAbortError]}
-
+
- {code: 348,name: ChangeStreamTopologyChange, extra: ChangeStreamTopologyChangeInfo}
- {code: 349, name: KeyPatternShorterThanBound}
@@ -451,6 +451,8 @@ error_codes:
- {code: 355, name: InterruptedDueToStorageChange,categories: [Interruption,CancellationError]}
+ - {code: 356, name: TxnRetryCounterTooOld, extra: TxnRetryCounterTooOldInfo}
+
# Error codes 4000-8999 are reserved.
# Non-sequential error codes for compatibility only)
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 964edce0dda..5beccabd31f 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -63,6 +63,7 @@ env.Library(
'field_ref_set.cpp',
'field_parser.cpp',
'keypattern.cpp',
+ 'txn_retry_counter_too_old_info.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
diff --git a/src/mongo/db/initialize_operation_session_info.cpp b/src/mongo/db/initialize_operation_session_info.cpp
index c655b9ae7f7..d50af9c090c 100644
--- a/src/mongo/db/initialize_operation_session_info.cpp
+++ b/src/mongo/db/initialize_operation_session_info.cpp
@@ -32,6 +32,7 @@
#include "mongo/db/initialize_operation_session_info.h"
#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/internal_transactions_feature_flag_gen.h"
#include "mongo/db/logical_session_cache.h"
#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/operation_context.h"
@@ -96,6 +97,10 @@ OperationSessionInfoFromClient initializeOperationSessionInfo(OperationContext*
if (getParentSessionId(lsid)) {
uassert(ErrorCodes::InvalidOptions,
+ "Internal sessions are not enabled",
+ feature_flags::gFeatureFlagInternalTransactions.isEnabled(
+ serverGlobalParams.featureCompatibility));
+ uassert(ErrorCodes::InvalidOptions,
"Internal sessions are not supported outside of transactions",
osi.getTxnNumber() && osi.getAutocommit() && !osi.getAutocommit().value());
}
@@ -120,6 +125,21 @@ OperationSessionInfoFromClient initializeOperationSessionInfo(OperationContext*
*osi.getTxnNumber() >= 0);
opCtx->setTxnNumber(*osi.getTxnNumber());
+
+ if (auto txnRetryCounter = osi.getTxnRetryCounter()) {
+ // TODO (SERVER-58759): Add a uassert that the client is internal.
+ uassert(ErrorCodes::InvalidOptions,
+ "txnRetryCounter is not enabled",
+ feature_flags::gFeatureFlagInternalTransactions.isEnabled(
+ serverGlobalParams.featureCompatibility));
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream() << "Cannot specify txnRetryCounter for a retryable write",
+ osi.getAutocommit().has_value());
+ uassert(ErrorCodes::InvalidOptions,
+ "txnRetryCounter cannot be negative",
+ txnRetryCounter >= 0);
+ opCtx->setTxnRetryCounter(*txnRetryCounter);
+ }
} else {
uassert(ErrorCodes::InvalidOptions,
"'autocommit' field requires a transaction number to also be specified",
diff --git a/src/mongo/db/logical_session_id.h b/src/mongo/db/logical_session_id.h
index 01d74fa7cc4..3270eece38e 100644
--- a/src/mongo/db/logical_session_id.h
+++ b/src/mongo/db/logical_session_id.h
@@ -40,6 +40,7 @@ namespace mongo {
using TxnNumber = std::int64_t;
using StmtId = std::int32_t;
+using TxnRetryCounter = std::int32_t;
// Default value for unassigned statementId.
const StmtId kUninitializedStmtId = -1;
@@ -48,6 +49,7 @@ const StmtId kUninitializedStmtId = -1;
const StmtId kIncompleteHistoryStmtId = -2;
const TxnNumber kUninitializedTxnNumber = -1;
+const TxnRetryCounter kUninitializedTxnRetryCounter = -1;
class BSONObjBuilder;
class OperationContext;
diff --git a/src/mongo/db/logical_session_id.idl b/src/mongo/db/logical_session_id.idl
index 1f356d5f046..4fcae11e9ba 100644
--- a/src/mongo/db/logical_session_id.idl
+++ b/src/mongo/db/logical_session_id.idl
@@ -55,6 +55,14 @@ types:
cpp_type: "std::int32_t"
deserializer: "mongo::BSONElement::_numberInt"
+ TxnRetryCounter:
+ description: "A strictly-increasing per-transaction counter that starts at 0 and is incremented
+ every time the transaction is internally retried on a transient transaction
+ error."
+ bson_serialization_type: int
+ cpp_type: "std::int32_t"
+ deserializer: "mongo::BSONElement::_numberInt"
+
structs:
InternalSessionFields:
description: "Internal sessiond id fields"
@@ -131,6 +139,10 @@ structs:
operation executes."
type: TxnNumber
optional: true
+ txnRetryCounter:
+ description: "The transaction retry counter for this transaction."
+ type: TxnRetryCounter
+ optional: true
OperationSessionInfoFromClient:
description: "Parser for pulling out the sessionId/txnNumber combination from commands"
@@ -145,6 +157,10 @@ structs:
operation executes."
type: TxnNumber
optional: true
+ txnRetryCounter:
+ description: "The transaction retry counter for this transaction."
+ type: TxnRetryCounter
+ optional: true
autocommit:
type: bool
optional: true
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 4bbe23b0a66..1cc220d3746 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -593,7 +593,11 @@ public:
NamespaceString nss,
TxnNumber txnNum,
StmtId stmtId) {
- txnParticipant.beginOrContinue(opCtx, txnNum, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx,
+ txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
{
AutoGetCollection autoColl(opCtx, nss, MODE_IX);
@@ -766,7 +770,11 @@ class OpObserverTransactionTest : public OpObserverTxnParticipantTest {
public:
void setUp() override {
OpObserverTxnParticipantTest::setUp();
- txnParticipant().beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, true);
+ txnParticipant().beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */);
}
protected:
@@ -1454,7 +1462,11 @@ class OpObserverRetryableFindAndModifyTest : public OpObserverTxnParticipantTest
public:
void setUp() override {
OpObserverTxnParticipantTest::setUp();
- txnParticipant().beginOrContinue(opCtx(), txnNum(), boost::none, boost::none);
+ txnParticipant().beginOrContinue(opCtx(),
+ txnNum(),
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
}
void tearDown() override {
@@ -1660,7 +1672,11 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) {
opCtx->setTxnNumber(TxnNumber(testIdx));
contextSession.emplace(opCtx);
txnParticipant.emplace(TransactionParticipant::get(opCtx));
- txnParticipant->beginOrContinue(opCtx, TxnNumber(testIdx), boost::none, boost::none);
+ txnParticipant->beginOrContinue(opCtx,
+ TxnNumber(testIdx),
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
}
if (testCase.imageType == StoreDocOption::None && !testCase.alwaysRecordPreImages) {
@@ -1787,7 +1803,11 @@ TEST_F(OpObserverTest, TestFundamentalOnInsertsOutputs) {
opCtx->setTxnNumber(TxnNumber(testIdx));
contextSession.emplace(opCtx);
txnParticipant.emplace(TransactionParticipant::get(opCtx));
- txnParticipant->beginOrContinue(opCtx, TxnNumber(testIdx), boost::none, boost::none);
+ txnParticipant->beginOrContinue(opCtx,
+ TxnNumber(testIdx),
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
}
// Phase 2: Call the code we're testing.
@@ -1893,7 +1913,11 @@ TEST_F(OpObserverTest, TestFundamentalOnDeleteOutputs) {
opCtx->setTxnNumber(TxnNumber(testIdx));
contextSession.emplace(opCtx);
txnParticipant.emplace(TransactionParticipant::get(opCtx));
- txnParticipant->beginOrContinue(opCtx, TxnNumber(testIdx), boost::none, boost::none);
+ txnParticipant->beginOrContinue(opCtx,
+ TxnNumber(testIdx),
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
}
OpObserver::OplogDeleteEntryArgs deleteArgs;
switch (testCase.retryableOptions) {
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp
index cb97d6a197a..0b10331e968 100644
--- a/src/mongo/db/operation_context.cpp
+++ b/src/mongo/db/operation_context.cpp
@@ -406,6 +406,13 @@ void OperationContext::setTxnNumber(TxnNumber txnNumber) {
_txnNumber = txnNumber;
}
+void OperationContext::setTxnRetryCounter(TxnRetryCounter txnRetryCounter) {
+ invariant(_lsid);
+ invariant(_txnNumber);
+ invariant(!_txnRetryCounter.has_value());
+ _txnRetryCounter = txnRetryCounter;
+}
+
std::unique_ptr<RecoveryUnit> OperationContext::releaseRecoveryUnit() {
return std::move(_recoveryUnit);
}
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 010ab14756c..36c2c49a83d 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -235,6 +235,13 @@ public:
}
/**
+ * Returns the txnRetryCounter associated with this operation.
+ */
+ boost::optional<TxnRetryCounter> getTxnRetryCounter() const {
+ return _txnRetryCounter;
+ }
+
+ /**
* Returns a CancellationToken that will be canceled when the OperationContext is killed via
* markKilled (including for internal reasons, like the OperationContext deadline being
* reached).
@@ -264,6 +271,13 @@ public:
void setTxnNumber(TxnNumber txnNumber);
/**
+ * Associates a txnRetryCounter with this operation context. May only be called once for the
+ * lifetime of the operation and the operation must have a logical session id and a transaction
+ * number assigned.
+ */
+ void setTxnRetryCounter(TxnRetryCounter txnRetryCounter);
+
+ /**
* Returns the top-level WriteUnitOfWork associated with this operation context, if any.
*/
WriteUnitOfWork* getWriteUnitOfWork() {
@@ -443,6 +457,9 @@ public:
*/
void setInMultiDocumentTransaction() {
_inMultiDocumentTransaction = true;
+ if (!_txnRetryCounter.has_value()) {
+ _txnRetryCounter = 0;
+ }
}
/**
@@ -495,6 +512,7 @@ public:
_isStartingMultiDocumentTransaction = false;
_lsid = boost::none;
_txnNumber = boost::none;
+ _txnRetryCounter = boost::none;
}
/**
@@ -654,6 +672,7 @@ private:
boost::optional<LogicalSessionId> _lsid;
boost::optional<TxnNumber> _txnNumber;
+ boost::optional<TxnRetryCounter> _txnRetryCounter;
std::unique_ptr<Locker> _locker;
diff --git a/src/mongo/db/repl/tenant_migration_recipient_service.cpp b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
index 267bbc2ab5c..e996689c44b 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_service.cpp
+++ b/src/mongo/db/repl/tenant_migration_recipient_service.cpp
@@ -958,6 +958,8 @@ void TenantMigrationRecipientService::Instance::_processCommittedTransactionEntr
opCtx->setLogicalSessionId(sessionId);
opCtx->setTxnNumber(txnNumber);
opCtx->setInMultiDocumentTransaction();
+ auto txnRetryCounter = *opCtx->getTxnRetryCounter();
+ invariant(txnRetryCounter == 0);
MongoDOperationContextSession ocs(opCtx);
LOGV2_DEBUG(5351301,
@@ -994,7 +996,7 @@ void TenantMigrationRecipientService::Instance::_processCommittedTransactionEntr
return;
}
- txnParticipant.beginOrContinueTransactionUnconditionally(opCtx, txnNumber);
+ txnParticipant.beginOrContinueTransactionUnconditionally(opCtx, txnNumber, txnRetryCounter);
MutableOplogEntry noopEntry;
noopEntry.setOpType(repl::OpTypeEnum::kNoop);
diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp
index 7ca766859e9..a4a5ab525c3 100644
--- a/src/mongo/db/repl/tenant_oplog_applier.cpp
+++ b/src/mongo/db/repl/tenant_oplog_applier.cpp
@@ -579,6 +579,8 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
opCtx->setLogicalSessionId(sessionId);
opCtx->setTxnNumber(txnNumber);
opCtx->setInMultiDocumentTransaction();
+ auto txnRetryCounter = *opCtx->getTxnRetryCounter();
+ invariant(txnRetryCounter == 0);
LOGV2_DEBUG(5351502,
1,
"Tenant Oplog Applier committing transaction",
@@ -605,7 +607,8 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
<< " because the transaction number "
<< txnParticipant.getActiveTxnNumber() << " has already started",
txnParticipant.getActiveTxnNumber() < txnNumber);
- txnParticipant.beginOrContinueTransactionUnconditionally(opCtx.get(), txnNumber);
+ txnParticipant.beginOrContinueTransactionUnconditionally(
+ opCtx.get(), txnNumber, txnRetryCounter);
// Only set sessionId and txnNumber for the final applyOp in a transaction.
noopEntry.setSessionId(sessionId);
@@ -735,7 +738,8 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
txnParticipant.beginOrContinue(opCtx.get(),
txnNumber,
boost::none /* autocommit */,
- boost::none /* startTransaction */);
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
// We could have an existing lastWriteOpTime for the same retryable write chain from a
// previously aborted migration. This could also happen if the tenant being migrated has
@@ -766,7 +770,8 @@ void TenantOplogApplier::_writeSessionNoOpsForRange(
txnParticipant.beginOrContinue(opCtx.get(),
txnNumber,
boost::none /* autocommit */,
- boost::none /* startTransaction */);
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
}
// We should never process the same donor statement twice, except in failover
diff --git a/src/mongo/db/s/migration_destination_manager.cpp b/src/mongo/db/s/migration_destination_manager.cpp
index 1451fd2eced..5f81beb94fa 100644
--- a/src/mongo/db/s/migration_destination_manager.cpp
+++ b/src/mongo/db/s/migration_destination_manager.cpp
@@ -108,7 +108,8 @@ void checkOutSessionAndVerifyTxnState(OperationContext* opCtx) {
TransactionParticipant::get(opCtx).beginOrContinue(opCtx,
*opCtx->getTxnNumber(),
boost::none /* autocommit */,
- boost::none /* startTransaction */);
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
}
template <typename Callable>
@@ -921,7 +922,8 @@ void MigrationDestinationManager::_migrateThread() {
txnParticipant.beginOrContinue(opCtx,
*opCtx->getTxnNumber(),
boost::none /* autocommit */,
- boost::none /* startTransaction */);
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
_migrateDriver(opCtx);
} catch (...) {
_setStateFail(str::stream() << "migrate failed: " << redact(exceptionToStatus()));
diff --git a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
index 3461c15b815..745e83a40f9 100644
--- a/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
+++ b/src/mongo/db/s/resharding/resharding_data_copy_util.cpp
@@ -249,7 +249,11 @@ boost::optional<SharedSemiFuture<void>> withSessionCheckedOut(OperationContext*
auto txnParticipant = TransactionParticipant::get(opCtx);
try {
- txnParticipant.beginOrContinue(opCtx, txnNumber, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx,
+ txnNumber,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
if (stmtId && txnParticipant.checkStatementExecuted(opCtx, *stmtId)) {
// Skip the incoming statement because it has already been logged locally.
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
index 104e79cdd37..5a71b14a7da 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
@@ -86,7 +86,11 @@ void runWithTransaction(OperationContext* opCtx, unique_function<void(OperationC
}
});
- txnParticipant.beginOrContinue(asr.opCtx(), txnNumber, false, true);
+ txnParticipant.beginOrContinue(asr.opCtx(),
+ txnNumber,
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */);
txnParticipant.unstashTransactionResources(asr.opCtx(), "reshardingOplogApplication");
func(asr.opCtx());
diff --git a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
index 9691a947954..f89da1f5b2d 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_batch_applier_test.cpp
@@ -177,8 +177,11 @@ public:
MongoDOperationContextSession ocs(opCtx);
auto txnParticipant = TransactionParticipant::get(opCtx);
- txnParticipant.beginOrContinue(
- opCtx, txnNumber, false /* autocommit */, true /* startTransaction */);
+ txnParticipant.beginOrContinue(opCtx,
+ txnNumber,
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */);
txnParticipant.unstashTransactionResources(opCtx, "prepareTransaction");
@@ -198,8 +201,11 @@ public:
MongoDOperationContextSession ocs(opCtx);
auto txnParticipant = TransactionParticipant::get(opCtx);
- txnParticipant.beginOrContinue(
- opCtx, txnNumber, false /* autocommit */, boost::none /* startTransaction */);
+ txnParticipant.beginOrContinue(opCtx,
+ txnNumber,
+ false /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
txnParticipant.unstashTransactionResources(opCtx, "abortTransaction");
txnParticipant.abortTransaction(opCtx);
diff --git a/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp
index 61a519c7493..29c6fd3c26b 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_session_application_test.cpp
@@ -79,7 +79,11 @@ public:
MongoDOperationContextSession ocs(opCtx);
auto txnParticipant = TransactionParticipant::get(opCtx);
- txnParticipant.beginOrContinue(opCtx, txnNumber, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx,
+ txnNumber,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
WriteUnitOfWork wuow(opCtx);
auto opTime = repl::getNextOpTime(opCtx);
@@ -102,8 +106,11 @@ public:
MongoDOperationContextSession ocs(opCtx);
auto txnParticipant = TransactionParticipant::get(opCtx);
- txnParticipant.beginOrContinue(
- opCtx, txnNumber, false /* autocommit */, true /* startTransaction */);
+ txnParticipant.beginOrContinue(opCtx,
+ txnNumber,
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */);
txnParticipant.unstashTransactionResources(opCtx, "prepareTransaction");
@@ -123,8 +130,11 @@ public:
MongoDOperationContextSession ocs(opCtx);
auto txnParticipant = TransactionParticipant::get(opCtx);
- txnParticipant.beginOrContinue(
- opCtx, txnNumber, false /* autocommit */, boost::none /* startTransaction */);
+ txnParticipant.beginOrContinue(opCtx,
+ txnNumber,
+ false /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
txnParticipant.unstashTransactionResources(opCtx, "abortTransaction");
txnParticipant.abortTransaction(opCtx);
@@ -255,7 +265,11 @@ public:
MongoDOperationContextSession ocs(opCtx);
auto txnParticipant = TransactionParticipant::get(opCtx);
- txnParticipant.beginOrContinue(opCtx, txnNumber, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx,
+ txnNumber,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
ASSERT_TRUE(bool(txnParticipant.checkStatementExecuted(opCtx, stmtId)));
}
};
diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
index e302f943693..ba1efd5a017 100644
--- a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
@@ -243,9 +243,17 @@ protected:
auto txnParticipant = TransactionParticipant::get(opCtx);
ASSERT(txnParticipant);
if (multiDocTxn) {
- txnParticipant.beginOrContinue(opCtx, txnNum, false, true);
+ txnParticipant.beginOrContinue(opCtx,
+ txnNum,
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */);
} else {
- txnParticipant.beginOrContinue(opCtx, txnNum, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx,
+ txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
}
}
@@ -389,8 +397,11 @@ protected:
MongoDOperationContextSession ocs(opCtx);
auto txnParticipant = TransactionParticipant::get(opCtx);
- txnParticipant.beginOrContinue(
- opCtx, txnNumber, false /* autocommit */, true /* startTransaction */);
+ txnParticipant.beginOrContinue(opCtx,
+ txnNumber,
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */);
txnParticipant.unstashTransactionResources(opCtx, "prepareTransaction");
@@ -410,8 +421,11 @@ protected:
MongoDOperationContextSession ocs(opCtx);
auto txnParticipant = TransactionParticipant::get(opCtx);
- txnParticipant.beginOrContinue(
- opCtx, txnNumber, false /* autocommit */, boost::none /* startTransaction */);
+ txnParticipant.beginOrContinue(opCtx,
+ txnNumber,
+ false /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
txnParticipant.unstashTransactionResources(opCtx, "abortTransaction");
txnParticipant.abortTransaction(opCtx);
diff --git a/src/mongo/db/s/resharding_destined_recipient_test.cpp b/src/mongo/db/s/resharding_destined_recipient_test.cpp
index 98d534ca694..6592bb4f35d 100644
--- a/src/mongo/db/s/resharding_destined_recipient_test.cpp
+++ b/src/mongo/db/s/resharding_destined_recipient_test.cpp
@@ -70,7 +70,11 @@ void runInTransaction(OperationContext* opCtx, Callable&& func) {
auto txnParticipant = TransactionParticipant::get(opCtx);
ASSERT(txnParticipant);
- txnParticipant.beginOrContinue(opCtx, txnNum, false, true);
+ txnParticipant.beginOrContinue(opCtx,
+ txnNum,
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */);
txnParticipant.unstashTransactionResources(opCtx, "SetDestinedRecipient");
func();
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 80226a9c463..8350f974581 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -247,7 +247,11 @@ ProcessOplogResult processSessionOplog(const BSONObj& oplogBSON,
auto txnParticipant = TransactionParticipant::get(opCtx);
try {
- txnParticipant.beginOrContinue(opCtx, result.txnNum, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx,
+ result.txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
if (txnParticipant.checkStatementExecuted(opCtx, stmtIds.front())) {
// Skip the incoming statement because it has already been logged locally
return lastResult;
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index 7d770e47cff..446fedd435a 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -183,7 +183,11 @@ public:
opCtx->setTxnNumber(txnNum);
MongoDOperationContextSession ocs(opCtx);
auto txnParticipant = TransactionParticipant::get(opCtx);
- txnParticipant.beginOrContinue(opCtx, txnNum, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx,
+ txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
}
void checkOplog(const repl::OplogEntry& originalOplog, const repl::OplogEntry& oplogToCheck) {
@@ -250,8 +254,11 @@ public:
initializeOperationSessionInfo(innerOpCtx.get(), insertBuilder.obj(), true, true, true);
MongoDOperationContextSession sessionTxnState(innerOpCtx.get());
auto txnParticipant = TransactionParticipant::get(innerOpCtx.get());
- txnParticipant.beginOrContinue(
- innerOpCtx.get(), *sessionInfo.getTxnNumber(), boost::none, boost::none);
+ txnParticipant.beginOrContinue(innerOpCtx.get(),
+ *sessionInfo.getTxnNumber(),
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
const auto reply = write_ops_exec::performInserts(innerOpCtx.get(), insertRequest);
ASSERT(reply.results.size() == 1);
@@ -1879,7 +1886,11 @@ TEST_F(SessionCatalogMigrationDestinationTest,
auto txnParticipant = TransactionParticipant::get(opCtx);
txnParticipant.refreshFromStorageIfNeeded(opCtx);
- txnParticipant.beginOrContinue(opCtx, 3, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx,
+ 3,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
}
OperationSessionInfo sessionInfo2;
diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
index b228c301d95..3fce3dda429 100644
--- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
+++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
@@ -314,7 +314,8 @@ public:
txnParticipant.beginOrContinue(opCtx,
*opCtx->getTxnNumber(),
false /* autocommit */,
- boost::none /* startTransaction */);
+ boost::none /* startTransaction */,
+ *opCtx->getTxnRetryCounter());
if (txnParticipant.transactionIsCommitted())
return;
@@ -336,7 +337,8 @@ public:
txnParticipant.beginOrContinue(opCtx,
*opCtx->getTxnNumber(),
false /* autocommit */,
- boost::none /* startTransaction */);
+ boost::none /* startTransaction */,
+ *opCtx->getTxnRetryCounter());
invariant(!txnParticipant.transactionIsOpen(),
"The participant should not be in progress after we waited for the "
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 5e2bd446b35..64cf8a9ff1b 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -862,7 +862,8 @@ void CheckoutSessionAndInvokeCommand::_checkOutSession() {
_txnParticipant->beginOrContinue(opCtx,
*sessionOptions.getTxnNumber(),
sessionOptions.getAutocommit(),
- sessionOptions.getStartTransaction());
+ sessionOptions.getStartTransaction(),
+ sessionOptions.getTxnRetryCounter());
beganOrContinuedTxn = true;
} catch (const ExceptionFor<ErrorCodes::PreparedTransactionInProgress>&) {
auto prepareCompleted = _txnParticipant->onExitPrepare();
diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp
index 075e5bda2cc..6e1878dc582 100644
--- a/src/mongo/db/session_catalog_mongod.cpp
+++ b/src/mongo/db/session_catalog_mongod.cpp
@@ -482,9 +482,11 @@ MongoDOperationContextSessionWithoutRefresh::MongoDOperationContextSessionWithou
: _operationContextSession(opCtx), _opCtx(opCtx) {
invariant(!opCtx->getClient()->isInDirectClient());
const auto clientTxnNumber = *opCtx->getTxnNumber();
+ const auto clientTxnRetryCounter = *opCtx->getTxnRetryCounter();
auto txnParticipant = TransactionParticipant::get(opCtx);
- txnParticipant.beginOrContinueTransactionUnconditionally(opCtx, clientTxnNumber);
+ txnParticipant.beginOrContinueTransactionUnconditionally(
+ opCtx, clientTxnNumber, clientTxnRetryCounter);
}
MongoDOperationContextSessionWithoutRefresh::~MongoDOperationContextSessionWithoutRefresh() {
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index a9cfe387ad9..c588ec4d22a 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -64,6 +64,7 @@
#include "mongo/db/storage/flow_control.h"
#include "mongo/db/transaction_history_iterator.h"
#include "mongo/db/transaction_participant_gen.h"
+#include "mongo/db/txn_retry_counter_too_old_info.h"
#include "mongo/db/vector_clock_mutable.h"
#include "mongo/logv2/log.h"
#include "mongo/util/fail_point.h"
@@ -423,11 +424,11 @@ const LogicalSessionId& TransactionParticipant::Observer::_sessionId() const {
return owningSession->getSessionId();
}
-void TransactionParticipant::Participant::_beginOrContinueRetryableWrite(OperationContext* opCtx,
- TxnNumber txnNumber) {
+void TransactionParticipant::Participant::_beginOrContinueRetryableWrite(
+ OperationContext* opCtx, const TxnNumber& txnNumber) {
if (txnNumber > o().activeTxnNumber) {
// New retryable write.
- _setNewTxnNumber(opCtx, txnNumber);
+ _setNewTxnNumber(opCtx, txnNumber, kUninitializedTxnRetryCounter);
p().autoCommit = boost::none;
} else {
// Retrying a retryable write.
@@ -438,8 +439,8 @@ void TransactionParticipant::Participant::_beginOrContinueRetryableWrite(Operati
}
}
-void TransactionParticipant::Participant::_continueMultiDocumentTransaction(OperationContext* opCtx,
- TxnNumber txnNumber) {
+void TransactionParticipant::Participant::_continueMultiDocumentTransaction(
+ OperationContext* opCtx, const TxnNumber& txnNumber, const TxnRetryCounter& txnRetryCounter) {
uassert(ErrorCodes::NoSuchTransaction,
str::stream()
<< "Given transaction number " << txnNumber
@@ -447,6 +448,19 @@ void TransactionParticipant::Participant::_continueMultiDocumentTransaction(Oper
<< o().activeTxnNumber,
txnNumber == o().activeTxnNumber && !o().txnState.isInRetryableWriteMode());
+ uassert(TxnRetryCounterTooOldInfo(o().activeTxnRetryCounter),
+ str::stream() << "Cannot continue transaction " << txnNumber << " on session "
+ << _sessionId() << " using txnRetryCounter " << txnRetryCounter
+ << " because it has already been restarted using a higher"
+ << " txnRetryCounter " << o().activeTxnRetryCounter,
+ txnRetryCounter >= o().activeTxnRetryCounter);
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "Cannot continue transaction " << txnNumber << " on session "
+ << _sessionId() << " using txnRetryCounter " << txnRetryCounter
+ << " because it is currently in state " << o().txnState
+ << " with txnRetryCounter " << o().activeTxnRetryCounter,
+ txnRetryCounter == o().activeTxnRetryCounter);
+
if (o().txnState.isInProgress() && !o().txnResourceStash) {
// This indicates that the first command in the transaction failed but did not implicitly
// abort the transaction. It is not safe to continue the transaction, in particular because
@@ -467,13 +481,58 @@ void TransactionParticipant::Participant::_continueMultiDocumentTransaction(Oper
<< "Transaction " << txnNumber
<< " has been aborted because an earlier command in this transaction failed.");
}
- return;
}
-void TransactionParticipant::Participant::_beginMultiDocumentTransaction(OperationContext* opCtx,
- TxnNumber txnNumber) {
+void TransactionParticipant::Participant::_beginMultiDocumentTransaction(
+ OperationContext* opCtx, const TxnNumber& txnNumber, const TxnRetryCounter& txnRetryCounter) {
+ if (txnNumber == o().activeTxnNumber) {
+ if (txnRetryCounter < o().activeTxnRetryCounter) {
+ uasserted(TxnRetryCounterTooOldInfo(o().activeTxnRetryCounter),
+ str::stream()
+ << "Cannot start a transaction at given transaction number " << txnNumber
+ << " on session " << _sessionId() << " using txnRetryCounter "
+ << txnRetryCounter << " because it has already been restarted using a "
+ << "higher txnRetryCounter " << o().activeTxnRetryCounter);
+ } else if (txnRetryCounter == o().activeTxnRetryCounter ||
+ o().activeTxnRetryCounter == kUninitializedTxnRetryCounter) {
+ // Servers in a sharded cluster can start a new transaction at the active transaction
+ // number to allow internal retries by routers on re-targeting errors, like
+ // StaleShard/DatabaseVersion or SnapshotTooOld.
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ "Only servers in a sharded cluster can start a new transaction at the active "
+ "transaction number",
+ serverGlobalParams.clusterRole != ClusterRole::None);
+
+ // The active transaction number can only be reused if:
+ // 1. The transaction participant is in retryable write mode and has not yet executed a
+ // retryable write, or
+ // 2. A transaction is aborted and has not been involved in a two phase commit.
+ //
+ // Assuming routers target primaries in increasing order of term and in the absence of
+ // byzantine messages, this check should never fail.
+ const auto restartableStates =
+ TransactionState::kNone | TransactionState::kAbortedWithoutPrepare;
+ uassert(50911,
+ str::stream() << "Cannot start a transaction at given transaction number "
+ << txnNumber << " a transaction with the same number is in state "
+ << o().txnState,
+ o().txnState.isInSet(restartableStates));
+ } else {
+ const auto restartableStates = TransactionState::kNone | TransactionState::kInProgress |
+ TransactionState::kAbortedWithoutPrepare | TransactionState::kAbortedWithPrepare;
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "Cannot restart transaction " << txnNumber
+ << " using txnRetryCounter " << txnRetryCounter
+ << " because it is already in state " << o().txnState
+ << " with txnRetryCounter " << o().activeTxnRetryCounter,
+ o().txnState.isInSet(restartableStates));
+ }
+ } else {
+ invariant(txnNumber > o().activeTxnNumber);
+ }
+
// Aborts any in-progress txns.
- _setNewTxnNumber(opCtx, txnNumber);
+ _setNewTxnNumber(opCtx, txnNumber, txnRetryCounter);
p().autoCommit = false;
stdx::lock_guard<Client> lk(*opCtx->getClient());
@@ -498,10 +557,12 @@ void TransactionParticipant::Participant::_beginMultiDocumentTransaction(Operati
invariant(p().transactionOperations.empty());
}
-void TransactionParticipant::Participant::beginOrContinue(OperationContext* opCtx,
- TxnNumber txnNumber,
- boost::optional<bool> autocommit,
- boost::optional<bool> startTransaction) {
+void TransactionParticipant::Participant::beginOrContinue(
+ OperationContext* opCtx,
+ TxnNumber txnNumber,
+ boost::optional<bool> autocommit,
+ boost::optional<bool> startTransaction,
+ boost::optional<TxnRetryCounter> txnRetryCounter) {
// Make sure we are still a primary. We need to hold on to the RSTL through the end of this
// method, as we otherwise risk stepping down in the interim and incorrectly updating the
// transaction number, which can abort active transactions.
@@ -546,6 +607,8 @@ void TransactionParticipant::Participant::beginOrContinue(OperationContext* opCt
// startTransaction, which is verified earlier when parsing the request.
if (!autocommit) {
invariant(!startTransaction);
+ invariant(!txnRetryCounter.has_value(),
+ "Cannot specify a txnRetryCounter for retryable write");
_beginOrContinueRetryableWrite(opCtx, txnNumber);
return;
}
@@ -555,9 +618,17 @@ void TransactionParticipant::Participant::beginOrContinue(OperationContext* opCt
// is verified earlier when parsing the request.
invariant(*autocommit == false);
invariant(opCtx->inMultiDocumentTransaction());
+ if (txnRetryCounter.has_value()) {
+ uassert(ErrorCodes::InvalidOptions,
+ "txnRetryCounter is only supported in sharded clusters",
+ serverGlobalParams.clusterRole != ClusterRole::None);
+ invariant(*txnRetryCounter >= 0, "Cannot specify a negative txnRetryCounter");
+ } else {
+ txnRetryCounter = 0;
+ }
if (!startTransaction) {
- _continueMultiDocumentTransaction(opCtx, txnNumber);
+ _continueMultiDocumentTransaction(opCtx, txnNumber, *txnRetryCounter);
return;
}
@@ -565,37 +636,11 @@ void TransactionParticipant::Participant::beginOrContinue(OperationContext* opCt
// an argument on the request. The 'startTransaction' argument currently can only be specified
// as true, which is verified earlier, when parsing the request.
invariant(*startTransaction);
-
- if (txnNumber == o().activeTxnNumber) {
- // Servers in a sharded cluster can start a new transaction at the active transaction number
- // to allow internal retries by routers on re-targeting errors, like
- // StaleShard/DatabaseVersion or SnapshotTooOld.
- uassert(ErrorCodes::ConflictingOperationInProgress,
- "Only servers in a sharded cluster can start a new transaction at the active "
- "transaction number",
- serverGlobalParams.clusterRole != ClusterRole::None);
-
- // The active transaction number can only be reused if:
- // 1. The transaction participant is in retryable write mode and has not yet executed a
- // retryable write, or
- // 2. A transaction is aborted and has not been involved in a two phase commit.
- //
- // Assuming routers target primaries in increasing order of term and in the absence of
- // byzantine messages, this check should never fail.
- const auto restartableStates =
- TransactionState::kNone | TransactionState::kAbortedWithoutPrepare;
- uassert(50911,
- str::stream() << "Cannot start a transaction at given transaction number "
- << txnNumber << " a transaction with the same number is in state "
- << o().txnState,
- o().txnState.isInSet(restartableStates));
- }
-
- _beginMultiDocumentTransaction(opCtx, txnNumber);
+ _beginMultiDocumentTransaction(opCtx, txnNumber, *txnRetryCounter);
}
void TransactionParticipant::Participant::beginOrContinueTransactionUnconditionally(
- OperationContext* opCtx, TxnNumber txnNumber) {
+ OperationContext* opCtx, TxnNumber txnNumber, TxnRetryCounter txnRetryCounter) {
invariant(opCtx->inMultiDocumentTransaction());
// We don't check or fetch any on-disk state, so treat the transaction as 'valid' for the
@@ -603,7 +648,7 @@ void TransactionParticipant::Participant::beginOrContinueTransactionUnconditiona
p().isValid = true;
if (o().activeTxnNumber != txnNumber) {
- _beginMultiDocumentTransaction(opCtx, txnNumber);
+ _beginMultiDocumentTransaction(opCtx, txnNumber, txnRetryCounter);
} else {
invariant(o().txnState.isInSet(TransactionState::kInProgress | TransactionState::kPrepared),
str::stream() << "Current state: " << o().txnState);
@@ -2173,7 +2218,8 @@ void TransactionParticipant::Participant::_logSlowTransaction(
}
void TransactionParticipant::Participant::_setNewTxnNumber(OperationContext* opCtx,
- const TxnNumber& txnNumber) {
+ const TxnNumber& txnNumber,
+ const TxnRetryCounter& txnRetryCounter) {
uassert(ErrorCodes::PreparedTransactionInProgress,
"Cannot change transaction number while the session has a prepared transaction",
!o().txnState.isInSet(TransactionState::kPrepared));
@@ -2195,6 +2241,7 @@ void TransactionParticipant::Participant::_setNewTxnNumber(OperationContext* opC
stdx::lock_guard<Client> lk(*opCtx->getClient());
o(lk).activeTxnNumber = txnNumber;
+ o(lk).activeTxnRetryCounter = txnRetryCounter;
o(lk).lastWriteOpTime = repl::OpTime();
// Reset the retryable writes state
@@ -2229,6 +2276,7 @@ void TransactionParticipant::Participant::_refreshFromStorageIfNeeded(OperationC
if (lastTxnRecord) {
stdx::lock_guard<Client> lg(*opCtx->getClient());
o(lg).activeTxnNumber = lastTxnRecord->getTxnNum();
+ o(lg).activeTxnRetryCounter = lastTxnRecord->getState() ? 0 : kUninitializedTxnRetryCounter;
o(lg).lastWriteOpTime = lastTxnRecord->getLastWriteOpTime();
p().activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements);
p().hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory;
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index de02c0e3e9c..2ab95a71d5c 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -275,6 +275,14 @@ public:
}
/**
+ * Returns the last used transaction retry counter for the currently active transaction on
+ * this participant.
+ */
+ TxnRetryCounter getActiveTxnRetryCounter() const {
+ return o().activeTxnRetryCounter;
+ }
+
+ /**
* Returns the op time of the last committed write for this session and transaction. If no
* write has completed yet, returns an empty timestamp.
*/
@@ -401,13 +409,20 @@ public:
*
* 'startTransaction' comes from the 'startTransaction' field in the original client
* request. See below for the acceptable values and the meaning of the combinations of
- * autocommit and startTransaction.
+ * autocommit, startTransaction and txnRetryCounter.
*
- * autocommit = boost::none, startTransaction = boost::none: Means retryable write
- * autocommit = false, startTransaction = boost::none: Means continuation of a
- * multi-statement transaction
- * autocommit = false, startTransaction = true: Means abort whatever transaction is in
- * progress on the session and start a new transaction
+ * autocommit = boost::none, startTransaction = boost::none and txnRetryCounter =
+ * boost::none means retryable write.
+ *
+ * autocommit = false, startTransaction = boost::none and txnRetryCounter = last seen
+ * txnRetryCounter means continuation of a multi-statement transaction.
+ *
+ * autocommit = false, startTransaction = true, txnNumber = active txnNumber and
+ * txnRetryCounter > last seen txnRetryCounter (defaults to 0) means restart the existing
+ * transaction as long as it has not been committed or prepared.
+ *
+ * autocommit = false, startTransaction = true, txnNumber > active txnNumber means abort
+ * whatever transaction is in progress on the session and starts a new transaction.
*
* Any combination other than the ones listed above will invariant since it is expected that
* the caller has performed the necessary customer input validations.
@@ -415,6 +430,11 @@ public:
* Exceptions of note, which can be thrown are:
* - TransactionTooOld - if an attempt is made to start a transaction older than the
* currently active one or the last one which committed
+ * - TxnRetryCounterTooOld - if an attempt is made to start or continue a transaction with
+ * a txnRetryCounter less than the last seen one.
+ * - IllegalOperation - if an attempt is made to use a txnRetryCounter greater than the
+ * last seen one to continue a transaction, or to restart a transaction that has already
+ * been committed or prepared.
* - PreparedTransactionInProgress - if the transaction is in the prepared state and a new
* transaction or retryable write is attempted
* - NotWritablePrimary - if the node is not a primary when this method is called.
@@ -426,7 +446,8 @@ public:
void beginOrContinue(OperationContext* opCtx,
TxnNumber txnNumber,
boost::optional<bool> autocommit,
- boost::optional<bool> startTransaction);
+ boost::optional<bool> startTransaction,
+ boost::optional<TxnRetryCounter> txnRetryCounter);
/**
* Used only by the secondary oplog application logic. Similar to 'beginOrContinue' without
@@ -434,7 +455,8 @@ public:
* the past.
*/
void beginOrContinueTransactionUnconditionally(OperationContext* opCtx,
- TxnNumber txnNumber);
+ TxnNumber txnNumber,
+ TxnRetryCounter txnRetryCounter);
/**
* If the participant is in prepare, returns a future whose promise is fulfilled when
@@ -775,17 +797,24 @@ public:
repl::ReadConcernArgs readConcernArgs) const;
// Bumps up the transaction number of this transaction and perform the necessary cleanup.
- void _setNewTxnNumber(OperationContext* opCtx, const TxnNumber& txnNumber);
+ void _setNewTxnNumber(OperationContext* opCtx,
+ const TxnNumber& txnNumber,
+ const TxnRetryCounter& txnRetryCounter);
// Attempt to begin or retry a retryable write at the given transaction number.
- void _beginOrContinueRetryableWrite(OperationContext* opCtx, TxnNumber txnNumber);
+ void _beginOrContinueRetryableWrite(OperationContext* opCtx, const TxnNumber& txnNumber);
- // Attempt to begin a new multi document transaction at the given transaction number.
- void _beginMultiDocumentTransaction(OperationContext* opCtx, TxnNumber txnNumber);
+ // Attempt to begin a new multi document transaction at the given transaction number and
+ // transaction retry counter.
+ void _beginMultiDocumentTransaction(OperationContext* opCtx,
+ const TxnNumber& txnNumber,
+ const TxnRetryCounter& txnRetryCounter);
// Attempt to continue an in-progress multi document transaction at the given transaction
- // number.
- void _continueMultiDocumentTransaction(OperationContext* opCtx, TxnNumber txnNumber);
+ // number and transaction retry counter.
+ void _continueMultiDocumentTransaction(OperationContext* opCtx,
+ const TxnNumber& txnNumber,
+ const TxnRetryCounter& txnRetryCounter);
// Implementation of public refreshFromStorageIfNeeded methods.
void _refreshFromStorageIfNeeded(OperationContext* opCtx, bool fetchOplogEntries);
@@ -934,6 +963,11 @@ private:
// performed any writes.
TxnNumber activeTxnNumber{kUninitializedTxnNumber};
+ // Tracks the last seen txnRetryCounter for the the current transaction. Should always be
+ // kUninitializedTxnRetryCounter for a retryable write, and non-negative for a
+ // multi-statement transaction.
+ TxnRetryCounter activeTxnRetryCounter{kUninitializedTxnRetryCounter};
+
// Caches what is known to be the last optime written for the active transaction.
repl::OpTime lastWriteOpTime;
diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
index c2582f6ba60..915a6c5a697 100644
--- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp
+++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
@@ -229,7 +229,11 @@ protected:
boost::optional<DurableTxnStateEnum> txnState) {
const auto session = OperationContextSession::get(opCtx());
auto txnParticipant = TransactionParticipant::get(opCtx());
- txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx(),
+ txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
const auto uuid = UUID::gen();
@@ -291,7 +295,11 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryNotWrittenOnBegin)
txnParticipant.refreshFromStorageIfNeeded(opCtx());
const TxnNumber txnNum = 20;
- txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx(),
+ txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
ASSERT(txnParticipant.getLastWriteOpTime().isNull());
DBDirectClient client(opCtx());
@@ -307,7 +315,11 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryWrittenAtFirstWrit
const auto& sessionId = *opCtx()->getLogicalSessionId();
const TxnNumber txnNum = 21;
- txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx(),
+ txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
const auto opTime = writeTxnRecord(txnNum, {0}, {}, boost::none);
@@ -392,12 +404,19 @@ TEST_F(TransactionParticipantRetryableWritesTest, StartingOldTxnShouldAssert) {
txnParticipant.refreshFromStorageIfNeeded(opCtx());
const TxnNumber txnNum = 20;
- txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none);
-
- ASSERT_THROWS_CODE(
- txnParticipant.beginOrContinue(opCtx(), txnNum - 1, boost::none, boost::none),
- AssertionException,
- ErrorCodes::TransactionTooOld);
+ txnParticipant.beginOrContinue(opCtx(),
+ txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
+
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ txnNum - 1,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */),
+ AssertionException,
+ ErrorCodes::TransactionTooOld);
ASSERT(txnParticipant.getLastWriteOpTime().isNull());
}
@@ -411,11 +430,18 @@ TEST_F(TransactionParticipantRetryableWritesTest,
StringBuilder sb;
sb << "Retryable write with txnNumber 21 is prohibited on session " << sessionId
<< " because a newer retryable write with txnNumber 22 has already started on this session.";
- txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none);
- ASSERT_THROWS_WHAT(
- txnParticipant.beginOrContinue(opCtx(), txnNum - 1, boost::none, boost::none),
- AssertionException,
- sb.str());
+ txnParticipant.beginOrContinue(opCtx(),
+ txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
+ ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue(opCtx(),
+ txnNum - 1,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */),
+ AssertionException,
+ sb.str());
ASSERT(txnParticipant.getLastWriteOpTime().isNull());
}
@@ -430,8 +456,16 @@ TEST_F(TransactionParticipantRetryableWritesTest,
StringBuilder sb;
sb << "Cannot start transaction 21 on session " << sessionId
<< " because a newer retryable write with txnNumber 22 has already started on this session.";
- txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none);
- ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue(opCtx(), txnNum - 1, autocommit, boost::none),
+ txnParticipant.beginOrContinue(opCtx(),
+ txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
+ ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue(opCtx(),
+ txnNum - 1,
+ autocommit,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */),
AssertionException,
sb.str());
ASSERT(txnParticipant.getLastWriteOpTime().isNull());
@@ -450,7 +484,11 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionN
ASSERT(client.runCommand(nss.db().toString(), BSON("drop" << nss.coll()), dropResult));
const TxnNumber txnNum = 21;
- txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx(),
+ txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
@@ -471,7 +509,11 @@ TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecuted) {
txnParticipant.refreshFromStorageIfNeeded(opCtx());
const TxnNumber txnNum = 100;
- txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx(),
+ txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
ASSERT(!txnParticipant.checkStatementExecuted(opCtx(), 1000));
ASSERT(!txnParticipant.checkStatementExecutedNoOplogEntryFetch(1000));
@@ -513,7 +555,11 @@ DEATH_TEST_REGEX_F(
const auto& sessionId = *opCtx()->getLogicalSessionId();
const TxnNumber txnNum = 100;
- txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx(),
+ txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
const auto uuid = UUID::gen();
@@ -554,7 +600,11 @@ DEATH_TEST_REGEX_F(
const auto& sessionId = *opCtx()->getLogicalSessionId();
const TxnNumber txnNum = 100;
- txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx(),
+ txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
const auto uuid = UUID::gen();
@@ -594,7 +644,11 @@ DEATH_TEST_REGEX_F(
txnParticipant.refreshFromStorageIfNeeded(opCtx());
const TxnNumber txnNum = 100;
- txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx(),
+ txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
@@ -765,7 +819,11 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.refreshFromStorageIfNeeded(opCtx());
- txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx(),
+ txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
repl::MutableOplogEntry oplogEntry;
oplogEntry.setSessionId(sessionId);
@@ -871,7 +929,11 @@ TEST_F(ShardTxnParticipantRetryableWritesTest,
opCtx()->setTxnNumber(txnNum);
const auto uuid = UUID::gen();
- txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx(),
+ txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
{
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
@@ -892,7 +954,8 @@ TEST_F(ShardTxnParticipantRetryableWritesTest,
opCtx()->setInMultiDocumentTransaction();
ASSERT_THROWS_CODE(
- txnParticipant.beginOrContinue(opCtx(), txnNum, autocommit, startTransaction),
+ txnParticipant.beginOrContinue(
+ opCtx(), txnNum, autocommit, startTransaction, boost::none /* txnRetryCounter */),
AssertionException,
50911);
@@ -901,7 +964,8 @@ TEST_F(ShardTxnParticipantRetryableWritesTest,
txnParticipant.refreshFromStorageIfNeeded(opCtx());
ASSERT_THROWS_CODE(
- txnParticipant.beginOrContinue(opCtx(), txnNum, autocommit, startTransaction),
+ txnParticipant.beginOrContinue(
+ opCtx(), txnNum, autocommit, startTransaction, boost::none /* txnRetryCounter */),
AssertionException,
50911);
}
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index 288872c78ee..ea5f53ca6bb 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -51,6 +51,7 @@
#include "mongo/db/stats/fill_locker_info.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/db/transaction_participant_gen.h"
+#include "mongo/db/txn_retry_counter_too_old_info.h"
#include "mongo/stdx/future.h"
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/death_test.h"
@@ -312,7 +313,11 @@ protected:
opCtx()->setInMultiDocumentTransaction();
auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx());
auto txnParticipant = TransactionParticipant::get(opCtx());
- txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, startNewTxn);
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ startNewTxn /* startTransaction */,
+ boost::none /* txnRetryCounter */);
return opCtxSession;
}
@@ -385,7 +390,11 @@ TEST_F(TxnParticipantTest, TransactionThrowsLockTimeoutIfLockIsUnavailable) {
MongoDOperationContextSession newOpCtxSession(newOpCtx.get());
auto newTxnParticipant = TransactionParticipant::get(newOpCtx.get());
- newTxnParticipant.beginOrContinue(newOpCtx.get(), newTxnNum, false, true);
+ newTxnParticipant.beginOrContinue(newOpCtx.get(),
+ newTxnNum,
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */);
newTxnParticipant.unstashTransactionResources(newOpCtx.get(), "insert");
Date_t t1 = Date_t::now();
@@ -455,10 +464,13 @@ TEST_F(TxnParticipantTest, CannotSpecifyStartTransactionOnInProgressTxn) {
ASSERT_TRUE(txnParticipant.transactionIsOpen());
// Cannot try to start a transaction that already started.
- ASSERT_THROWS_CODE(
- txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, true),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
}
TEST_F(TxnParticipantTest, AutocommitRequiredOnEveryTxnOp) {
@@ -473,12 +485,20 @@ TEST_F(TxnParticipantTest, AutocommitRequiredOnEveryTxnOp) {
auto txnNum = *opCtx()->getTxnNumber();
// Omitting 'autocommit' after the first statement of a transaction should throw an error.
- ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(), txnNum, boost::none, boost::none),
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ txnNum,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */),
AssertionException,
ErrorCodes::IncompleteTransactionHistory);
// Including autocommit=false should succeed.
- txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, boost::none);
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
}
DEATH_TEST_F(TxnParticipantTest, AutocommitCannotBeTrue, "invariant") {
@@ -486,7 +506,11 @@ DEATH_TEST_F(TxnParticipantTest, AutocommitCannotBeTrue, "invariant") {
auto txnParticipant = TransactionParticipant::get(opCtx());
// Passing 'autocommit=true' is not allowed and should crash.
- txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), true, boost::none);
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ true /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
}
DEATH_TEST_F(TxnParticipantTest, StartTransactionCannotBeFalse, "invariant") {
@@ -494,7 +518,11 @@ DEATH_TEST_F(TxnParticipantTest, StartTransactionCannotBeFalse, "invariant") {
auto txnParticipant = TransactionParticipant::get(opCtx());
// Passing 'startTransaction=false' is not allowed and should crash.
- txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, false);
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ false /* startTransaction */,
+ boost::none /* txnRetryCounter */);
}
TEST_F(TxnParticipantTest, SameTransactionPreservesStoredStatements) {
@@ -517,7 +545,11 @@ TEST_F(TxnParticipantTest, SameTransactionPreservesStoredStatements) {
txnParticipant.getTransactionOperationsForTest()[0].toBSON());
// Re-opening the same transaction should have no effect.
- txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, boost::none);
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
ASSERT_BSONOBJ_EQ(operation.toBSON(),
txnParticipant.getTransactionOperationsForTest()[0].toBSON());
}
@@ -797,8 +829,11 @@ TEST_F(TxnParticipantTest, KillOpBeforeCommittingPreparedTransaction) {
// Check out the session and continue the transaction.
auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
auto newTxnParticipant = TransactionParticipant::get(opCtx);
- newTxnParticipant.beginOrContinue(
- opCtx, *(opCtx->getTxnNumber()), false, boost::none /*startNewTxn*/);
+ newTxnParticipant.beginOrContinue(opCtx,
+ *(opCtx->getTxnNumber()),
+ false /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
newTxnParticipant.unstashTransactionResources(opCtx, "commitTransaction");
newTxnParticipant.commitPreparedTransaction(opCtx, prepareTimestamp, boost::none);
@@ -839,8 +874,11 @@ TEST_F(TxnParticipantTest, KillOpBeforeAbortingPreparedTransaction) {
// Check out the session and continue the transaction.
auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
auto newTxnParticipant = TransactionParticipant::get(opCtx);
- newTxnParticipant.beginOrContinue(
- opCtx, *(opCtx->getTxnNumber()), false, boost::none /*startNewTxn*/);
+ newTxnParticipant.beginOrContinue(opCtx,
+ *(opCtx->getTxnNumber()),
+ false /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
newTxnParticipant.unstashTransactionResources(opCtx, "commitTransaction");
newTxnParticipant.commitPreparedTransaction(opCtx, prepareTimestamp, boost::none);
@@ -1144,10 +1182,13 @@ TEST_F(TxnParticipantTest, ContinuingATransactionWithNoResourcesAborts) {
MongoDOperationContextSession sessionCheckout(opCtx());
auto txnParticipant = TransactionParticipant::get(opCtx());
- ASSERT_THROWS_CODE(
- txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, boost::none),
- AssertionException,
- ErrorCodes::NoSuchTransaction);
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */),
+ AssertionException,
+ ErrorCodes::NoSuchTransaction);
}
TEST_F(TxnParticipantTest, CannotStartNewTransactionIfNotPrimary) {
@@ -1158,10 +1199,13 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionIfNotPrimary) {
auto txnParticipant = TransactionParticipant::get(opCtx());
// Include 'autocommit=false' for transactions.
- ASSERT_THROWS_CODE(
- txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, true),
- AssertionException,
- ErrorCodes::NotWritablePrimary);
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */),
+ AssertionException,
+ ErrorCodes::NotWritablePrimary);
}
TEST_F(TxnParticipantTest, CannotStartRetryableWriteIfNotPrimary) {
@@ -1172,10 +1216,13 @@ TEST_F(TxnParticipantTest, CannotStartRetryableWriteIfNotPrimary) {
auto txnParticipant = TransactionParticipant::get(opCtx());
// Omit the 'autocommit' field for retryable writes.
- ASSERT_THROWS_CODE(
- txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), boost::none, true),
- AssertionException,
- ErrorCodes::NotWritablePrimary);
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ boost::none /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */),
+ AssertionException,
+ ErrorCodes::NotWritablePrimary);
}
TEST_F(TxnParticipantTest, CannotContinueTransactionIfNotPrimary) {
@@ -1189,10 +1236,13 @@ TEST_F(TxnParticipantTest, CannotContinueTransactionIfNotPrimary) {
// Technically, the transaction should have been aborted on stepdown anyway, but it
// doesn't hurt to have this kind of coverage.
- ASSERT_THROWS_CODE(
- txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, false),
- AssertionException,
- ErrorCodes::NotWritablePrimary);
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ false /* startTransaction */,
+ boost::none /* txnRetryCounter */),
+ AssertionException,
+ ErrorCodes::NotWritablePrimary);
}
TEST_F(TxnParticipantTest, OlderTransactionFailsOnSessionWithNewerTransaction) {
@@ -1207,8 +1257,11 @@ TEST_F(TxnParticipantTest, OlderTransactionFailsOnSessionWithNewerTransaction) {
StringBuilder sb;
sb << "Cannot start transaction 19 on session " << sessionId
<< " because a newer transaction with txnNumber 20 has already started on this session.";
- ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue(
- opCtx(), *opCtx()->getTxnNumber() - 1, autocommit, startTransaction),
+ ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber() - 1,
+ autocommit,
+ startTransaction,
+ boost::none /* txnRetryCounter */),
AssertionException,
sb.str());
ASSERT(txnParticipant.getLastWriteOpTime().isNull());
@@ -1225,8 +1278,11 @@ TEST_F(TxnParticipantTest, OldRetryableWriteFailsOnSessionWithNewerTransaction)
StringBuilder sb;
sb << "Retryable write with txnNumber 19 is prohibited on session " << sessionId
<< " because a newer transaction with txnNumber 20 has already started on this session.";
- ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue(
- opCtx(), *opCtx()->getTxnNumber() - 1, boost::none, boost::none),
+ ASSERT_THROWS_WHAT(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber() - 1,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */),
AssertionException,
sb.str());
ASSERT(txnParticipant.getLastWriteOpTime().isNull());
@@ -1257,20 +1313,23 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionWhilePreparedTransactionInPr
auto guard = makeGuard([&]() { OperationContextSession::checkOut(opCtx()); });
// Try to start a new transaction while there is already a prepared transaction on the
// session. This should fail with a PreparedTransactionInProgress error.
- runFunctionFromDifferentOpCtx(
- [lsid = *opCtx()->getLogicalSessionId(),
- txnNumberToStart = *opCtx()->getTxnNumber() + 1](OperationContext* newOpCtx) {
- newOpCtx->setLogicalSessionId(lsid);
- newOpCtx->setTxnNumber(txnNumberToStart);
- newOpCtx->setInMultiDocumentTransaction();
-
- MongoDOperationContextSession ocs(newOpCtx);
- auto txnParticipant = TransactionParticipant::get(newOpCtx);
- ASSERT_THROWS_CODE(
- txnParticipant.beginOrContinue(newOpCtx, txnNumberToStart, false, true),
- AssertionException,
- ErrorCodes::PreparedTransactionInProgress);
- });
+ runFunctionFromDifferentOpCtx([lsid = *opCtx()->getLogicalSessionId(),
+ txnNumberToStart = *opCtx()->getTxnNumber() +
+ 1](OperationContext* newOpCtx) {
+ newOpCtx->setLogicalSessionId(lsid);
+ newOpCtx->setTxnNumber(txnNumberToStart);
+ newOpCtx->setInMultiDocumentTransaction();
+
+ MongoDOperationContextSession ocs(newOpCtx);
+ auto txnParticipant = TransactionParticipant::get(newOpCtx);
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(newOpCtx,
+ txnNumberToStart,
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */),
+ AssertionException,
+ ErrorCodes::PreparedTransactionInProgress);
+ });
}
ASSERT_FALSE(txnParticipant.transactionIsAborted());
@@ -1299,10 +1358,13 @@ TEST_F(TxnParticipantTest, CannotInsertInPreparedTransaction) {
TEST_F(TxnParticipantTest, CannotContinueNonExistentTransaction) {
MongoDOperationContextSession opCtxSession(opCtx());
auto txnParticipant = TransactionParticipant::get(opCtx());
- ASSERT_THROWS_CODE(
- txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), false, boost::none),
- AssertionException,
- ErrorCodes::NoSuchTransaction);
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */),
+ AssertionException,
+ ErrorCodes::NoSuchTransaction);
}
// Tests that a transaction aborts if it becomes too large based on the server parameter
@@ -1420,8 +1482,11 @@ protected:
auto txnParticipant = TransactionParticipant::get(opCtx());
ASSERT(txnParticipant.transactionIsOpen());
- ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(
- opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction),
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ autocommit,
+ startTransaction,
+ boost::none /* txnRetryCounter */),
AssertionException,
50911);
}
@@ -1437,8 +1502,11 @@ protected:
txnParticipant.abortTransaction(opCtx());
ASSERT(txnParticipant.transactionIsAborted());
- txnParticipant.beginOrContinue(
- opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction);
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ autocommit,
+ startTransaction,
+ boost::none /* txnRetryCounter */);
ASSERT(txnParticipant.transactionIsOpen());
}
@@ -1453,8 +1521,11 @@ protected:
txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
txnParticipant.commitUnpreparedTransaction(opCtx());
- ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(
- opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction),
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ autocommit,
+ startTransaction,
+ boost::none /* txnRetryCounter */),
AssertionException,
50911);
}
@@ -1473,8 +1544,11 @@ protected:
txnParticipant.addTransactionOperation(opCtx(), operation);
txnParticipant.prepareTransaction(opCtx(), {});
- ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(
- opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction),
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ autocommit,
+ startTransaction,
+ boost::none /* txnRetryCounter */),
AssertionException,
50911);
}
@@ -1495,8 +1569,11 @@ protected:
ASSERT(txnParticipant.transactionIsAborted());
startTransaction = true;
- ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(
- opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction),
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ autocommit,
+ startTransaction,
+ boost::none /* txnRetryCounter */),
AssertionException,
50911);
}
@@ -1505,14 +1582,21 @@ protected:
MongoDOperationContextSession opCtxSession(opCtx());
auto txnParticipant = TransactionParticipant::get(opCtx());
- txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
ASSERT_FALSE(txnParticipant.transactionIsOpen());
auto autocommit = false;
auto startTransaction = true;
- txnParticipant.beginOrContinue(
- opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction);
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ autocommit,
+ startTransaction,
+ boost::none /* txnRetryCounter */);
ASSERT(txnParticipant.transactionIsOpen());
}
@@ -2129,8 +2213,11 @@ TEST_F(TransactionsMetricsTest, TransactionErrorsBeforeUnstash) {
auto txnParticipant = TransactionParticipant::get(opCtx());
const bool autocommit = false;
const boost::optional<bool> startTransaction = boost::none;
- ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(
- opCtx(), *opCtx()->getTxnNumber(), autocommit, startTransaction),
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ autocommit,
+ startTransaction,
+ boost::none /* txnRetryCounter */),
AssertionException,
ErrorCodes::NoSuchTransaction);
@@ -2389,7 +2476,11 @@ TEST_F(TransactionsMetricsTest, TimeActiveMicrosShouldBeSetUponUnstashAndStash)
// Start a new transaction.
const auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
- txnParticipant.beginOrContinue(opCtx(), higherTxnNum, false, true);
+ txnParticipant.beginOrContinue(opCtx(),
+ higherTxnNum,
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */);
// Time active should be zero for a new transaction.
ASSERT_EQ(txnParticipant.getSingleTransactionStatsForTest().getTimeActiveMicros(
@@ -2918,7 +3009,11 @@ TEST_F(TransactionsMetricsTest, ReportUnstashedResourcesForARetryableWrite) {
MongoDOperationContextSession opCtxSession(opCtx());
auto txnParticipant = TransactionParticipant::get(opCtx());
- txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
txnParticipant.unstashTransactionResources(opCtx(), "find");
// Build a BSONObj containing the details which we expect to see reported when we invoke
@@ -2948,7 +3043,11 @@ TEST_F(TransactionsMetricsTest, UseAPIParametersOnOpCtxForARetryableWrite) {
MongoDOperationContextSession opCtxSession(opCtx());
auto txnParticipant = TransactionParticipant::get(opCtx());
- txnParticipant.beginOrContinue(opCtx(), *opCtx()->getTxnNumber(), boost::none, boost::none);
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
APIParameters secondAPIParameters = APIParameters();
secondAPIParameters.setAPIVersion("3");
@@ -4210,7 +4309,8 @@ TEST_F(TxnParticipantTest, ResponseMetadataHasReadOnlyFalseIfInRetryableWrite) {
txnParticipant.beginOrContinue(opCtx(),
*opCtx()->getTxnNumber(),
boost::none /* autocommit */,
- boost::none /* startTransaction */);
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
ASSERT_FALSE(txnParticipant.getResponseMetadata().getReadOnly());
}
@@ -4220,8 +4320,11 @@ TEST_F(TxnParticipantTest, ResponseMetadataHasReadOnlyTrueIfInProgressAndOperati
ASSERT_FALSE(txnParticipant.getResponseMetadata().getReadOnly());
// Start a transaction.
- txnParticipant.beginOrContinue(
- opCtx(), *opCtx()->getTxnNumber(), false /* autocommit */, true /* startTransaction */);
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */);
ASSERT_TRUE(txnParticipant.getResponseMetadata().getReadOnly());
txnParticipant.unstashTransactionResources(opCtx(), "find");
@@ -4235,8 +4338,11 @@ TEST_F(TxnParticipantTest,
ASSERT_FALSE(txnParticipant.getResponseMetadata().getReadOnly());
// Start a transaction.
- txnParticipant.beginOrContinue(
- opCtx(), *opCtx()->getTxnNumber(), false /* autocommit */, true /* startTransaction */);
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */);
ASSERT_TRUE(txnParticipant.getResponseMetadata().getReadOnly());
txnParticipant.unstashTransactionResources(opCtx(), "insert");
@@ -4255,8 +4361,11 @@ TEST_F(TxnParticipantTest, ResponseMetadataHasReadOnlyFalseIfAborted) {
ASSERT_FALSE(txnParticipant.getResponseMetadata().getReadOnly());
// Start a transaction.
- txnParticipant.beginOrContinue(
- opCtx(), *opCtx()->getTxnNumber(), false /* autocommit */, true /* startTransaction */);
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */);
ASSERT_TRUE(txnParticipant.getResponseMetadata().getReadOnly());
txnParticipant.unstashTransactionResources(opCtx(), "find");
@@ -4340,8 +4449,11 @@ TEST_F(TxnParticipantTest, ExitPreparePromiseIsFulfilledOnAbortAfterPrepare) {
MongoDOperationContextSession opCtxSession(opCtx());
auto txnParticipant = TransactionParticipant::get(opCtx());
- txnParticipant.beginOrContinue(
- opCtx(), *opCtx()->getTxnNumber(), false /* autocommit */, true /* startTransaction */);
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */);
ASSERT_TRUE(txnParticipant.onExitPrepare().isReady());
txnParticipant.unstashTransactionResources(opCtx(), "find");
@@ -4367,8 +4479,11 @@ TEST_F(TxnParticipantTest, ExitPreparePromiseIsFulfilledOnCommitAfterPrepare) {
MongoDOperationContextSession opCtxSession(opCtx());
auto txnParticipant = TransactionParticipant::get(opCtx());
- txnParticipant.beginOrContinue(
- opCtx(), *opCtx()->getTxnNumber(), false /* autocommit */, true /* startTransaction */);
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */);
ASSERT_TRUE(txnParticipant.onExitPrepare().isReady());
txnParticipant.unstashTransactionResources(opCtx(), "find");
@@ -4389,5 +4504,287 @@ TEST_F(TxnParticipantTest, ExitPreparePromiseIsFulfilledOnCommitAfterPrepare) {
ASSERT_TRUE(txnParticipant.onExitPrepare().isReady());
}
+TEST_F(ShardTxnParticipantTest, CanSpecifyTxnRetryCounterOnShardSvr) {
+ MongoDOperationContextSession opCtxSession(opCtx());
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ 0 /* txnRetryCounter */);
+}
+
+TEST_F(ConfigTxnParticipantTest, CanSpecifyTxnRetryCounterOnConfigSvr) {
+ MongoDOperationContextSession opCtxSession(opCtx());
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ 0 /* txnRetryCounter */);
+}
+
+TEST_F(TxnParticipantTest, CanOnlySpecifyTxnRetryCounterInShardedClusters) {
+ MongoDOperationContextSession opCtxSession(opCtx());
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ 0 /* txnRetryCounter */),
+ AssertionException,
+ ErrorCodes::InvalidOptions);
+}
+
+DEATH_TEST_F(ShardTxnParticipantTest,
+ CannotSpecifyNegativeTxnRetryCounter,
+ "Cannot specify a negative txnRetryCounter") {
+ MongoDOperationContextSession opCtxSession(opCtx());
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ -1 /* txnRetryCounter */);
+}
+
+DEATH_TEST_F(ShardTxnParticipantTest,
+ CannotSpecifyTxnRetryCounterForRetryableWrite,
+ "Cannot specify a txnRetryCounter for retryable write") {
+ MongoDOperationContextSession opCtxSession(opCtx());
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ 0 /* txnRetryCounter */),
+ AssertionException,
+ ErrorCodes::InvalidOptions);
+}
+
+TEST_F(ShardTxnParticipantTest,
+ CanRestartInProgressTransactionUsingTxnRetryCounterGreaterThanLastUsed) {
+ auto sessionCheckout = checkOutSession();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT_TRUE(txnParticipant.transactionIsInProgress());
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0);
+
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ 1 /* txnRetryCounter */);
+ ASSERT_TRUE(txnParticipant.transactionIsInProgress());
+}
+
+TEST_F(ShardTxnParticipantTest,
+ CanRestartAbortedUnpreparedTransactionUsingTxnRetryCounterGreaterThanLastUsed) {
+ auto sessionCheckout = checkOutSession();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT(txnParticipant.transactionIsInProgress());
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0 /* txnRetryCounter */);
+
+ txnParticipant.abortTransaction(opCtx());
+ ASSERT(txnParticipant.transactionIsAborted());
+
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ 1 /* txnRetryCounter */);
+ ASSERT(txnParticipant.transactionIsInProgress());
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1);
+}
+
+TEST_F(ShardTxnParticipantTest,
+ CanRestartAbortedPreparedTransactionUsingTxnRetryCounterGreaterThanLastUsed) {
+ auto sessionCheckout = checkOutSession();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT(txnParticipant.transactionIsInProgress());
+ txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
+ txnParticipant.prepareTransaction(opCtx(), {});
+ txnParticipant.abortTransaction(opCtx());
+ ASSERT(txnParticipant.transactionIsAborted());
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0);
+
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ 1 /* txnRetryCounter */);
+ ASSERT(txnParticipant.transactionIsInProgress());
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1);
+}
+
+TEST_F(ShardTxnParticipantTest,
+ CannotRestartCommittedUnpreparedTransactionUsingTxnRetryCounterGreaterThanLastUsed) {
+ auto sessionCheckout = checkOutSession();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT(txnParticipant.transactionIsInProgress());
+ txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
+ txnParticipant.commitUnpreparedTransaction(opCtx());
+ ASSERT(txnParticipant.transactionIsCommitted());
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0);
+
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ 1 /* txnRetryCounter */),
+ AssertionException,
+ ErrorCodes::IllegalOperation);
+ ASSERT(txnParticipant.transactionIsCommitted());
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0);
+}
+
+TEST_F(ShardTxnParticipantTest,
+ CannotRestartCommittedPreparedTransactionUsingTxnRetryCounterGreaterThanLastUsed) {
+ auto sessionCheckout = checkOutSession();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT(txnParticipant.transactionIsInProgress());
+ txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
+ const auto prepareTimestamp = txnParticipant.prepareTransaction(opCtx(), {});
+ const auto commitTS = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1);
+ txnParticipant.commitPreparedTransaction(opCtx(), commitTS, {});
+ ASSERT_TRUE(txnParticipant.transactionIsCommitted());
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0);
+
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ 1 /* txnRetryCounter */),
+ AssertionException,
+ ErrorCodes::IllegalOperation);
+ ASSERT(txnParticipant.transactionIsCommitted());
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0);
+}
+
+TEST_F(ShardTxnParticipantTest,
+ CannotRestartPreparedTransactionUsingTxnRetryCounterGreaterThanLastUsed) {
+ auto sessionCheckout = checkOutSession();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT(txnParticipant.transactionIsInProgress());
+ txnParticipant.unstashTransactionResources(opCtx(), "commitTransaction");
+ txnParticipant.prepareTransaction(opCtx(), {});
+ ASSERT_TRUE(txnParticipant.transactionIsPrepared());
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0);
+
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ 1 /* txnRetryCounter */),
+ AssertionException,
+ ErrorCodes::IllegalOperation);
+ ASSERT(txnParticipant.transactionIsPrepared());
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0);
+}
+
+TEST_F(ShardTxnParticipantTest, CannotRestartTransactionUsingTxnRetryCounterLessThanLastUsed) {
+ auto sessionCheckout = checkOutSession();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT_TRUE(txnParticipant.transactionIsInProgress());
+
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ 1 /* txnRetryCounter */);
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1);
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ 0 /* txnRetryCounter */),
+ AssertionException,
+ ErrorCodes::TxnRetryCounterTooOld);
+ try {
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ 0 /* txnRetryCounter */);
+ ASSERT(false);
+ } catch (const TxnRetryCounterTooOldException& ex) {
+ auto info = ex.extraInfo<TxnRetryCounterTooOldInfo>();
+ ASSERT_EQ(info->getTxnRetryCounter(), 1);
+ }
+ ASSERT_TRUE(txnParticipant.transactionIsInProgress());
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1);
+}
+
+TEST_F(ShardTxnParticipantTest, CanContinueTransactionUsingTxnRetryCounterEqualToLastUsed) {
+ auto sessionCheckout = checkOutSession();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT_TRUE(txnParticipant.transactionIsInProgress());
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0);
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+ txnParticipant.stashTransactionResources(opCtx());
+
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ boost::none /* startTransaction */,
+ 0 /* txnRetryCounter */);
+ ASSERT_TRUE(txnParticipant.transactionIsInProgress());
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0);
+}
+
+TEST_F(ShardTxnParticipantTest, CannotContinueTransactionUsingTxnRetryCounterGreaterThanLastUsed) {
+ auto sessionCheckout = checkOutSession();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT_TRUE(txnParticipant.transactionIsInProgress());
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0);
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+ txnParticipant.stashTransactionResources(opCtx());
+
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ boost::none /* startTransaction */,
+ 1 /* txnRetryCounter */),
+ AssertionException,
+ ErrorCodes::IllegalOperation);
+ ASSERT_TRUE(txnParticipant.transactionIsInProgress());
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 0);
+}
+
+TEST_F(ShardTxnParticipantTest, CannotContinueTransactionUsingTxnRetryCounterLessThanLastUsed) {
+ auto sessionCheckout = checkOutSession();
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT_TRUE(txnParticipant.transactionIsInProgress());
+
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ 1 /* txnRetryCounter */);
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1);
+
+ txnParticipant.unstashTransactionResources(opCtx(), "insert");
+ txnParticipant.stashTransactionResources(opCtx());
+ ASSERT_THROWS_CODE(txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ boost::none /* startTransaction */,
+ 0 /* txnRetryCounter */),
+ AssertionException,
+ ErrorCodes::TxnRetryCounterTooOld);
+ try {
+ txnParticipant.beginOrContinue(opCtx(),
+ *opCtx()->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ 0 /* txnRetryCounter */);
+ ASSERT(false);
+ } catch (const TxnRetryCounterTooOldException& ex) {
+ auto info = ex.extraInfo<TxnRetryCounterTooOldInfo>();
+ ASSERT_EQ(info->getTxnRetryCounter(), 1);
+ }
+ ASSERT_TRUE(txnParticipant.transactionIsInProgress());
+ ASSERT_EQ(txnParticipant.getActiveTxnRetryCounter(), 1);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/txn_retry_counter_too_old_info.cpp b/src/mongo/db/txn_retry_counter_too_old_info.cpp
new file mode 100644
index 00000000000..ba5f01429d9
--- /dev/null
+++ b/src/mongo/db/txn_retry_counter_too_old_info.cpp
@@ -0,0 +1,54 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/txn_retry_counter_too_old_info.h"
+
+#include "mongo/base/init.h"
+
+namespace mongo {
+
+namespace {
+
+MONGO_INIT_REGISTER_ERROR_EXTRA_INFO(TxnRetryCounterTooOldInfo);
+
+constexpr StringData kTxnRetryCounterFieldName = "txnRetryCounter"_sd;
+
+} // namespace
+
+void TxnRetryCounterTooOldInfo::serialize(BSONObjBuilder* bob) const {
+ bob->append(kTxnRetryCounterFieldName, _txnRetryCounter);
+}
+
+std::shared_ptr<const ErrorExtraInfo> TxnRetryCounterTooOldInfo::parse(const BSONObj& obj) {
+ return std::make_shared<TxnRetryCounterTooOldInfo>(obj[kTxnRetryCounterFieldName].Int());
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/txn_retry_counter_too_old_info.h b/src/mongo/db/txn_retry_counter_too_old_info.h
new file mode 100644
index 00000000000..f126af7ede1
--- /dev/null
+++ b/src/mongo/db/txn_retry_counter_too_old_info.h
@@ -0,0 +1,59 @@
+/**
+ * Copyright (C) 2021-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/error_extra_info.h"
+#include "mongo/bson/bsonobj.h"
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/logical_session_id.h"
+
+namespace mongo {
+
+class TxnRetryCounterTooOldInfo final : public ErrorExtraInfo {
+public:
+ static constexpr auto code = ErrorCodes::TxnRetryCounterTooOld;
+
+ explicit TxnRetryCounterTooOldInfo(const TxnRetryCounter txnRetryCounter)
+ : _txnRetryCounter(txnRetryCounter){};
+
+ const auto& getTxnRetryCounter() const {
+ return _txnRetryCounter;
+ }
+
+ void serialize(BSONObjBuilder* bob) const override;
+ static std::shared_ptr<const ErrorExtraInfo> parse(const BSONObj&);
+
+private:
+ TxnRetryCounter _txnRetryCounter;
+};
+
+using TxnRetryCounterTooOldException = ExceptionFor<ErrorCodes::TxnRetryCounterTooOld>;
+
+} // namespace mongo
diff --git a/src/mongo/dbtests/storage_timestamp_tests.cpp b/src/mongo/dbtests/storage_timestamp_tests.cpp
index c0f0a8551a3..067ab52d4cd 100644
--- a/src/mongo/dbtests/storage_timestamp_tests.cpp
+++ b/src/mongo/dbtests/storage_timestamp_tests.cpp
@@ -1767,8 +1767,11 @@ public:
auto txnParticipant = TransactionParticipant::get(_opCtx);
ASSERT(txnParticipant);
- txnParticipant.beginOrContinue(
- _opCtx, *_opCtx->getTxnNumber(), false /* autocommit */, true /* startTransaction */);
+ txnParticipant.beginOrContinue(_opCtx,
+ *_opCtx->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */);
txnParticipant.unstashTransactionResources(_opCtx, "insert");
{
// Insert a document that will set the index as multikey.
@@ -3464,8 +3467,11 @@ public:
auto txnParticipant = TransactionParticipant::get(_opCtx);
ASSERT(txnParticipant);
// Start a retryable write.
- txnParticipant.beginOrContinue(
- _opCtx, txnNumber, boost::none /* autocommit */, boost::none /* startTransaction */);
+ txnParticipant.beginOrContinue(_opCtx,
+ txnNumber,
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */,
+ boost::none /* txnRetryCounter */);
}
protected:
@@ -3670,8 +3676,11 @@ public:
auto txnParticipant = TransactionParticipant::get(_opCtx);
ASSERT(txnParticipant);
- txnParticipant.beginOrContinue(
- _opCtx, *_opCtx->getTxnNumber(), false /* autocommit */, true /* startTransaction */);
+ txnParticipant.beginOrContinue(_opCtx,
+ *_opCtx->getTxnNumber(),
+ false /* autocommit */,
+ true /* startTransaction */,
+ boost::none /* txnRetryCounter */);
txnParticipant.unstashTransactionResources(_opCtx, "insert");
{
AutoGetCollection autoColl(_opCtx, nss, LockMode::MODE_IX);
diff --git a/src/mongo/idl/generic_argument.idl b/src/mongo/idl/generic_argument.idl
index c384e52f34d..016fe8d043b 100644
--- a/src/mongo/idl/generic_argument.idl
+++ b/src/mongo/idl/generic_argument.idl
@@ -99,6 +99,8 @@ generic_argument_lists:
forward_to_shards: false
$topologyTime:
forward_to_shards: false
+ txnRetryCounter:
+ forward_to_shards: true
generic_reply_field_lists:
generic_reply_fields_api_v1:
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index ea3efda12ea..93ba0651bf6 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -64,6 +64,7 @@ env.Library(
'grid',
],
LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/internal_transactions_feature_flag',
'$BUILD_DIR/mongo/db/sessions_collection',
],
)
diff --git a/src/mongo/s/commands/document_shard_key_update_util.cpp b/src/mongo/s/commands/document_shard_key_update_util.cpp
index d4dda8ae836..99324ffcd86 100644
--- a/src/mongo/s/commands/document_shard_key_update_util.cpp
+++ b/src/mongo/s/commands/document_shard_key_update_util.cpp
@@ -148,7 +148,8 @@ void startTransactionForShardKeyUpdate(OperationContext* opCtx) {
auto txnNumber = opCtx->getTxnNumber();
invariant(txnNumber);
- txnRouter.beginOrContinueTxn(opCtx, *txnNumber, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(
+ opCtx, *txnNumber, TransactionRouter::TransactionActions::kStart, 0 /* txnRetryCounter */);
}
BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx) {
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index c90ac6bf405..c512ccc9867 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -627,6 +627,9 @@ Status ParseAndRunCommand::RunInvocation::_setup() {
auto txnNumber = opCtx->getTxnNumber();
invariant(txnNumber);
+ auto txnRetryCounter = opCtx->getTxnRetryCounter();
+ invariant(txnRetryCounter);
+
auto transactionAction = ([&] {
auto startTxnSetting = _parc->_osi->getStartTransaction();
if (startTxnSetting && *startTxnSetting) {
@@ -641,7 +644,7 @@ Status ParseAndRunCommand::RunInvocation::_setup() {
})();
startTransaction = (transactionAction == TransactionRouter::TransactionActions::kStart);
- txnRouter.beginOrContinueTxn(opCtx, *txnNumber, transactionAction);
+ txnRouter.beginOrContinueTxn(opCtx, *txnNumber, transactionAction, *txnRetryCounter);
}
bool supportsWriteConcern = invocation->supportsWriteConcern();
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 160bce00191..64742e972c7 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -40,10 +40,12 @@
#include "mongo/db/commands.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
+#include "mongo/db/internal_transactions_feature_flag_gen.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/transaction_validation.h"
+#include "mongo/db/txn_retry_counter_too_old_info.h"
#include "mongo/db/vector_clock.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/logv2/log.h"
@@ -244,6 +246,18 @@ std::string commitTypeToString(TransactionRouter::CommitType state) {
MONGO_UNREACHABLE;
}
+std::string actionTypeToString(TransactionRouter::TransactionActions action) {
+ switch (action) {
+ case TransactionRouter::TransactionActions::kStart:
+ return "start";
+ case TransactionRouter::TransactionActions::kContinue:
+ return "continue";
+ case TransactionRouter::TransactionActions::kCommit:
+ return "commit";
+ }
+ MONGO_UNREACHABLE;
+}
+
} // unnamed namespace
TransactionRouter::TransactionRouter() = default;
@@ -453,6 +467,12 @@ BSONObj TransactionRouter::Participant::attachTxnFieldsIfNeeded(
invariant(sharedOptions.txnNumber == *osi.getTxnNumber());
}
+ if (feature_flags::gFeatureFlagInternalTransactions.isEnabled(
+ serverGlobalParams.featureCompatibility)) {
+ newCmd.append(OperationSessionInfoFromClient::kTxnRetryCounterFieldName,
+ sharedOptions.txnRetryCounter);
+ }
+
return newCmd.obj();
}
@@ -634,6 +654,7 @@ TransactionRouter::Participant& TransactionRouter::Router::_createParticipant(
SharedTransactionOptions sharedOptions = {
o().txnNumber,
+ o().txnRetryCounter,
o().apiParameters,
o().readConcernArgs,
o().atClusterTime ? boost::optional<LogicalTime>(o().atClusterTime->getTime())
@@ -893,31 +914,26 @@ void TransactionRouter::Router::_setAtClusterTime(
o(lk).atClusterTime->setTime(candidateTime, p().latestStmtId);
}
-void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx,
- TxnNumber txnNumber,
- TransactionActions action) {
- if (txnNumber < o().txnNumber) {
- // This transaction is older than the transaction currently in progress, so throw an error.
- uasserted(ErrorCodes::TransactionTooOld,
- str::stream() << "txnNumber " << txnNumber << " is less than last txnNumber "
- << o().txnNumber << " seen in session " << _sessionId());
- } else if (txnNumber == o().txnNumber) {
- // This is the same transaction as the one in progress.
- auto apiParamsFromClient = APIParameters::get(opCtx);
- if (action == TransactionActions::kContinue || action == TransactionActions::kCommit) {
- uassert(
- ErrorCodes::APIMismatchError,
- "API parameter mismatch: transaction-continuing command used {}, the transaction's"
- " first command used {}"_format(apiParamsFromClient.toBSON().toString(),
- o().apiParameters.toBSON().toString()),
- apiParamsFromClient == o().apiParameters);
- }
-
+void TransactionRouter::Router::_beginOrContinueActiveTxnNumber(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ TransactionActions action,
+ TxnRetryCounter txnRetryCounter) {
+ invariant(txnNumber == o().txnNumber);
+
+ if (txnRetryCounter < o().txnRetryCounter) {
+ uasserted(TxnRetryCounterTooOldInfo(o().txnRetryCounter),
+ str::stream() << "Cannot " << actionTypeToString(action) << " transaction "
+ << txnNumber << " on session " << _sessionId()
+ << " using txnRetryCounter " << txnRetryCounter
+ << " because the transaction has already been restarted using"
+ << " a higher txnRetryCounter " << o().txnRetryCounter);
+ } else if (txnRetryCounter == o().txnRetryCounter) {
switch (action) {
case TransactionActions::kStart: {
uasserted(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "txnNumber " << o().txnNumber << " for session "
<< _sessionId() << " already started");
+ break;
}
case TransactionActions::kContinue: {
uassert(ErrorCodes::InvalidOptions,
@@ -936,62 +952,83 @@ void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx,
_onContinue(opCtx);
break;
}
- } else if (txnNumber > o().txnNumber) {
- // This is a newer transaction.
- switch (action) {
- case TransactionActions::kStart: {
- auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
- uassert(
- ErrorCodes::InvalidOptions,
- "The first command in a transaction cannot specify a readConcern level other "
- "than local, majority, or snapshot",
- !readConcernArgs.hasLevel() ||
- isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel()));
-
- _resetRouterState(opCtx, txnNumber);
-
- {
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- o(lk).apiParameters = APIParameters::get(opCtx);
- o(lk).readConcernArgs = readConcernArgs;
- }
+ } else {
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "Cannot " << actionTypeToString(action) << " transaction "
+ << txnNumber << " on session " << _sessionId()
+ << " using txnRetryCounter " << txnRetryCounter
+ << " because it is using a lower txnRetryCounter "
+ << o().txnRetryCounter,
+ action == TransactionActions::kStart);
+ uassert(ErrorCodes::IllegalOperation,
+ str::stream() << "Cannot restart transaction " << txnNumber << " on session "
+ << _sessionId() << " using txnRetryCounter " << txnRetryCounter
+ << " because it has already started to commit",
+ o().commitType == CommitType::kNotInitiated || !o().abortCause.empty());
+ _resetRouterStateForStartTransaction(opCtx, txnNumber, txnRetryCounter);
+ }
+}
- if (o().readConcernArgs.getLevel() ==
- repl::ReadConcernLevel::kSnapshotReadConcern) {
- stdx::lock_guard<Client> lk(*opCtx->getClient());
- o(lk).atClusterTime.emplace();
- }
+void TransactionRouter::Router::_beginNewTxnNumber(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ TransactionActions action,
+ TxnRetryCounter txnRetryCounter) {
+ invariant(txnNumber > o().txnNumber);
+
+ switch (action) {
+ case TransactionActions::kStart: {
+ _resetRouterStateForStartTransaction(opCtx, txnNumber, txnRetryCounter);
+ break;
+ }
+ case TransactionActions::kContinue: {
+ uasserted(ErrorCodes::NoSuchTransaction,
+ str::stream() << "cannot continue txnId " << o().txnNumber << " for session "
+ << _sessionId() << " with txnId " << txnNumber);
+ }
+ case TransactionActions::kCommit: {
+ _resetRouterState(opCtx, txnNumber, txnRetryCounter);
+ // If the first action seen by the router for this transaction is to commit, that
+ // means that the client is attempting to recover a commit decision.
+ p().isRecoveringCommit = true;
- LOGV2_DEBUG(22889,
- 3,
- "{sessionId}:{txnNumber} New transaction started",
- "New transaction started",
- "sessionId"_attr = _sessionId().getId(),
- "txnNumber"_attr = o().txnNumber);
- break;
- }
- case TransactionActions::kContinue: {
- uasserted(ErrorCodes::NoSuchTransaction,
- str::stream()
- << "cannot continue txnId " << o().txnNumber << " for session "
- << _sessionId() << " with txnId " << txnNumber);
- }
- case TransactionActions::kCommit: {
- _resetRouterState(opCtx, txnNumber);
- // If the first action seen by the router for this transaction is to commit, that
- // means that the client is attempting to recover a commit decision.
- p().isRecoveringCommit = true;
-
- LOGV2_DEBUG(22890,
- 3,
- "{sessionId}:{txnNumber} Commit recovery started",
- "Commit recovery started",
- "sessionId"_attr = _sessionId().getId(),
- "txnNumber"_attr = o().txnNumber);
+ LOGV2_DEBUG(22890,
+ 3,
+ "{sessionId}:{txnNumber} Commit recovery started",
+ "Commit recovery started",
+ "sessionId"_attr = _sessionId().getId(),
+ "txnNumber"_attr = o().txnNumber);
- break;
- }
- };
+ break;
+ }
+ };
+}
+
+void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ TransactionActions action,
+ TxnRetryCounter txnRetryCounter) {
+ invariant(txnRetryCounter >= 0, "Cannot specify a negative txnRetryCounter");
+
+ if (txnNumber < o().txnNumber) {
+ // This transaction is older than the transaction currently in progress, so throw an error.
+ uasserted(ErrorCodes::TransactionTooOld,
+ str::stream() << "txnNumber " << txnNumber << " is less than last txnNumber "
+ << o().txnNumber << " seen in session " << _sessionId());
+ } else if (txnNumber == o().txnNumber) {
+ // This is the same transaction as the one in progress.
+ auto apiParamsFromClient = APIParameters::get(opCtx);
+ if (action == TransactionActions::kContinue || action == TransactionActions::kCommit) {
+ uassert(
+ ErrorCodes::APIMismatchError,
+ "API parameter mismatch: transaction-continuing command used {}, the transaction's"
+ " first command used {}"_format(apiParamsFromClient.toBSON().toString(),
+ o().apiParameters.toBSON().toString()),
+ apiParamsFromClient == o().apiParameters);
+ }
+ _beginOrContinueActiveTxnNumber(opCtx, txnNumber, action, txnRetryCounter);
+ } else {
+ // This is a newer transaction.
+ _beginNewTxnNumber(opCtx, txnNumber, action, txnRetryCounter);
}
_updateLastClientInfo(opCtx->getClient());
@@ -1326,9 +1363,11 @@ void TransactionRouter::Router::appendRecoveryToken(BSONObjBuilder* builder) con
}
void TransactionRouter::Router::_resetRouterState(OperationContext* opCtx,
- const TxnNumber& txnNumber) {
+ const TxnNumber& txnNumber,
+ const TxnRetryCounter& txnRetryCounter) {
stdx::lock_guard<Client> lk(*opCtx->getClient());
o(lk).txnNumber = txnNumber;
+ o(lk).txnRetryCounter = txnRetryCounter;
o(lk).commitType = CommitType::kNotInitiated;
p().isRecoveringCommit = false;
o(lk).participants.clear();
@@ -1350,6 +1389,37 @@ void TransactionRouter::Router::_resetRouterState(OperationContext* opCtx,
p().firstStmtId = kDefaultFirstStmtId;
};
+void TransactionRouter::Router::_resetRouterStateForStartTransaction(
+ OperationContext* opCtx, const TxnNumber& txnNumber, const TxnRetryCounter& txnRetryCounter) {
+ auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
+ uassert(ErrorCodes::InvalidOptions,
+ "The first command in a transaction cannot specify a readConcern level "
+ "other than local, majority, or snapshot",
+ !readConcernArgs.hasLevel() ||
+ isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel()));
+
+ _resetRouterState(opCtx, txnNumber, txnRetryCounter);
+
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).apiParameters = APIParameters::get(opCtx);
+ o(lk).readConcernArgs = readConcernArgs;
+ }
+
+ if (o().readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).atClusterTime.emplace();
+ }
+
+ LOGV2_DEBUG(22889,
+ 3,
+ "{sessionId}:{txnNumber} New transaction started",
+ "New transaction started",
+ "sessionId"_attr = _sessionId().getId(),
+ "txnNumber"_attr = o().txnNumber,
+ "txnRetryCounter"_attr = txnRetryCounter);
+};
+
BSONObj TransactionRouter::Router::_commitWithRecoveryToken(OperationContext* opCtx,
const TxnRecoveryToken& recoveryToken) {
uassert(ErrorCodes::NoSuchTransaction,
diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h
index b31be66cc5d..3b360b1eb6f 100644
--- a/src/mongo/s/transaction_router.h
+++ b/src/mongo/s/transaction_router.h
@@ -83,6 +83,7 @@ public:
struct SharedTransactionOptions {
// Set for all distributed transactions.
TxnNumber txnNumber;
+ TxnRetryCounter txnRetryCounter;
APIParameters apiParameters;
repl::ReadConcernArgs readConcernArgs;
@@ -367,7 +368,8 @@ public:
*/
void beginOrContinueTxn(OperationContext* opCtx,
TxnNumber txnNumber,
- TransactionActions action);
+ TransactionActions action,
+ TxnRetryCounter txnRetryCounter);
/**
* Updates transaction diagnostics when the transaction's session is checked in.
@@ -534,7 +536,35 @@ public:
* time. This is required because we don't create a new router object for each transaction,
* but instead reuse the same object across different transactions.
*/
- void _resetRouterState(OperationContext* opCtx, const TxnNumber& txnNumber);
+ void _resetRouterState(OperationContext* opCtx,
+ const TxnNumber& txnNumber,
+ const TxnRetryCounter& txnRetryCounter);
+
+ /**
+ * Calls _resetRouterState and then resets the read concern and the cluster time of the
+ * timestamp that all participant shards in the current transaction with snapshot level read
+ * concern must read from.
+ */
+ void _resetRouterStateForStartTransaction(OperationContext* opCtx,
+ const TxnNumber& txnNumber,
+ const TxnRetryCounter& txnRetryCounter);
+
+ /**
+ * Continues or restarts the currently active transaction.
+ */
+ void _beginOrContinueActiveTxnNumber(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ TransactionActions action,
+ TxnRetryCounter txnRetryCounter);
+
+ /**
+ * Starts a new transaction or continues a transaction started by a different router to
+ * recover the commit decision.
+ */
+ void _beginNewTxnNumber(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ TransactionActions action,
+ TxnRetryCounter txnRetryCounter);
/**
* Internal method for committing a transaction. Should only throw on failure to send
@@ -696,6 +726,10 @@ private:
// called. Otherwise set to kUninitializedTxnNumber.
TxnNumber txnNumber{kUninitializedTxnNumber};
+ // The last seen txnRetryCounter for the currently active transaction, if beginOrContinueTxn
+ // has been called. Otherwise set to kUninitializedTxnRetryCounter.
+ TxnRetryCounter txnRetryCounter{kUninitializedTxnRetryCounter};
+
// Is updated at commit time to reflect which commit path was taken.
CommitType commitType{CommitType::kNotInitiated};
diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp
index 4e286306a4d..447cfadc389 100644
--- a/src/mongo/s/transaction_router_test.cpp
+++ b/src/mongo/s/transaction_router_test.cpp
@@ -38,7 +38,10 @@
#include "mongo/db/commands.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/repl/read_concern_args.h"
+#include "mongo/db/txn_retry_counter_too_old_info.h"
#include "mongo/db/vector_clock.h"
+#include "mongo/idl/server_parameter_test_util.h"
+#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog/type_shard.h"
#include "mongo/s/router_transactions_metrics.h"
@@ -110,6 +113,8 @@ protected:
StaleConfigInfo(kViewNss, ChunkVersion::UNSHARDED(), boost::none, shard1),
"The metadata for the collection is not loaded"};
+ RAIIServerParameterControllerForTest _controller{"featureFlagInternalTransactions", true};
+
void setUp() override {
ShardingTestFixture::setUp();
configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
@@ -230,8 +235,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
@@ -241,7 +248,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
<< "snapshot"
<< "atClusterTime" << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction" << true << "coordinator" << true
- << "autocommit" << false << "txnNumber" << txnNum);
+ << "autocommit" << false << "txnNumber" << txnNum
+ << "txnRetryCounter" << 0);
{
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(),
@@ -259,7 +267,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
ASSERT_BSONOBJ_EQ(BSON("update"
<< "test"
<< "coordinator" << true << "autocommit" << false << "txnNumber"
- << txnNum),
+ << txnNum << "txnRetryCounter" << 0),
newCmd);
}
}
@@ -268,8 +276,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, BasicStartTxnWithAtClusterTime)
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
@@ -279,7 +289,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, BasicStartTxnWithAtClusterTime)
<< "snapshot"
<< "atClusterTime" << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction" << true << "coordinator" << true
- << "autocommit" << false << "txnNumber" << txnNum);
+ << "autocommit" << false << "txnNumber" << txnNum
+ << "txnRetryCounter" << 0);
{
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(),
@@ -297,7 +308,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, BasicStartTxnWithAtClusterTime)
ASSERT_BSONOBJ_EQ(BSON("update"
<< "test"
<< "coordinator" << true << "autocommit" << false << "txnNumber"
- << txnNum),
+ << txnNum << "txnRetryCounter" << 0),
newCmd);
}
}
@@ -307,8 +318,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotContiueTxnWithoutStarting)
auto txnRouter = TransactionRouter::get(operationContext());
ASSERT_THROWS_CODE(
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue),
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */),
AssertionException,
ErrorCodes::NoSuchTransaction);
}
@@ -317,8 +330,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndRe
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
@@ -328,7 +343,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndRe
<< "snapshot"
<< "atClusterTime" << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction" << true << "coordinator" << true
- << "autocommit" << false << "txnNumber" << txnNum);
+ << "autocommit" << false << "txnNumber" << txnNum
+ << "txnRetryCounter" << 0);
{
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(),
@@ -346,7 +362,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndRe
ASSERT_BSONOBJ_EQ(BSON("update"
<< "test"
<< "coordinator" << true << "autocommit" << false << "txnNumber"
- << txnNum),
+ << txnNum << "txnRetryCounter" << 0),
newCmd);
}
@@ -357,7 +373,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndRe
<< "snapshot"
<< "atClusterTime" << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction" << true << "autocommit" << false << "txnNumber"
- << txnNum);
+ << txnNum << "txnRetryCounter" << 0);
{
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(),
@@ -374,7 +390,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndRe
<< "test"));
ASSERT_BSONOBJ_EQ(BSON("update"
<< "test"
- << "autocommit" << false << "txnNumber" << txnNum),
+ << "autocommit" << false << "txnNumber" << txnNum
+ << "txnRetryCounter" << 0),
newCmd);
}
}
@@ -383,8 +400,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState)
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
{
@@ -399,13 +418,16 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState)
<< "snapshot"
<< "atClusterTime" << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction" << true << "coordinator" << true
- << "autocommit" << false << "txnNumber" << txnNum),
+ << "autocommit" << false << "txnNumber" << txnNum
+ << "txnRetryCounter" << 0),
newCmd);
}
TxnNumber txnNum2{5};
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum2, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum2,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
@@ -415,7 +437,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState)
<< "snapshot"
<< "atClusterTime" << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction" << true << "coordinator" << true
- << "autocommit" << false << "txnNumber" << txnNum2);
+ << "autocommit" << false << "txnNumber" << txnNum2
+ << "txnRetryCounter" << 0);
{
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(),
@@ -426,12 +449,329 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState)
}
}
-TEST_F(TransactionRouterTestWithDefaultSession, FirstParticipantIsCoordinator) {
+DEATH_TEST_F(TransactionRouterTestWithDefaultSession,
+ CannotSpecifyNegativeTxnRetryCounter,
+ "Cannot specify a negative txnRetryCounter") {
+ TxnNumber txnNum{3};
+
+ auto txnRouter = TransactionRouter::get(operationContext());
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, -1);
+}
+
+TEST_F(TransactionRouterTestWithDefaultSession, NotAttachTxnRetryCounterIfFeatureFlagIsNotEnabled) {
+ RAIIServerParameterControllerForTest controller{"featureFlagInternalTransactions", false};
+
TxnNumber txnNum{3};
+ BSONObj expectedCmd = BSON("insert"
+ << "test"
+ << "readConcern"
+ << BSON("level"
+ << "snapshot"
+ << "atClusterTime" << kInMemoryLogicalTime.asTimestamp())
+ << "startTransaction" << true << "coordinator" << true
+ << "autocommit" << false << "txnNumber" << txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
+
txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 0);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+ ASSERT_BSONOBJ_EQ(expectedCmd,
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(),
+ shard1,
+ BSON("insert"
+ << "test")));
+}
+
+TEST_F(TransactionRouterTestWithDefaultSession,
+ CanRestartInProgressTransactionUsingTxnRetryCounterGreaterThanLastUsed) {
+ TxnNumber txnNum{3};
+ BSONObj expectedCmd = BSON("insert"
+ << "test"
+ << "readConcern"
+ << BSON("level"
+ << "snapshot"
+ << "atClusterTime" << kInMemoryLogicalTime.asTimestamp())
+ << "startTransaction" << true << "coordinator" << true
+ << "autocommit" << false << "txnNumber" << txnNum);
+
+ auto txnRouter = TransactionRouter::get(operationContext());
+
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 0);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+ ASSERT_BSONOBJ_EQ(expectedCmd.addFields(BSON("txnRetryCounter" << 0)),
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(),
+ shard1,
+ BSON("insert"
+ << "test")));
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+ ASSERT_BSONOBJ_EQ(expectedCmd.addFields(BSON("txnRetryCounter" << 1)),
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(),
+ shard1,
+ BSON("insert"
+ << "test")));
+}
+
+TEST_F(TransactionRouterTestWithDefaultSession,
+ CanRestartAbortedTransactionUsingTxnRetryCounterGreaterThanLastUsed) {
+ TxnNumber txnNum{3};
+ BSONObj expectedCmd = BSON("insert"
+ << "test"
+ << "readConcern"
+ << BSON("level"
+ << "snapshot"
+ << "atClusterTime" << kInMemoryLogicalTime.asTimestamp())
+ << "startTransaction" << true << "coordinator" << true
+ << "autocommit" << false << "txnNumber" << txnNum);
+
+ auto txnRouter = TransactionRouter::get(operationContext());
+
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 0);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+ ASSERT_BSONOBJ_EQ(expectedCmd.addFields(BSON("txnRetryCounter" << 0)),
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(),
+ shard1,
+ BSON("insert"
+ << "test")));
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
+ startCapturingLogMessages();
+ auto future = launchAsync([&] { txnRouter.abortTransaction(operationContext()); });
+ expectAbortTransactions({hostAndPort1}, *operationContext()->getLogicalSessionId(), txnNum);
+ future.default_timed_get();
+ stopCapturingLogMessages();
+
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+ ASSERT_BSONOBJ_EQ(expectedCmd.addFields(BSON("txnRetryCounter" << 1)),
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(),
+ shard1,
+ BSON("insert"
+ << "test")));
+}
+
+TEST_F(TransactionRouterTestWithDefaultSession,
+ CanRestartTransactionWithFailedCommitUsingTxnRetryCounterGreaterThanLastUsed) {
+ TxnNumber txnNum{3};
+
+ auto txnRouter = TransactionRouter::get(operationContext());
+
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 0);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
+ ASSERT(TransactionRouter::Participant::ReadOnly::kNotReadOnly ==
+ txnRouter.getParticipant(shard1)->readOnly);
+
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit, 0);
+ auto future =
+ launchAsync([&] { txnRouter.commitTransaction(operationContext(), boost::none); });
+ expectCommitTransaction(kDummyErrorRes);
+ future.default_timed_get();
+
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1);
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit, 1);
+ auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none);
+ ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1));
+}
+
+TEST_F(TransactionRouterTestWithDefaultSession,
+ CannotRestartCommittedTransactionUsingTxnRetryCounterGreaterThanLastUsed) {
+ TxnNumber txnNum{3};
+
+ auto txnRouter = TransactionRouter::get(operationContext());
+
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 0);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit, 0);
+ auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none);
+ ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1));
+
+ ASSERT_THROWS_CODE(
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1),
+ AssertionException,
+ ErrorCodes::IllegalOperation);
+
+ commitResult = txnRouter.commitTransaction(operationContext(), boost::none);
+ ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1));
+}
+
+TEST_F(TransactionRouterTestWithDefaultSession,
+ CannotRestartTransactionUsingTxnRetryCounterLessThanLastUsed) {
+ TxnNumber txnNum{3};
+
+ auto txnRouter = TransactionRouter::get(operationContext());
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1);
+
+ ASSERT_THROWS_CODE(
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 0),
+ AssertionException,
+ ErrorCodes::TxnRetryCounterTooOld);
+ try {
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue, 0);
+ } catch (const TxnRetryCounterTooOldException& ex) {
+ auto info = ex.extraInfo<TxnRetryCounterTooOldInfo>();
+ ASSERT_EQ(info->getTxnRetryCounter(), 1);
+ }
+
+ auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none);
+ ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1));
+}
+
+TEST_F(TransactionRouterTestWithDefaultSession,
+ CanContinueTransactionUsingTxnRetryCounterEqualToLastUsed) {
+ TxnNumber txnNum{3};
+
+ auto txnRouter = TransactionRouter::get(operationContext());
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+
+ // (Must reset readConcern from "snapshot".)
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue, 1);
+}
+
+TEST_F(TransactionRouterTestWithDefaultSession,
+ CannotContinueTransactionUsingTxnRetryCounterGreaterThanLastUsed) {
+ TxnNumber txnNum{3};
+
+ auto txnRouter = TransactionRouter::get(operationContext());
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 0);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+
+ // (Must reset readConcern from "snapshot".)
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
+ ASSERT_THROWS_CODE(
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue, 1),
+ AssertionException,
+ ErrorCodes::IllegalOperation);
+
+ auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none);
+ ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1));
+}
+
+TEST_F(TransactionRouterTestWithDefaultSession,
+ CannotContinueTransactionUsingTxnRetryCounterLessThanLastUsed) {
+ TxnNumber txnNum{3};
+
+ auto txnRouter = TransactionRouter::get(operationContext());
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+
+ // (Must reset readConcern from "snapshot".)
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
+ ASSERT_THROWS_CODE(
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue, 0),
+ AssertionException,
+ ErrorCodes::TxnRetryCounterTooOld);
+ try {
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue, 0);
+ } catch (const TxnRetryCounterTooOldException& ex) {
+ auto info = ex.extraInfo<TxnRetryCounterTooOldInfo>();
+ ASSERT_EQ(info->getTxnRetryCounter(), 1);
+ }
+
+ auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none);
+ ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1));
+}
+
+TEST_F(TransactionRouterTestWithDefaultSession,
+ CanCommitTransactionUsingTxnRetryCounterEqualToLastUsed) {
+ TxnNumber txnNum{3};
+
+ auto txnRouter = TransactionRouter::get(operationContext());
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+
+ // (Must reset readConcern from "snapshot".)
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit, 1);
+
+ auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none);
+ ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1));
+}
+
+TEST_F(TransactionRouterTestWithDefaultSession,
+ CannotCommitTransactionUsingTxnRetryCounterGreaterThanLastUsed) {
+ TxnNumber txnNum{3};
+
+ auto txnRouter = TransactionRouter::get(operationContext());
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 0);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+
+ // (Must reset readConcern from "snapshot".)
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
+ ASSERT_THROWS_CODE(
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit, 1),
+ AssertionException,
+ ErrorCodes::IllegalOperation);
+
+ auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none);
+ ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1));
+}
+
+TEST_F(TransactionRouterTestWithDefaultSession,
+ CannotCommitTransactionUsingTxnRetryCounterLessThanLastUsed) {
+ TxnNumber txnNum{3};
+
+ auto txnRouter = TransactionRouter::get(operationContext());
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, 1);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+
+ // (Must reset readConcern from "snapshot".)
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
+ ASSERT_THROWS_CODE(
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit, 0),
+ AssertionException,
+ ErrorCodes::TxnRetryCounterTooOld);
+ try {
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit, 0);
+ } catch (const TxnRetryCounterTooOldException& ex) {
+ auto info = ex.extraInfo<TxnRetryCounterTooOldInfo>();
+ ASSERT_EQ(info->getTxnRetryCounter(), 1);
+ }
+
+ auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none);
+ ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1));
+}
+
+TEST_F(TransactionRouterTestWithDefaultSession, FirstParticipantIsCoordinator) {
+ TxnNumber txnNum{3};
+
+ auto txnRouter = TransactionRouter::get(operationContext());
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
ASSERT_FALSE(txnRouter.getCoordinatorId());
@@ -453,8 +793,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, FirstParticipantIsCoordinator) {
}
TxnNumber txnNum2{5};
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum2, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum2,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
ASSERT_FALSE(txnRouter.getCoordinatorId());
@@ -472,8 +814,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardDoesNotGetSetForRea
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
// The recovery shard is unset initially.
@@ -492,8 +836,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardDoesNotGetSetForRea
txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyTrueResponse);
ASSERT_FALSE(txnRouter.getRecoveryShardId());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kCommit,
+ 0 /* txnRetryCounter */);
// The recovery shard is not set even if the participants say they did a write for commit.
auto future =
@@ -514,8 +860,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -529,8 +877,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -550,8 +900,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
// Shard1's response says read-only.
@@ -571,8 +923,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
// Shard1's response says not read-only.
@@ -595,8 +949,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
// Shard1's response says not read-only.
@@ -607,8 +963,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
// New statement.
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
// Shard2 responds, it doesn't matter whether it's read-only, just that it's a pending
// participant.
@@ -631,8 +989,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsResetOnStartingNe
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
// Shard1's response says not read-only.
@@ -643,8 +1003,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsResetOnStartingNe
// Start new transaction on session.
TxnNumber newTxnNum{4};
- txnRouter.beginOrContinueTxn(
- operationContext(), newTxnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ newTxnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
ASSERT_FALSE(txnRouter.getRecoveryShardId());
}
@@ -653,8 +1015,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, DoesNotAttachTxnNumIfAlreadyTher
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
@@ -664,7 +1028,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, DoesNotAttachTxnNumIfAlreadyTher
<< "snapshot"
<< "atClusterTime" << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction" << true << "coordinator" << true
- << "autocommit" << false);
+ << "autocommit" << false << "txnRetryCounter" << 0);
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(),
shard1,
@@ -680,8 +1044,10 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(),
@@ -695,8 +1061,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, AttachTxnValidatesReadConcernIfA
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
{
@@ -714,7 +1082,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, AttachTxnValidatesReadConcernIfA
<< "snapshot"
<< "atClusterTime" << kInMemoryLogicalTime.asTimestamp())
<< "startTransaction" << true << "coordinator" << true
- << "autocommit" << false << "txnNumber" << txnNum),
+ << "autocommit" << false << "txnNumber" << txnNum
+ << "txnRetryCounter" << 0),
newCmd);
}
}
@@ -726,14 +1095,18 @@ TEST_F(TransactionRouterTestWithDefaultSession, SameAPIParametersAfterFirstState
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
// Continuing with the same API params succeeds. (Must reset readConcern from "snapshot".)
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
}
TEST_F(TransactionRouterTestWithDefaultSession, DifferentAPIParametersAfterFirstStatement) {
@@ -743,16 +1116,20 @@ TEST_F(TransactionRouterTestWithDefaultSession, DifferentAPIParametersAfterFirst
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
// Can't continue with different params. (Must reset readConcern from "snapshot".)
APIParameters::get(operationContext()).setAPIStrict(true);
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
ASSERT_THROWS_CODE(
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue),
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */),
DBException,
ErrorCodes::APIMismatchError);
}
@@ -764,16 +1141,20 @@ TEST_F(TransactionRouterTestWithDefaultSession, NoAPIParametersAfterFirstStateme
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
// Can't continue without params. (Must reset readConcern from "snapshot".)
APIParameters::get(operationContext()) = APIParameters();
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
ASSERT_THROWS_CODE(
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue),
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */),
DBException,
ErrorCodes::APIMismatchError);
}
@@ -782,13 +1163,17 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotSpecifyReadConcernAfterFir
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
ASSERT_THROWS_CODE(
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue),
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */),
DBException,
ErrorCodes::InvalidOptions);
}
@@ -799,15 +1184,17 @@ TEST_F(TransactionRouterTestWithDefaultSession, PassesThroughEmptyReadConcernToP
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
<< "test"
<< "readConcern" << BSONObj() << "startTransaction" << true
<< "coordinator" << true << "autocommit" << false << "txnNumber"
- << txnNum);
+ << txnNum << "txnRetryCounter" << 0);
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(),
shard1,
@@ -825,8 +1212,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedNewObj = BSON("insert"
@@ -834,7 +1223,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
<< "readConcern"
<< BSON("afterClusterTime" << kAfterClusterTime.asTimestamp())
<< "startTransaction" << true << "coordinator" << true
- << "autocommit" << false << "txnNumber" << txnNum);
+ << "autocommit" << false << "txnNumber" << txnNum
+ << "txnRetryCounter" << 0);
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(),
shard1,
@@ -850,8 +1240,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedReadConcernLeve
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
ASSERT_THROWS_CODE(
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart),
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */),
DBException,
ErrorCodes::InvalidOptions);
}
@@ -865,8 +1257,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
ASSERT_THROWS_CODE(
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart),
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */),
DBException,
ErrorCodes::InvalidOptions);
}
@@ -880,8 +1274,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
ASSERT_THROWS_CODE(
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart),
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */),
DBException,
ErrorCodes::InvalidOptions);
}
@@ -891,8 +1287,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotCommitWithoutParticipantsO
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kCommit,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
ASSERT_THROWS(txnRouter.commitTransaction(operationContext(), boost::none), AssertionException);
@@ -901,6 +1299,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotCommitWithoutParticipantsO
void checkSessionDetails(const BSONObj& cmdObj,
const LogicalSessionId& lsid,
const TxnNumber txnNum,
+ const TxnRetryCounter txnRetryCounter,
boost::optional<bool> isCoordinator) {
auto osi = OperationSessionInfoFromClient::parse("testTxnRouter"_sd, cmdObj);
@@ -913,6 +1312,9 @@ void checkSessionDetails(const BSONObj& cmdObj,
ASSERT(osi.getAutocommit());
ASSERT_FALSE(*osi.getAutocommit());
+ ASSERT(osi.getTxnRetryCounter());
+ ASSERT_EQ(txnRetryCounter, *osi.getTxnRetryCounter());
+
if (isCoordinator) {
ASSERT_EQ(*isCoordinator, cmdObj["coordinator"].trueValue());
} else {
@@ -931,12 +1333,16 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kCommit,
+ 0 /* txnRetryCounter */);
auto commitResult = txnRouter.commitTransaction(operationContext(), boost::none);
ASSERT_BSONOBJ_EQ(commitResult, BSON("ok" << 1));
@@ -950,10 +1356,11 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
SendCommitDirectlyForSingleParticipantThatIsReadOnly) {
TxnNumber txnNum{3};
+ TxnRetryCounter txnRetryCounter{0};
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -962,8 +1369,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kCommit,
+ txnRetryCounter);
auto future =
launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); });
@@ -975,7 +1384,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
ASSERT_EQ(cmdName, "commitTransaction");
- checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true);
+ checkSessionDetails(
+ request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */);
return BSON("ok" << 1);
});
@@ -986,10 +1396,11 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
SendCommitDirectlyForSingleParticipantThatDidAWrite) {
TxnNumber txnNum{3};
+ TxnRetryCounter txnRetryCounter{0};
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -998,8 +1409,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kCommit,
+ txnRetryCounter);
auto future =
launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); });
@@ -1011,7 +1424,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
ASSERT_EQ(cmdName, "commitTransaction");
- checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true);
+ checkSessionDetails(
+ request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */);
return BSON("ok" << 1);
});
@@ -1022,10 +1436,11 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
SendCommitDirectlyForMultipleParticipantsThatAreAllReadOnly) {
TxnNumber txnNum{3};
+ TxnRetryCounter txnRetryCounter{0};
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -1036,8 +1451,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kCommit,
+ txnRetryCounter);
auto future =
launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); });
@@ -1057,8 +1474,11 @@ TEST_F(TransactionRouterTestWithDefaultSession,
ASSERT_EQ(cmdName, "commitTransaction");
// The shard with hostAndPort1 is expected to be the coordinator.
- checkSessionDetails(
- request.cmdObj, getSessionId(), txnNum, (request.target == hostAndPort1));
+ checkSessionDetails(request.cmdObj,
+ getSessionId(),
+ txnNum,
+ txnRetryCounter,
+ (request.target == hostAndPort1));
return kOkReadOnlyTrueResponse;
});
@@ -1072,10 +1492,11 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
SendCoordinateCommitForMultipleParticipantsOnlyOneDidAWrite) {
TxnNumber txnNum{3};
+ TxnRetryCounter txnRetryCounter{0};
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -1083,8 +1504,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse);
txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse);
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kCommit,
+ txnRetryCounter);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
@@ -1109,7 +1532,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
expectedParticipants.erase(shardId);
}
- checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true);
+ checkSessionDetails(
+ request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */);
return BSON("ok" << 1);
});
@@ -1120,10 +1544,11 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
SendCoordinateCommitForMultipleParticipantsMoreThanOneDidAWrite) {
TxnNumber txnNum{3};
+ TxnRetryCounter txnRetryCounter{0};
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -1131,8 +1556,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse);
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kCommit,
+ txnRetryCounter);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
@@ -1157,7 +1584,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
expectedParticipants.erase(shardId);
}
- checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true);
+ checkSessionDetails(
+ request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */);
return BSON("ok" << 1);
});
@@ -1168,6 +1596,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) {
LogicalSessionId lsid(makeLogicalSessionIdForTest());
TxnNumber txnNum{3};
+ TxnRetryCounter txnRetryCounter{0};
auto opCtx = operationContext();
opCtx->setLogicalSessionId(lsid);
@@ -1178,7 +1607,8 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kCommit, txnRetryCounter);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
@@ -1196,7 +1626,8 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) {
auto participantElements = request.cmdObj["participants"].Array();
ASSERT_TRUE(participantElements.empty());
- checkSessionDetails(request.cmdObj, lsid, txnNum, true);
+ checkSessionDetails(
+ request.cmdObj, lsid, txnNum, txnRetryCounter, true /* isCoordinator */);
checkWriteConcern(request.cmdObj, writeConcern);
return BSON("ok" << 1);
@@ -1207,8 +1638,10 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) {
// Sending commit with a recovery token again should cause the router to use the recovery path
// again.
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kCommit,
+ txnRetryCounter);
future = launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); });
@@ -1222,7 +1655,8 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) {
auto participantElements = request.cmdObj["participants"].Array();
ASSERT_TRUE(participantElements.empty());
- checkSessionDetails(request.cmdObj, lsid, txnNum, true);
+ checkSessionDetails(
+ request.cmdObj, lsid, txnNum, txnRetryCounter, true /* isCoordinator */);
checkWriteConcern(request.cmdObj, writeConcern);
return BSON("ok" << 1);
@@ -1234,6 +1668,7 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) {
TEST_F(TransactionRouterTestWithDefaultSession,
CrossShardTxnCommitWorksAfterRecoveryCommitForPreviousTransaction) {
TxnNumber txnNum{3};
+ TxnRetryCounter txnRetryCounter{0};
auto opCtx = operationContext();
opCtx->setTxnNumber(txnNum);
@@ -1244,7 +1679,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
auto txnRouter = TransactionRouter::get(opCtx);
// Simulate recovering a commit with a recovery token and no participants.
{
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kCommit, txnRetryCounter);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
@@ -1262,7 +1698,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
auto participantElements = request.cmdObj["participants"].Array();
ASSERT_TRUE(participantElements.empty());
- checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true);
+ checkSessionDetails(
+ request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */);
checkWriteConcern(request.cmdObj, writeConcern);
return BSON("ok" << 1);
@@ -1275,8 +1712,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
// should be sent with the correct participant list.
{
++txnNum;
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ txnRetryCounter);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -1284,8 +1723,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse);
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kCommit,
+ txnRetryCounter);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
@@ -1310,7 +1751,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
expectedParticipants.erase(shardId);
}
- checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true);
+ checkSessionDetails(
+ request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */);
return BSON("ok" << 1);
});
@@ -1322,6 +1764,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
RouterShouldWorkAsRecoveryRouterEvenIfItHasSeenPreviousTransactions) {
TxnNumber txnNum{3};
+ TxnRetryCounter txnRetryCounter{0};
auto opCtx = operationContext();
opCtx->setTxnNumber(txnNum);
@@ -1333,8 +1776,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
// Run a cross-shard transaction with two-phase commit. The commit should be sent with the
// correct participant list.
{
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ txnRetryCounter);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -1342,8 +1787,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse);
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kCommit,
+ txnRetryCounter);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
@@ -1368,7 +1815,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
expectedParticipants.erase(shardId);
}
- checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true);
+ checkSessionDetails(
+ request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */);
return BSON("ok" << 1);
});
@@ -1381,7 +1829,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
{
++txnNum;
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kCommit, txnRetryCounter);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
@@ -1399,7 +1848,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
auto participantElements = request.cmdObj["participants"].Array();
ASSERT_TRUE(participantElements.empty());
- checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true);
+ checkSessionDetails(
+ request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */);
checkWriteConcern(request.cmdObj, writeConcern);
return BSON("ok" << 1);
@@ -1422,7 +1872,8 @@ TEST_F(TransactionRouterTest, CommitWithEmptyRecoveryToken) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kCommit, 0 /* txnRetryCounter */);
TxnRecoveryToken recoveryToken;
ASSERT_THROWS_CODE(txnRouter.commitTransaction(operationContext(), recoveryToken),
@@ -1443,7 +1894,8 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithUnknownShard) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kCommit, 0 /* txnRetryCounter */);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(ShardId("magicShard"));
@@ -1461,12 +1913,56 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithUnknownShard) {
ASSERT_THROWS_CODE(future.default_timed_get(), DBException, ErrorCodes::ShardNotFound);
}
+TEST_F(TransactionRouterTestWithDefaultSession, CommitWithRecoveryTokenAndTxnRetryCounter) {
+ TxnNumber txnNum{3};
+ TxnRetryCounter txnRetryCounter{1};
+
+ operationContext()->setTxnNumber(txnNum);
+
+ WriteConcernOptions writeConcern(10, WriteConcernOptions::SyncMode::NONE, 0);
+ operationContext()->setWriteConcern(writeConcern);
+
+ auto txnRouter = TransactionRouter::get(operationContext());
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kCommit,
+ txnRetryCounter);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+
+ TxnRecoveryToken recoveryToken;
+ recoveryToken.setRecoveryShardId(shard1);
+
+ auto future =
+ launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); });
+
+ onCommand([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(hostAndPort1, request.target);
+ ASSERT_EQ("admin", request.dbname);
+
+ auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
+ ASSERT_EQ(cmdName, "coordinateCommitTransaction");
+
+ auto participantElements = request.cmdObj["participants"].Array();
+ ASSERT_TRUE(participantElements.empty());
+
+ checkSessionDetails(
+ request.cmdObj, getSessionId(), txnNum, txnRetryCounter, true /* isCoordinator */);
+ checkWriteConcern(request.cmdObj, writeConcern);
+
+ return BSON("ok" << 1);
+ });
+
+ future.default_timed_get();
+}
+
TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsResetAtClusterTime) {
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedReadConcern = BSON("level"
@@ -1513,8 +2009,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
BSONObj expectedReadConcern = BSON("level"
@@ -1552,8 +2050,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
// Later statements cannot change atClusterTime.
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
LogicalTime laterTimeNewStmt(Timestamp(1000, 1));
ASSERT_GT(laterTimeNewStmt, laterTimeSameStmt);
@@ -1574,8 +2074,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsClearsAllParticipa
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
// Successfully start a transaction on two shards, selecting one as the coordinator.
@@ -1624,8 +2126,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotContinueOnSnapshotErrorAft
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
ASSERT(txnRouter.canContinueOnSnapshotError());
@@ -1633,21 +2137,27 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotContinueOnSnapshotErrorAft
txnRouter.setDefaultAtClusterTime(operationContext());
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
ASSERT_FALSE(txnRouter.canContinueOnSnapshotError());
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
ASSERT_FALSE(txnRouter.canContinueOnSnapshotError());
}
TEST_F(TransactionRouterTestWithDefaultSession, ParticipantsRememberStmtIdCreatedAt) {
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
// Transaction 1 contacts shard1 and shard2 during the first command, then shard3 in the second
@@ -1661,8 +2171,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, ParticipantsRememberStmtIdCreate
ASSERT_EQ(txnRouter.getParticipant(shard2)->stmtIdCreatedAt, initialStmtId);
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
ShardId shard3("shard3");
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard3, {});
@@ -1677,8 +2189,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, ParticipantsRememberStmtIdCreate
repl::ReadConcernArgs::get(operationContext()) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kSnapshotReadConcern);
TxnNumber txnNum2{5};
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum2, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum2,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard3, {});
@@ -1688,8 +2202,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, ParticipantsRememberStmtIdCreate
ASSERT_EQ(txnRouter.getParticipant(shard2)->stmtIdCreatedAt, initialStmtId);
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum2, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum2,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
ASSERT_EQ(txnRouter.getParticipant(shard1)->stmtIdCreatedAt, initialStmtId + 1);
@@ -1700,8 +2216,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
// Start a transaction on two shards, selecting one as the coordinator, but simulate a
@@ -1745,8 +2263,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, OnlyNewlyCreatedParticipantsClea
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
// First statement successfully targets one shard, selecing it as the coordinator.
@@ -1760,8 +2280,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, OnlyNewlyCreatedParticipantsClea
// least one of them.
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard3, {});
@@ -1789,8 +2311,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
// First statement selects an atClusterTime.
@@ -1800,8 +2324,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
// change the atClusterTime.
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
LogicalTime laterTime(Timestamp(1000, 1));
ASSERT_GT(laterTime, kInMemoryLogicalTime);
@@ -1842,8 +2368,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, WritesCanOnlyBeRetriedIfFirstOve
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
for (auto writeCmd : writeCmds) {
@@ -1857,8 +2385,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, WritesCanOnlyBeRetriedIfFirstOve
// Advance to the next command.
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
for (auto writeCmd : writeCmds) {
ASSERT_FALSE(txnRouter.canContinueOnStaleShardOrDbError(writeCmd, kDummyStatus));
@@ -1879,7 +2409,8 @@ TEST_F(TransactionRouterTest, AbortThrowsIfNoParticipants) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kStart, 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
ASSERT_THROWS_CODE(
@@ -1889,6 +2420,7 @@ TEST_F(TransactionRouterTest, AbortThrowsIfNoParticipants) {
TEST_F(TransactionRouterTest, AbortForSingleParticipant) {
LogicalSessionId lsid(makeLogicalSessionIdForTest());
TxnNumber txnNum{3};
+ TxnRetryCounter txnRetryCounter{0};
auto opCtx = operationContext();
opCtx->setLogicalSessionId(lsid);
@@ -1897,7 +2429,8 @@ TEST_F(TransactionRouterTest, AbortForSingleParticipant) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -1910,7 +2443,8 @@ TEST_F(TransactionRouterTest, AbortForSingleParticipant) {
auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
ASSERT_EQ(cmdName, "abortTransaction");
- checkSessionDetails(request.cmdObj, lsid, txnNum, true);
+ checkSessionDetails(
+ request.cmdObj, lsid, txnNum, txnRetryCounter, true /* isCoordinator */);
return kOkReadOnlyFalseResponse;
});
@@ -1922,6 +2456,7 @@ TEST_F(TransactionRouterTest, AbortForSingleParticipant) {
TEST_F(TransactionRouterTest, AbortForMultipleParticipantsAllReturnSuccess) {
LogicalSessionId lsid(makeLogicalSessionIdForTest());
TxnNumber txnNum{3};
+ TxnRetryCounter txnRetryCounter{0};
auto opCtx = operationContext();
opCtx->setLogicalSessionId(lsid);
@@ -1930,7 +2465,8 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsAllReturnSuccess) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
@@ -1951,7 +2487,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsAllReturnSuccess) {
auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
ASSERT_EQ(cmdName, "abortTransaction");
- checkSessionDetails(request.cmdObj, lsid, txnNum, target->second);
+ checkSessionDetails(request.cmdObj, lsid, txnNum, txnRetryCounter, target->second);
targets.erase(request.target);
return kOkReadOnlyFalseResponse;
@@ -1965,6 +2501,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsAllReturnSuccess) {
TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNoSuchTransaction) {
LogicalSessionId lsid(makeLogicalSessionIdForTest());
TxnNumber txnNum{3};
+ TxnRetryCounter txnRetryCounter{0};
auto opCtx = operationContext();
opCtx->setLogicalSessionId(lsid);
@@ -1973,7 +2510,8 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNoSuchTransa
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
@@ -1997,7 +2535,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNoSuchTransa
auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
ASSERT_EQ(cmdName, "abortTransaction");
- checkSessionDetails(request.cmdObj, lsid, txnNum, target->second);
+ checkSessionDetails(request.cmdObj, lsid, txnNum, txnRetryCounter, target->second);
targets.erase(request.target);
@@ -2014,6 +2552,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNoSuchTransa
TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNetworkError) {
LogicalSessionId lsid(makeLogicalSessionIdForTest());
TxnNumber txnNum{3};
+ TxnRetryCounter txnRetryCounter{0};
auto opCtx = operationContext();
opCtx->setLogicalSessionId(lsid);
@@ -2022,7 +2561,8 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNetworkError
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
@@ -2046,7 +2586,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNetworkError
auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
ASSERT_EQ(cmdName, "abortTransaction");
- checkSessionDetails(request.cmdObj, lsid, txnNum, target->second);
+ checkSessionDetails(request.cmdObj, lsid, txnNum, txnRetryCounter, target->second);
targets.erase(request.target);
@@ -2068,8 +2608,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, OnViewResolutionErrorClearsAllNe
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
// One shard is targeted by the first statement.
@@ -2097,8 +2639,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, OnViewResolutionErrorClearsAllNe
// Advance to a later client statement that targets a new shard.
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
auto secondShardCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
ASSERT_TRUE(secondShardCmd["startTransaction"].trueValue());
@@ -2126,7 +2670,8 @@ TEST_F(TransactionRouterTest, ImplicitAbortIsNoopWithNoParticipants) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kStart, 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
// Should not throw.
@@ -2136,6 +2681,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortIsNoopWithNoParticipants) {
TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) {
LogicalSessionId lsid(makeLogicalSessionIdForTest());
TxnNumber txnNum{3};
+ TxnRetryCounter txnRetryCounter{0};
auto opCtx = operationContext();
opCtx->setLogicalSessionId(lsid);
@@ -2144,7 +2690,8 @@ TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -2158,7 +2705,8 @@ TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) {
auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
ASSERT_EQ(cmdName, "abortTransaction");
- checkSessionDetails(request.cmdObj, lsid, txnNum, true);
+ checkSessionDetails(
+ request.cmdObj, lsid, txnNum, txnRetryCounter, true /* isCoordinator */);
return kOkReadOnlyFalseResponse;
});
@@ -2169,6 +2717,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) {
TEST_F(TransactionRouterTest, ImplicitAbortForMultipleParticipants) {
LogicalSessionId lsid(makeLogicalSessionIdForTest());
TxnNumber txnNum{3};
+ TxnRetryCounter txnRetryCounter{0};
auto opCtx = operationContext();
opCtx->setLogicalSessionId(lsid);
@@ -2177,7 +2726,8 @@ TEST_F(TransactionRouterTest, ImplicitAbortForMultipleParticipants) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
@@ -2197,7 +2747,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortForMultipleParticipants) {
auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
ASSERT_EQ(cmdName, "abortTransaction");
- checkSessionDetails(request.cmdObj, lsid, txnNum, target->second);
+ checkSessionDetails(request.cmdObj, lsid, txnNum, txnRetryCounter, target->second);
targets.erase(request.target);
return kOkReadOnlyFalseResponse;
@@ -2210,6 +2760,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortForMultipleParticipants) {
TEST_F(TransactionRouterTest, ImplicitAbortIgnoresErrors) {
LogicalSessionId lsid(makeLogicalSessionIdForTest());
TxnNumber txnNum{3};
+ TxnRetryCounter txnRetryCounter{0};
auto opCtx = operationContext();
opCtx->setLogicalSessionId(lsid);
@@ -2218,7 +2769,8 @@ TEST_F(TransactionRouterTest, ImplicitAbortIgnoresErrors) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kStart, txnRetryCounter);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -2232,7 +2784,8 @@ TEST_F(TransactionRouterTest, ImplicitAbortIgnoresErrors) {
auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
ASSERT_EQ(cmdName, "abortTransaction");
- checkSessionDetails(request.cmdObj, lsid, txnNum, true);
+ checkSessionDetails(
+ request.cmdObj, lsid, txnNum, txnRetryCounter, true /* isCoordinator */);
return BSON("ok" << 0);
});
@@ -2249,7 +2802,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, AbortPropagatesWriteConcern) {
WriteConcernOptions writeConcern(10, WriteConcernOptions::SyncMode::NONE, 0);
opCtx->setWriteConcern(writeConcern);
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kStart, 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(opCtx);
txnRouter.attachTxnFieldsIfNeeded(opCtx, shard1, {});
@@ -2272,8 +2826,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, ContinueOnlyOnStaleVersionOnFirs
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -2311,8 +2867,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, ContinueOnlyOnStaleVersionOnFirs
// Start a new transaction statement.
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
// Cannot retry on a stale config error with one participant after the first statement.
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -2324,13 +2882,17 @@ TEST_F(TransactionRouterTestWithDefaultSession, ContinuingTransactionPlacesItsRe
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
ASSERT(repl::ReadConcernArgs::get(operationContext()).getLevel() ==
repl::ReadConcernLevel::kSnapshotReadConcern);
@@ -2341,14 +2903,18 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
// First statement does not select an atClusterTime, but does not target any participants.
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
// Subsequent statement does select an atClusterTime and does target a participant.
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -2366,8 +2932,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
// The next statement cannot change the atClusterTime.
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
LogicalTime laterTimeSameStmt(Timestamp(100, 1));
ASSERT_GT(laterTimeSameStmt, kInMemoryLogicalTime);
@@ -2388,8 +2956,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, NonSnapshotReadConcernHasNoAtClu
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(rcIt.second);
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum++, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum++,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
// No atClusterTime is placed on the router by default.
ASSERT_FALSE(txnRouter.mustUseAtClusterTime());
@@ -2410,8 +2980,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(rcIt.second);
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum++, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum++,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
const BSONObj expectedRC = BSON("level" << rcIt.first);
@@ -2447,8 +3019,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
repl::ReadConcernArgs(clusterTime, rcIt.second);
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum++, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum++,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(),
@@ -2468,8 +3042,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, NonSnapshotReadConcernLevelsPres
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(opTime, rcIt.second);
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum++, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum++,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
// Call setDefaultAtClusterTime to simulate real command execution.
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -2488,8 +3064,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
//
// NoSuchTransaction is ignored when it is the top-level error code.
@@ -2518,8 +3096,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
//
// Retryable top-level error.
@@ -2550,8 +3130,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
//
// Non-retryable top-level error.
@@ -2582,8 +3164,10 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
// Add some participants to the list.
@@ -2599,8 +3183,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -2614,8 +3200,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -2639,8 +3227,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -2664,8 +3254,10 @@ TEST_F(
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -2690,8 +3282,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -2715,8 +3309,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -2733,8 +3329,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
repl::ReadConcernArgs secondRequestEmptyReadConcern;
repl::ReadConcernArgs::get(operationContext()) = secondRequestEmptyReadConcern;
- txnRouter.beginOrContinueTxn(
- operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ txnNum,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
// The router should throw regardless of whether the response says readOnly true or false.
ASSERT_THROWS_CODE(
@@ -2753,14 +3351,16 @@ TEST_F(TransactionRouterTestWithDefaultSession,
auto opCtx = operationContext();
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kStart, 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(opCtx);
txnRouter.attachTxnFieldsIfNeeded(opCtx, shard1, {});
// Continue causes the _latestStmtId to be bumped.
repl::ReadConcernArgs::get(opCtx) = repl::ReadConcernArgs();
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kContinue, 0 /* txnRetryCounter */);
// Aborting will set the termination initiation state.
auto future = launchAsync([&] { txnRouter.abortTransaction(opCtx); });
@@ -2779,7 +3379,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
auto opCtx = operationContext();
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kStart, 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(opCtx);
txnRouter.attachTxnFieldsIfNeeded(opCtx, shard1, {});
@@ -2801,7 +3402,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
auto opCtx = operationContext();
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kStart, 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(opCtx);
txnRouter.attachTxnFieldsIfNeeded(opCtx, shard1, {});
@@ -2812,7 +3414,8 @@ TEST_F(TransactionRouterTestWithDefaultSession,
txnRouter.getParticipant(shard1)->readOnly);
// Commit causes the _latestStmtId to be bumped.
- txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(
+ opCtx, txnNum, TransactionRouter::TransactionActions::kCommit, 0 /* txnRetryCounter */);
// Committing will set the termination initiation state.
auto future = launchAsync([&] { txnRouter.commitTransaction(opCtx, boost::none); });
@@ -2837,8 +3440,10 @@ protected:
TransactionRouterTestWithDefaultSession::setUp();
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
}
};
@@ -2909,27 +3514,35 @@ protected:
}
void beginTxnWithDefaultTxnNumber() {
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter().setDefaultAtClusterTime(operationContext());
}
void beginSlowTxnWithDefaultTxnNumber() {
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter().setDefaultAtClusterTime(operationContext());
tickSource()->advance(Milliseconds(serverGlobalParams.slowMS + 1));
}
void beginRecoverCommitWithDefaultTxnNumber() {
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kCommit,
+ 0 /* txnRetryCounter */);
txnRouter().setDefaultAtClusterTime(operationContext());
}
void beginSlowRecoverCommitWithDefaultTxnNumber() {
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kCommit,
+ 0 /* txnRetryCounter */);
txnRouter().setDefaultAtClusterTime(operationContext());
tickSource()->advance(Milliseconds(serverGlobalParams.slowMS + 1));
}
@@ -3104,8 +3717,10 @@ protected:
}
void runRecoverWithTokenCommit(boost::optional<ShardId> recoveryShard) {
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kCommit,
+ 0 /* txnRetryCounter */);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(recoveryShard);
@@ -3347,8 +3962,10 @@ TEST_F(TransactionRouterMetricsTest, SlowLoggingPrintsTimeActiveAndInactive) {
tickSource()->advance(Microseconds(222222));
assertTimeInactiveIs(Microseconds(222222));
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kCommit,
+ 0 /* txnRetryCounter */);
runCommit(kDummyOkRes);
ASSERT_EQUALS(
@@ -3786,8 +4403,10 @@ TEST_F(TransactionRouterMetricsTest, DurationResetByNewTransaction) {
txnRouter().commitTransaction(operationContext(), kDummyRecoveryToken);
// Start a new transaction and verify the duration was reset.
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber + 1,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
assertDurationIs(Microseconds(0));
tickSource()->advance(Microseconds(50));
@@ -3865,8 +4484,10 @@ TEST_F(TransactionRouterMetricsTest, CommitDurationResetByNewTransaction) {
future.default_timed_get();
// Start a new transaction and verify the commit duration was reset.
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber + 1,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
future = beginAndPauseCommit();
@@ -3997,8 +4618,10 @@ TEST_F(TransactionRouterMetricsTest, TimeActiveAndInactiveAdvanceSeparatelyAndSu
// Will not advance after commit.
// Neither can advance after a successful commit.
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kCommit,
+ 0 /* txnRetryCounter */);
runCommit(kDummyOkRes);
tickSource()->advance(Microseconds(150));
@@ -4065,8 +4688,10 @@ TEST_F(TransactionRouterMetricsTest, DurationsForImplicitlyAbortedActiveTxn) {
assertTimeInactiveIs(kDefaultTimeInactive);
assertDurationIs(kDefaultTimeActive + kDefaultTimeInactive);
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
tickSource()->advance(Microseconds(100));
assertTimeActiveIs(kDefaultTimeActive + Microseconds(100));
@@ -4089,8 +4714,10 @@ TEST_F(TransactionRouterMetricsTest, DurationsForImplicitlyAbortedEndedTxn) {
assertTimeInactiveIs(kDefaultTimeInactive);
assertDurationIs(kDefaultTimeActive + kDefaultTimeInactive);
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kCommit,
+ 0 /* txnRetryCounter */);
runCommit(kDummyOkRes);
txnRouter().stash(operationContext());
@@ -4112,8 +4739,10 @@ TEST_F(TransactionRouterMetricsTest, NeitherTimeActiveNorInactiveAdvanceAfterSuc
setUpDefaultTimeActiveAndInactive();
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kCommit,
+ 0 /* txnRetryCounter */);
runCommit(kDummyOkRes);
// Neither can advance.
@@ -4126,8 +4755,10 @@ TEST_F(TransactionRouterMetricsTest, NeitherTimeActiveNorInactiveAdvanceAfterFai
setUpDefaultTimeActiveAndInactive();
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
runCommit(kDummyErrorRes);
// Neither can advance.
@@ -4140,8 +4771,10 @@ TEST_F(TransactionRouterMetricsTest, TimeActiveAndInactiveAdvanceAfterUnknownCom
setUpDefaultTimeActiveAndInactive();
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kCommit,
+ 0 /* txnRetryCounter */);
runCommit(Status(ErrorCodes::HostUnreachable, "dummy"), true /* expectRetries */);
// timeActive can advance.
@@ -4155,8 +4788,10 @@ TEST_F(TransactionRouterMetricsTest, TimeActiveAndInactiveAdvanceAfterUnknownCom
assertTimeActiveIs(kDefaultTimeActive + Microseconds(100));
assertTimeInactiveIs(kDefaultTimeInactive + Microseconds(100));
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kCommit,
+ 0 /* txnRetryCounter */);
runCommit(kDummyRetryableErrorRes, true /* expectRetries */);
// timeActive can advance.
@@ -4170,8 +4805,10 @@ TEST_F(TransactionRouterMetricsTest, TimeActiveAndInactiveAdvanceAfterUnknownCom
assertTimeActiveIs(kDefaultTimeActive + Microseconds(200));
assertTimeInactiveIs(kDefaultTimeInactive + Microseconds(200));
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kCommit,
+ 0 /* txnRetryCounter */);
runCommit(kDummyOkRes);
// The result is known, so neither can advance.
@@ -4185,8 +4822,10 @@ TEST_F(TransactionRouterMetricsTest, NeitherTimeActiveNorInactiveAdvanceAfterAbo
setUpDefaultTimeActiveAndInactive();
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
ASSERT_THROWS_CODE(txnRouter().abortTransaction(operationContext()),
AssertionException,
ErrorCodes::NoSuchTransaction);
@@ -4201,8 +4840,10 @@ TEST_F(TransactionRouterMetricsTest, NeitherTimeActiveNorInactiveAdvanceAfterImp
setUpDefaultTimeActiveAndInactive();
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
txnRouter().implicitlyAbortTransaction(operationContext(), kDummyStatus);
// Neither can advance.
@@ -4241,8 +4882,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_Stash) {
TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_BeginAfterStash) {
beginRecoverCommitWithDefaultTxnNumber();
txnRouter().stash(operationContext());
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentActive());
@@ -4256,22 +4899,28 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_AreNotCumulative) {
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentActive());
ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber + 1,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentActive());
ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
// Test inactive.
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber + 2, TransactionRouter::TransactionActions::kStart);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber + 2,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter().stash(operationContext());
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive());
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber + 3, TransactionRouter::TransactionActions::kStart);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber + 3,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter().stash(operationContext());
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
@@ -4328,8 +4977,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_BeginAndStashForEndedT
ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentOpen());
ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
@@ -4402,8 +5053,10 @@ TEST_F(TransactionRouterTest, RouterMetricsCurrent_ReapForInactiveTxn) {
// Start a transaction on the session.
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), 5, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ 5,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
ASSERT_EQUALS(1L, routerTxnMetrics->getCurrentOpen());
@@ -4506,8 +5159,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalStarted_IsCumulative) {
beginTxnWithDefaultTxnNumber();
ASSERT_EQUALS(1L, routerTxnMetrics()->getTotalStarted());
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber + 1,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
ASSERT_EQUALS(2L, routerTxnMetrics()->getTotalStarted());
// Shouldn't go down when a transaction ends.
@@ -4558,8 +5213,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalCommitted_NotIncreasedByI
TEST_F(TransactionRouterMetricsTest,
RouterMetricsTotalCommitted_NotIncreasedByAbandonedTransaction) {
beginTxnWithDefaultTxnNumber();
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber + 1,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
ASSERT_EQUALS(0L, routerTxnMetrics()->getTotalCommitted());
}
@@ -4574,8 +5231,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalCommitted_IsCumulative) {
runCommit(kDummyOkRes);
ASSERT_EQUALS(1L, routerTxnMetrics()->getTotalCommitted());
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber + 1,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
runCommit(kDummyOkRes);
ASSERT_EQUALS(2L, routerTxnMetrics()->getTotalCommitted());
}
@@ -4602,8 +5261,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalAborted_NotIncreasedByUnk
TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalAborted_NotIncreasedByAbandonedTransaction) {
beginTxnWithDefaultTxnNumber();
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber + 1,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
ASSERT_EQUALS(0L, routerTxnMetrics()->getTotalAborted());
}
@@ -4640,8 +5301,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalAborted_IsCumulative) {
ErrorCodes::NoSuchTransaction);
ASSERT_EQUALS(1L, routerTxnMetrics()->getTotalAborted());
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber + 1,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
ASSERT_THROWS_CODE(txnRouter().abortTransaction(operationContext()),
AssertionException,
ErrorCodes::NoSuchTransaction);
@@ -4667,8 +5330,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalContactedParticipants) {
ASSERT_EQUALS(2L, routerTxnMetrics()->getTotalContactedParticipants());
// Is cumulative across transactions.
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber + 1,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
ASSERT_EQUALS(2L, routerTxnMetrics()->getTotalContactedParticipants());
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -4750,8 +5415,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalParticipantsAtCommit) {
ASSERT_EQUALS(2L, routerTxnMetrics()->getTotalParticipantsAtCommit());
// Is cumulative across transactions.
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber + 1,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {});
runCommit(kDummyOkRes);
ASSERT_EQUALS(3L, routerTxnMetrics()->getTotalParticipantsAtCommit());
@@ -4832,8 +5499,10 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCommitTypeStatsSuccessfulDurat
.successfulDurationMicros.load());
// Start a new transaction and verify that successful commit duration is cumulative.
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber + 1,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter().setDefaultAtClusterTime(operationContext());
future = beginAndPauseCommit();
tickSource()->advance(Microseconds(100));
@@ -4948,8 +5617,10 @@ TEST_F(TransactionRouterMetricsTest, ReportResourcesWithParticipantList) {
txnRouter().processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
txnRouter().processParticipantResponse(operationContext(), shard2, kOkReadOnlyTrueResponse);
- txnRouter().beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue);
+ txnRouter().beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kContinue,
+ 0 /* txnRetryCounter */);
// Verify participants array has been updated with proper ReadOnly responses.
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index 6f8f077859d..8f09574e608 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -1843,8 +1843,10 @@ public:
_scopedSession.emplace(operationContext());
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
}
@@ -1994,8 +1996,10 @@ public:
_scopedSession.emplace(operationContext());
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
}
@@ -2126,8 +2130,10 @@ public:
_scopedSession.emplace(operationContext());
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
txnRouter.setDefaultAtClusterTime(operationContext());
}
diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp
index 43fc32f13b2..7e53707d78d 100644
--- a/src/mongo/s/write_ops/batch_write_op_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_op_test.cpp
@@ -1553,8 +1553,10 @@ public:
_scopedSession.emplace(operationContext());
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter.beginOrContinueTxn(
- operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(operationContext(),
+ kTxnNumber,
+ TransactionRouter::TransactionActions::kStart,
+ 0 /* txnRetryCounter */);
}
void tearDown() override {
diff --git a/src/mongo/s/write_ops/write_op_test.cpp b/src/mongo/s/write_ops/write_op_test.cpp
index 68851b61cd4..4ba35fb5470 100644
--- a/src/mongo/s/write_ops/write_op_test.cpp
+++ b/src/mongo/s/write_ops/write_op_test.cpp
@@ -411,7 +411,8 @@ TEST_F(WriteOpTransactionTest, TargetMultiAllShardsAndErrorSingleChildOp) {
_opCtx->setTxnNumber(kTxnNumber);
auto txnRouter = TransactionRouter::get(_opCtx);
- txnRouter.beginOrContinueTxn(_opCtx, kTxnNumber, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(
+ _opCtx, kTxnNumber, TransactionRouter::TransactionActions::kStart, 0 /* txnRetryCounter */);
// Do multi-target write op
WriteOp writeOp(BatchItemRef(&request, 0), true);