-rw-r--r--  buildscripts/resmokeconfig/suites/causally_consistent_jscore_txns_passthrough.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/core_txns.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/logical_session_cache_replication_100ms_refresh_jscore_passthrough.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/logical_session_cache_replication_10sec_refresh_jscore_passthrough.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/logical_session_cache_replication_1sec_refresh_jscore_passthrough.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/logical_session_cache_replication_default_refresh_jscore_passthrough.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets.yml | 3
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_auth.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_auth_0.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_auth_1.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_auth_2.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_auth_3.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_auth_4.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_auth_5.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_auth_6.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_auth_misc.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_ese.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_ese_0.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_ese_1.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_ese_2.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_ese_3.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_ese_4.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_ese_5.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_ese_6.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_ese_misc.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml | 1
-rw-r--r--  buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml | 1
-rw-r--r--  buildscripts/templates/generate_resmoke_suites/replica_sets_auth.yml.j2 | 1
-rw-r--r--  buildscripts/templates/generate_resmoke_suites/replica_sets_ese.yml.j2 | 1
-rw-r--r--  jstests/core/txns/empty_prepare.js | 11
-rw-r--r--  jstests/core/txns/ensure_active_txn_for_prepare_transaction.js | 3
-rw-r--r--  jstests/core/txns/errors_on_committed_transaction.js | 19
-rw-r--r--  jstests/core/txns/libs/prepare_helpers.js | 4
-rw-r--r--  jstests/core/txns/no_writes_to_config_transactions_with_prepared_transaction.js | 1
-rw-r--r--  jstests/core/txns/prepare_nonexistent_transaction.js | 42
-rw-r--r--  jstests/core/txns/prepare_requires_fcv42.js | 5
-rw-r--r--  jstests/core/txns/statement_ids_accepted.js | 2
-rw-r--r--  jstests/noPassthrough/server_transaction_metrics_for_prepared_transactions.js | 2
-rw-r--r--  jstests/replsets/prepare_prepared_transaction_wc_timeout.js | 1
-rw-r--r--  jstests/sharding/restart_transactions.js | 8
-rw-r--r--  jstests/sharding/txn_basic_two_phase_commit.js | 277
-rw-r--r--  jstests/sharding/txn_coordinator_commands_basic_requirements.js | 47
-rw-r--r--  src/mongo/base/error_codes.err | 2
-rw-r--r--  src/mongo/db/commands/txn_cmds.cpp | 17
-rw-r--r--  src/mongo/db/commands/txn_two_phase_commit_cmds.idl | 4
-rw-r--r--  src/mongo/db/s/txn_two_phase_commit_cmds.cpp | 271
-rw-r--r--  src/mongo/db/transaction_coordinator.cpp | 205
-rw-r--r--  src/mongo/db/transaction_coordinator.h | 185
-rw-r--r--  src/mongo/db/transaction_coordinator_catalog.cpp | 8
-rw-r--r--  src/mongo/db/transaction_coordinator_catalog.h | 5
-rw-r--r--  src/mongo/db/transaction_coordinator_catalog_test.cpp | 72
-rw-r--r--  src/mongo/db/transaction_coordinator_commands_impl.cpp | 282
-rw-r--r--  src/mongo/db/transaction_coordinator_commands_impl.h | 45
-rw-r--r--  src/mongo/db/transaction_coordinator_service.cpp | 117
-rw-r--r--  src/mongo/db/transaction_coordinator_service.h | 48
-rw-r--r--  src/mongo/db/transaction_coordinator_service_test.cpp | 271
-rw-r--r--  src/mongo/db/transaction_coordinator_state_machine_test.cpp | 63
-rw-r--r--  src/mongo/db/transaction_coordinator_test.cpp | 112
-rw-r--r--  src/mongo/s/transaction_router.cpp | 36
-rw-r--r--  src/mongo/s/transaction_router_test.cpp | 17
-rw-r--r--  src/mongo/shell/servers.js | 10
-rw-r--r--  src/mongo/shell/utils.js | 2
64 files changed, 1026 insertions(+), 1201 deletions(-)
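
Note: the changes below all follow from one interface change: prepareTransaction no longer takes a coordinatorId field, and participants no longer send voteCommit/voteAbort to the coordinator from within prepare. A minimal shell sketch of the before/after command shape (the txnNumber value is illustrative, not from this commit):

    // Before this commit, tests had to pass a dummy coordinator id:
    db.adminCommand(
        {prepareTransaction: 1, coordinatorId: "dummy", txnNumber: NumberLong(0), autocommit: false});
    // After it, the field is removed from the IDL parser entirely:
    db.adminCommand({prepareTransaction: 1, txnNumber: NumberLong(0), autocommit: false});
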
diff --git a/buildscripts/resmokeconfig/suites/causally_consistent_jscore_txns_passthrough.yml b/buildscripts/resmokeconfig/suites/causally_consistent_jscore_txns_passthrough.yml
index e1222e352a5..1078b12b4ce 100644
--- a/buildscripts/resmokeconfig/suites/causally_consistent_jscore_txns_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/causally_consistent_jscore_txns_passthrough.yml
@@ -35,6 +35,5 @@ executor:
set_parameters:
enableTestCommands: 1
numInitialSyncAttempts: 1
- failpoint.skipShardingPartsOfPrepareTransaction: "{mode:'alwaysOn'}"
# Use a 1-node replica set.
num_nodes: 1
diff --git a/buildscripts/resmokeconfig/suites/core_txns.yml b/buildscripts/resmokeconfig/suites/core_txns.yml
index e5298c947a6..e17ed4e02b5 100644
--- a/buildscripts/resmokeconfig/suites/core_txns.yml
+++ b/buildscripts/resmokeconfig/suites/core_txns.yml
@@ -29,6 +29,5 @@ executor:
set_parameters:
enableTestCommands: 1
numInitialSyncAttempts: 1
- failpoint.skipShardingPartsOfPrepareTransaction: "{mode:'alwaysOn'}"
# Use a 1-node replica set.
num_nodes: 1
diff --git a/buildscripts/resmokeconfig/suites/logical_session_cache_replication_100ms_refresh_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/logical_session_cache_replication_100ms_refresh_jscore_passthrough.yml
index 1cda89d0369..7c05723aa9f 100644
--- a/buildscripts/resmokeconfig/suites/logical_session_cache_replication_100ms_refresh_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/logical_session_cache_replication_100ms_refresh_jscore_passthrough.yml
@@ -72,5 +72,4 @@ executor:
numInitialSyncAttempts: 1
disableLogicalSessionCacheRefresh: false
logicalSessionRefreshMillis: 100
- failpoint.skipShardingPartsOfPrepareTransaction: "{mode:'alwaysOn'}"
num_nodes: 3
diff --git a/buildscripts/resmokeconfig/suites/logical_session_cache_replication_10sec_refresh_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/logical_session_cache_replication_10sec_refresh_jscore_passthrough.yml
index 9320b637352..e967ce0755b 100644
--- a/buildscripts/resmokeconfig/suites/logical_session_cache_replication_10sec_refresh_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/logical_session_cache_replication_10sec_refresh_jscore_passthrough.yml
@@ -71,5 +71,4 @@ executor:
numInitialSyncAttempts: 1
disableLogicalSessionCacheRefresh: false
logicalSessionRefreshMillis: 10000
- failpoint.skipShardingPartsOfPrepareTransaction: "{mode:'alwaysOn'}"
num_nodes: 3
diff --git a/buildscripts/resmokeconfig/suites/logical_session_cache_replication_1sec_refresh_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/logical_session_cache_replication_1sec_refresh_jscore_passthrough.yml
index 352115f2df3..22d87a766d2 100644
--- a/buildscripts/resmokeconfig/suites/logical_session_cache_replication_1sec_refresh_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/logical_session_cache_replication_1sec_refresh_jscore_passthrough.yml
@@ -71,5 +71,4 @@ executor:
numInitialSyncAttempts: 1
disableLogicalSessionCacheRefresh: false
logicalSessionRefreshMillis: 1000
- failpoint.skipShardingPartsOfPrepareTransaction: "{mode:'alwaysOn'}"
num_nodes: 3
diff --git a/buildscripts/resmokeconfig/suites/logical_session_cache_replication_default_refresh_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/logical_session_cache_replication_default_refresh_jscore_passthrough.yml
index 8327f70eb11..4467f26d565 100644
--- a/buildscripts/resmokeconfig/suites/logical_session_cache_replication_default_refresh_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/logical_session_cache_replication_default_refresh_jscore_passthrough.yml
@@ -70,5 +70,4 @@ executor:
enableTestCommands: 1
numInitialSyncAttempts: 1
disableLogicalSessionCacheRefresh: false
- failpoint.skipShardingPartsOfPrepareTransaction: "{mode:'alwaysOn'}"
num_nodes: 3
diff --git a/buildscripts/resmokeconfig/suites/replica_sets.yml b/buildscripts/resmokeconfig/suites/replica_sets.yml
index 2bd7ada38da..f7515dab9ab 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets.yml
@@ -8,7 +8,4 @@ executor:
config:
shell_options:
nodb: ''
- global_vars:
- TestData:
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_auth.yml b/buildscripts/resmokeconfig/suites/replica_sets_auth.yml
index 1e560c8803d..8dbde6e9005 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_auth.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_auth.yml
@@ -24,6 +24,5 @@ executor:
authMechanism: SCRAM-SHA-1
keyFile: *keyFile
keyFileData: *keyFileData
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
nodb: ''
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_auth_0.yml b/buildscripts/resmokeconfig/suites/replica_sets_auth_0.yml
index 7d3b2513d03..6ab934e9e92 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_auth_0.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_auth_0.yml
@@ -33,6 +33,5 @@ executor:
authMechanism: SCRAM-SHA-1
keyFile: *keyFile
keyFileData: *keyFileData
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
nodb: ''
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_auth_1.yml b/buildscripts/resmokeconfig/suites/replica_sets_auth_1.yml
index 29ea03801d4..08e7b6c3367 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_auth_1.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_auth_1.yml
@@ -41,6 +41,5 @@ executor:
authMechanism: SCRAM-SHA-1
keyFile: *keyFile
keyFileData: *keyFileData
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
nodb: ''
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_auth_2.yml b/buildscripts/resmokeconfig/suites/replica_sets_auth_2.yml
index e17ef95e839..309350144c2 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_auth_2.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_auth_2.yml
@@ -48,6 +48,5 @@ executor:
authMechanism: SCRAM-SHA-1
keyFile: *keyFile
keyFileData: *keyFileData
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
nodb: ''
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_auth_3.yml b/buildscripts/resmokeconfig/suites/replica_sets_auth_3.yml
index 075cbfde463..4b4b41f8039 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_auth_3.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_auth_3.yml
@@ -56,6 +56,5 @@ executor:
authMechanism: SCRAM-SHA-1
keyFile: *keyFile
keyFileData: *keyFileData
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
nodb: ''
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_auth_4.yml b/buildscripts/resmokeconfig/suites/replica_sets_auth_4.yml
index 2a40200e3d7..e3d16727876 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_auth_4.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_auth_4.yml
@@ -75,6 +75,5 @@ executor:
authMechanism: SCRAM-SHA-1
keyFile: *keyFile
keyFileData: *keyFileData
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
nodb: ''
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_auth_5.yml b/buildscripts/resmokeconfig/suites/replica_sets_auth_5.yml
index 7ea2840a523..f3ebddc8fad 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_auth_5.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_auth_5.yml
@@ -99,6 +99,5 @@ executor:
authMechanism: SCRAM-SHA-1
keyFile: *keyFile
keyFileData: *keyFileData
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
nodb: ''
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_auth_6.yml b/buildscripts/resmokeconfig/suites/replica_sets_auth_6.yml
index ed2c3801e25..2e450c9189f 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_auth_6.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_auth_6.yml
@@ -71,6 +71,5 @@ executor:
authMechanism: SCRAM-SHA-1
keyFile: *keyFile
keyFileData: *keyFileData
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
nodb: ''
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_auth_misc.yml b/buildscripts/resmokeconfig/suites/replica_sets_auth_misc.yml
index 7ed8350b3e3..13cc7f618b5 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_auth_misc.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_auth_misc.yml
@@ -237,6 +237,5 @@ executor:
authMechanism: SCRAM-SHA-1
keyFile: *keyFile
keyFileData: *keyFileData
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
nodb: ''
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_ese.yml b/buildscripts/resmokeconfig/suites/replica_sets_ese.yml
index 7d855ff63e6..3e202ae3c1b 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_ese.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_ese.yml
@@ -16,5 +16,4 @@ executor:
TestData:
enableEncryption: ''
encryptionKeyFile: *keyFile
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_ese_0.yml b/buildscripts/resmokeconfig/suites/replica_sets_ese_0.yml
index 5f60ffe213d..a1c5a4615ac 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_ese_0.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_ese_0.yml
@@ -27,5 +27,4 @@ executor:
TestData:
enableEncryption: ''
encryptionKeyFile: *keyFile
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_ese_1.yml b/buildscripts/resmokeconfig/suites/replica_sets_ese_1.yml
index 16fdf1121f1..806294e0feb 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_ese_1.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_ese_1.yml
@@ -37,5 +37,4 @@ executor:
TestData:
enableEncryption: ''
encryptionKeyFile: *keyFile
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_ese_2.yml b/buildscripts/resmokeconfig/suites/replica_sets_ese_2.yml
index c697d71b15e..3b202f9492a 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_ese_2.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_ese_2.yml
@@ -43,5 +43,4 @@ executor:
TestData:
enableEncryption: ''
encryptionKeyFile: *keyFile
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_ese_3.yml b/buildscripts/resmokeconfig/suites/replica_sets_ese_3.yml
index 4b86c2e1231..878bd23580a 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_ese_3.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_ese_3.yml
@@ -49,5 +49,4 @@ executor:
TestData:
enableEncryption: ''
encryptionKeyFile: *keyFile
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_ese_4.yml b/buildscripts/resmokeconfig/suites/replica_sets_ese_4.yml
index bdf8a9ec504..10f43df15fa 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_ese_4.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_ese_4.yml
@@ -64,5 +64,4 @@ executor:
TestData:
enableEncryption: ''
encryptionKeyFile: *keyFile
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_ese_5.yml b/buildscripts/resmokeconfig/suites/replica_sets_ese_5.yml
index 055afc0e35a..6258010f231 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_ese_5.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_ese_5.yml
@@ -87,5 +87,4 @@ executor:
TestData:
enableEncryption: ''
encryptionKeyFile: *keyFile
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_ese_6.yml b/buildscripts/resmokeconfig/suites/replica_sets_ese_6.yml
index d22fed32057..716a4ee78c9 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_ese_6.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_ese_6.yml
@@ -80,5 +80,4 @@ executor:
TestData:
enableEncryption: ''
encryptionKeyFile: *keyFile
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_ese_misc.yml b/buildscripts/resmokeconfig/suites/replica_sets_ese_misc.yml
index 015d7d7e9a7..d69ed8dc58b 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_ese_misc.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_ese_misc.yml
@@ -235,5 +235,4 @@ executor:
TestData:
enableEncryption: ''
encryptionKeyFile: *keyFile
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
readMode: commands
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml
index 16701f02891..5d399f081bf 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_initsync_jscore_passthrough.yml
@@ -80,6 +80,5 @@ executor:
numInitialSyncAttempts: 10
collectionClonerBatchSize: 10
initialSyncOplogFetcherBatchSize: 10
- failpoint.skipShardingPartsOfPrepareTransaction: "{mode:'alwaysOn'}"
num_nodes: 2
start_initial_sync_node: True
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml
index cd466b980d8..5bcd6f0f62e 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_initsync_static_jscore_passthrough.yml
@@ -51,7 +51,6 @@ executor:
numInitialSyncAttempts: 1
collectionClonerBatchSize: 10
initialSyncOplogFetcherBatchSize: 10
- failpoint.skipShardingPartsOfPrepareTransaction: "{mode:'alwaysOn'}"
num_nodes: 2
start_initial_sync_node: True
replset_config_options:
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml
index 29079e322d5..ef4c3244808 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_jscore_passthrough.yml
@@ -62,5 +62,4 @@ executor:
#
# TODO SERVER-35156: Remove the following line to disable the periodic no-op writer.
writePeriodicNoops: 1
- failpoint.skipShardingPartsOfPrepareTransaction: "{mode:'alwaysOn'}"
num_nodes: 2
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml b/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml
index 7d10a1456ee..0a31217d067 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_kill_secondaries_jscore_passthrough.yml
@@ -56,6 +56,5 @@ executor:
rollback: 2
storage:
recovery: 2
- failpoint.skipShardingPartsOfPrepareTransaction: "{mode:'alwaysOn'}"
num_nodes: 2
voting_secondaries: false
diff --git a/buildscripts/templates/generate_resmoke_suites/replica_sets_auth.yml.j2 b/buildscripts/templates/generate_resmoke_suites/replica_sets_auth.yml.j2
index 7dd29e3b7df..e2f89521be6 100644
--- a/buildscripts/templates/generate_resmoke_suites/replica_sets_auth.yml.j2
+++ b/buildscripts/templates/generate_resmoke_suites/replica_sets_auth.yml.j2
@@ -46,6 +46,5 @@ executor:
authMechanism: SCRAM-SHA-1
keyFile: *keyFile
keyFileData: *keyFileData
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
nodb: ''
readMode: commands
diff --git a/buildscripts/templates/generate_resmoke_suites/replica_sets_ese.yml.j2 b/buildscripts/templates/generate_resmoke_suites/replica_sets_ese.yml.j2
index 04132480cb6..041cbfe22f8 100644
--- a/buildscripts/templates/generate_resmoke_suites/replica_sets_ese.yml.j2
+++ b/buildscripts/templates/generate_resmoke_suites/replica_sets_ese.yml.j2
@@ -39,5 +39,4 @@ executor:
TestData:
enableEncryption: ''
encryptionKeyFile: *keyFile
- setSkipShardingPartsOfPrepareTransactionFailpoint: true
readMode: commands
diff --git a/jstests/core/txns/empty_prepare.js b/jstests/core/txns/empty_prepare.js
index 9fb36a24b28..3b8dc8d3051 100644
--- a/jstests/core/txns/empty_prepare.js
+++ b/jstests/core/txns/empty_prepare.js
@@ -25,17 +25,15 @@
session.startTransaction();
// TODO SERVER-35787: make this fail with NoSuchTransaction.
- assert.commandFailedWithCode(
- sessionDB.adminCommand({prepareTransaction: 1, coordinatorId: "dummy"}),
- ErrorCodes.OperationNotSupportedInTransaction);
+ assert.commandFailedWithCode(sessionDB.adminCommand({prepareTransaction: 1}),
+ ErrorCodes.OperationNotSupportedInTransaction);
session.abortTransaction_forTesting();
// ---- Test 2. Only reads before prepare ----
session.startTransaction();
assert.eq(doc, sessionColl.findOne({a: 1}));
- let res = assert.commandWorked(
- sessionDB.adminCommand({prepareTransaction: 1, coordinatorId: "dummy"}));
+ let res = assert.commandWorked(sessionDB.adminCommand({prepareTransaction: 1}));
// Makes sure prepareTransaction returns prepareTimestamp in its response.
assert(res.hasOwnProperty("prepareTimestamp"), tojson(res));
session.abortTransaction_forTesting();
@@ -47,8 +45,7 @@
assert.eq(res.nMatched, 1, tojson(res));
assert.eq(res.nModified, 0, tojson(res));
assert.eq(res.nUpserted, 0, tojson(res));
- res = assert.commandWorked(
- sessionDB.adminCommand({prepareTransaction: 1, coordinatorId: "dummy"}));
+ res = assert.commandWorked(sessionDB.adminCommand({prepareTransaction: 1}));
// Makes sure prepareTransaction returns prepareTimestamp in its response.
assert(res.hasOwnProperty("prepareTimestamp"), tojson(res));
session.abortTransaction_forTesting();
diff --git a/jstests/core/txns/ensure_active_txn_for_prepare_transaction.js b/jstests/core/txns/ensure_active_txn_for_prepare_transaction.js
index 395af5c7dc6..f3281332b7a 100644
--- a/jstests/core/txns/ensure_active_txn_for_prepare_transaction.js
+++ b/jstests/core/txns/ensure_active_txn_for_prepare_transaction.js
@@ -24,7 +24,6 @@
"the session");
assert.commandFailedWithCode(sessionDB.adminCommand({
prepareTransaction: 1,
- coordinatorId: "dummy",
txnNumber: NumberLong(0),
stmtId: NumberInt(1),
autocommit: false
@@ -39,7 +38,6 @@
assert.commandFailedWithCode(sessionDB.adminCommand({
prepareTransaction: 1,
- coordinatorId: "dummy",
txnNumber: NumberLong(0),
stmtId: NumberInt(1),
autocommit: false
@@ -54,7 +52,6 @@
assert.commandFailedWithCode(sessionDB.adminCommand({
prepareTransaction: 1,
- coordinatorId: "dummy",
txnNumber: NumberLong(1),
stmtId: NumberInt(1),
autocommit: false
diff --git a/jstests/core/txns/errors_on_committed_transaction.js b/jstests/core/txns/errors_on_committed_transaction.js
index dec27851ee9..69e5a0789cc 100644
--- a/jstests/core/txns/errors_on_committed_transaction.js
+++ b/jstests/core/txns/errors_on_committed_transaction.js
@@ -28,32 +28,25 @@
// Call prepare on committed transaction.
jsTestLog("Test that calling prepare on a committed transaction fails.");
- assert.commandFailedWithCode(sessionDB.adminCommand({
- prepareTransaction: 1,
- coordinatorId: "dummy",
- txnNumber: txnNumber,
- autocommit: false
- }),
- ErrorCodes.TransactionCommitted);
+ assert.commandFailedWithCode(
+ sessionDB.adminCommand({prepareTransaction: 1, txnNumber: txnNumber, autocommit: false}),
+ ErrorCodes.TransactionCommitted);
jsTestLog("Test the error precedence when calling prepare on a committed transaction but not " +
"providing txnNumber to prepareTransaction.");
- assert.commandFailedWithCode(
- sessionDB.adminCommand({prepareTransaction: 1, coordinatorId: "dummy", autocommit: false}),
- ErrorCodes.InvalidOptions);
+ assert.commandFailedWithCode(sessionDB.adminCommand({prepareTransaction: 1, autocommit: false}),
+ ErrorCodes.InvalidOptions);
jsTestLog("Test the error precedence when calling prepare on a committed transaction but not " +
"providing autocommit to prepareTransaction.");
assert.commandFailedWithCode(
- sessionDB.adminCommand(
- {prepareTransaction: 1, coordinatorId: "dummy", txnNumber: txnNumber}),
+ sessionDB.adminCommand({prepareTransaction: 1, txnNumber: txnNumber}),
ErrorCodes.InvalidOptions);
jsTestLog("Test the error precedence when calling prepare on a committed transaction and " +
"providing startTransaction to prepareTransaction.");
assert.commandFailedWithCode(sessionDB.adminCommand({
prepareTransaction: 1,
- coordinatorId: "dummy",
txnNumber: txnNumber,
autocommit: false,
startTransaction: true
diff --git a/jstests/core/txns/libs/prepare_helpers.js b/jstests/core/txns/libs/prepare_helpers.js
index 37217d04245..b832fbc22ff 100644
--- a/jstests/core/txns/libs/prepare_helpers.js
+++ b/jstests/core/txns/libs/prepare_helpers.js
@@ -15,8 +15,8 @@ const PrepareHelpers = (function() {
function prepareTransaction(session) {
assert(session);
- const res = assert.commandWorked(session.getDatabase('admin').adminCommand(
- {prepareTransaction: 1, coordinatorId: "dummy"}));
+ const res = assert.commandWorked(
+ session.getDatabase('admin').adminCommand({prepareTransaction: 1}));
assert(res.prepareTimestamp,
"prepareTransaction did not return a 'prepareTimestamp': " + tojson(res));
const prepareTimestamp = res.prepareTimestamp;
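
For reference, a hedged usage sketch of the updated helper (database and collection names are illustrative; the helper assumes a session with an open transaction that has performed at least one write):

    load("jstests/core/txns/libs/prepare_helpers.js");
    const session = db.getMongo().startSession();
    const sessionColl = session.getDatabase("test").getCollection("c");
    session.startTransaction();
    assert.commandWorked(sessionColl.insert({_id: 1}));
    // Returns the prepareTimestamp from the prepareTransaction response.
    const prepareTimestamp = PrepareHelpers.prepareTransaction(session);
    session.abortTransaction_forTesting();
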
diff --git a/jstests/core/txns/no_writes_to_config_transactions_with_prepared_transaction.js b/jstests/core/txns/no_writes_to_config_transactions_with_prepared_transaction.js
index d9445aef260..9a450c2c9f0 100644
--- a/jstests/core/txns/no_writes_to_config_transactions_with_prepared_transaction.js
+++ b/jstests/core/txns/no_writes_to_config_transactions_with_prepared_transaction.js
@@ -45,7 +45,6 @@
}));
assert.commandWorked(sessionDB.adminCommand({
prepareTransaction: 1,
- coordinatorId: "dummy",
txnNumber: NumberLong(0),
stmtId: NumberInt(1),
autocommit: false
diff --git a/jstests/core/txns/prepare_nonexistent_transaction.js b/jstests/core/txns/prepare_nonexistent_transaction.js
index a7c268fe103..5ef5f74fa88 100644
--- a/jstests/core/txns/prepare_nonexistent_transaction.js
+++ b/jstests/core/txns/prepare_nonexistent_transaction.js
@@ -22,13 +22,10 @@
jsTestLog("Test that if there is no transaction active on the current session, errors with " +
"'NoSuchTransaction'.");
- assert.commandFailedWithCode(sessionDB.adminCommand({
- prepareTransaction: 1,
- coordinatorId: "dummy",
- txnNumber: NumberLong(0),
- autocommit: false
- }),
- ErrorCodes.NoSuchTransaction);
+ assert.commandFailedWithCode(
+ sessionDB.adminCommand(
+ {prepareTransaction: 1, txnNumber: NumberLong(0), autocommit: false}),
+ ErrorCodes.NoSuchTransaction);
jsTestLog("Test that if there is a transaction running on the current session and the " +
"'txnNumber' given is greater than the current transaction, errors with " +
@@ -37,7 +34,6 @@
assert.commandWorked(sessionColl.insert(doc));
assert.commandFailedWithCode(sessionDB.adminCommand({
prepareTransaction: 1,
- coordinatorId: "dummy",
txnNumber: NumberLong(session.getTxnNumber_forTesting() + 1),
autocommit: false
}),
@@ -52,7 +48,6 @@
"last known transaction was aborted then it errors with 'NoSuchTransaction'.");
assert.commandFailedWithCode(sessionDB.adminCommand({
prepareTransaction: 1,
- coordinatorId: "dummy",
txnNumber: NumberLong(session.getTxnNumber_forTesting()),
autocommit: false
}),
@@ -63,31 +58,24 @@
"'TransactionTooOld'.");
session.startTransaction();
assert.commandWorked(sessionColl.insert(doc));
- assert.commandFailedWithCode(sessionDB.adminCommand({
- prepareTransaction: 1,
- coordinatorId: "dummy",
- txnNumber: NumberLong(0),
- autocommit: false
- }),
- ErrorCodes.TransactionTooOld);
+ assert.commandFailedWithCode(
+ sessionDB.adminCommand(
+ {prepareTransaction: 1, txnNumber: NumberLong(0), autocommit: false}),
+ ErrorCodes.TransactionTooOld);
session.abortTransaction_forTesting();
jsTestLog("Test that if there is no transaction active on the current session and the " +
"'txnNumber' given is less than the current transaction, errors with " +
"'TransactionTooOld'.");
- assert.commandFailedWithCode(sessionDB.adminCommand({
- prepareTransaction: 1,
- coordinatorId: "dummy",
- txnNumber: NumberLong(0),
- autocommit: false
- }),
- ErrorCodes.TransactionTooOld);
+ assert.commandFailedWithCode(
+ sessionDB.adminCommand(
+ {prepareTransaction: 1, txnNumber: NumberLong(0), autocommit: false}),
+ ErrorCodes.TransactionTooOld);
jsTestLog("Test the error precedence when calling prepare on a nonexistent transaction but " +
"not providing txnNumber to prepareTransaction.");
- assert.commandFailedWithCode(
- sessionDB.adminCommand({prepareTransaction: 1, coordinatorId: "dummy", autocommit: false}),
- ErrorCodes.InvalidOptions);
+ assert.commandFailedWithCode(sessionDB.adminCommand({prepareTransaction: 1, autocommit: false}),
+ ErrorCodes.InvalidOptions);
// It isn't ideal that NoSuchTransaction is thrown instead of InvalidOptions here, but it's okay
// to leave as is for now since this case fails in some way.
@@ -95,7 +83,6 @@
"not providing autocommit to prepareTransaction.");
assert.commandFailedWithCode(sessionDB.adminCommand({
prepareTransaction: 1,
- coordinatorId: "dummy",
txnNumber: NumberLong(session.getTxnNumber_forTesting() + 1),
}),
ErrorCodes.NoSuchTransaction);
@@ -106,7 +93,6 @@
prepareTransaction: 1,
// The last txnNumber we used was saved on the server's session, so we use a txnNumber that
// is greater than that to make sure it has never been seen before.
- coordinatorId: "dummy",
txnNumber: NumberLong(session.getTxnNumber_forTesting() + 2),
autocommit: false,
startTransaction: true
diff --git a/jstests/core/txns/prepare_requires_fcv42.js b/jstests/core/txns/prepare_requires_fcv42.js
index 43db290ed83..e7147aa13c6 100644
--- a/jstests/core/txns/prepare_requires_fcv42.js
+++ b/jstests/core/txns/prepare_requires_fcv42.js
@@ -35,9 +35,8 @@
jsTestLog("Transaction fails to prepare in last stable FCV.");
session.startTransaction();
assert.commandWorked(sessionDB[collName].insert({_id: "b"}));
- assert.commandFailedWithCode(
- sessionDB.adminCommand({prepareTransaction: 1, coordinatorId: "dummy"}),
- ErrorCodes.CommandNotSupported);
+ assert.commandFailedWithCode(sessionDB.adminCommand({prepareTransaction: 1}),
+ ErrorCodes.CommandNotSupported);
// Abort the transaction in the shell.
session.abortTransaction_forTesting();
diff --git a/jstests/core/txns/statement_ids_accepted.js b/jstests/core/txns/statement_ids_accepted.js
index ff76adc15af..669dfa19fbf 100644
--- a/jstests/core/txns/statement_ids_accepted.js
+++ b/jstests/core/txns/statement_ids_accepted.js
@@ -231,7 +231,6 @@
// prepareTransaction can only be run on the admin database.
assert.commandWorked(sessionDb.adminCommand({
prepareTransaction: 1,
- coordinatorId: "dummy",
txnNumber: NumberLong(txnNumber),
stmtId: NumberInt(1),
autocommit: false
@@ -244,7 +243,6 @@
}));
assert.commandFailedWithCode(sessionDb.runCommand({
prepareTransaction: 1,
- coordinatorId: "dummy",
txnNumber: NumberLong(txnNumber++),
stmtId: NumberInt(0),
autocommit: false
diff --git a/jstests/noPassthrough/server_transaction_metrics_for_prepared_transactions.js b/jstests/noPassthrough/server_transaction_metrics_for_prepared_transactions.js
index 0e22ea56fa2..619dc38b5c2 100644
--- a/jstests/noPassthrough/server_transaction_metrics_for_prepared_transactions.js
+++ b/jstests/noPassthrough/server_transaction_metrics_for_prepared_transactions.js
@@ -41,8 +41,6 @@
rst.startSet();
rst.initiate();
const primary = rst.getPrimary();
- assert.commandWorked(primary.adminCommand(
- {configureFailPoint: "skipShardingPartsOfPrepareTransaction", mode: "alwaysOn"}));
// Set up the test database.
const dbName = "test";
diff --git a/jstests/replsets/prepare_prepared_transaction_wc_timeout.js b/jstests/replsets/prepare_prepared_transaction_wc_timeout.js
index 09f7cf69a8c..cbda29be3b1 100644
--- a/jstests/replsets/prepare_prepared_transaction_wc_timeout.js
+++ b/jstests/replsets/prepare_prepared_transaction_wc_timeout.js
@@ -44,7 +44,6 @@
secConn,
{
prepareTransaction: 1,
- coordinatorId: "dummy",
lsid: {id: lsid},
txnNumber: NumberLong(39),
autocommit: false,
diff --git a/jstests/sharding/restart_transactions.js b/jstests/sharding/restart_transactions.js
index ebf93496762..cbe60dfad25 100644
--- a/jstests/sharding/restart_transactions.js
+++ b/jstests/sharding/restart_transactions.js
@@ -108,12 +108,8 @@
autocommit: false,
startTransaction: true
}));
- assert.commandWorked(directDB.adminCommand({
- prepareTransaction: 1,
- coordinatorId: "dummy",
- txnNumber: NumberLong(txnNumber),
- autocommit: false
- }));
+ assert.commandWorked(directDB.adminCommand(
+ {prepareTransaction: 1, txnNumber: NumberLong(txnNumber), autocommit: false}));
assert.commandFailedWithCode(directDB.runCommand({
find: collName,
diff --git a/jstests/sharding/txn_basic_two_phase_commit.js b/jstests/sharding/txn_basic_two_phase_commit.js
index 072f75b70a5..6b164725ae5 100644
--- a/jstests/sharding/txn_basic_two_phase_commit.js
+++ b/jstests/sharding/txn_basic_two_phase_commit.js
@@ -15,175 +15,166 @@
let participant1 = st.shard1;
let participant2 = st.shard2;
- // Create a sharded collection with a chunk on each shard:
- // shard0: [-inf, 0)
- // shard1: [0, 10)
- // shard2: [10, +inf)
- assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
- assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: coordinator.shardName}));
- assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
- assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
- assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 10}}));
- assert.commandWorked(
- st.s.adminCommand({moveChunk: ns, find: {_id: 0}, to: participant1.shardName}));
- assert.commandWorked(
- st.s.adminCommand({moveChunk: ns, find: {_id: 10}, to: participant2.shardName}));
-
- /**
- * Sends an insert on 'collName' to each shard with 'coordinator: true' on the first shard.
- *
- * lsid should be an object with format {id: <UUID>}
- * txnNumber should be an integer
- */
- let setUp = function(lsid, txnNumber) {
- // Simulate that the first statement in the transaction touched 'shard0', so mongos
- // forwarded the statement with 'coordinator: true' to shard0.
- assert.commandWorked(coordinator.getDB(dbName).runCommand({
+ let lsid = {id: UUID()};
+ let txnNumber = 0;
+
+ const startSimulatingNetworkFailures = function(connArray) {
+ connArray.forEach(function(conn) {
+ assert.commandWorked(conn.adminCommand({
+ configureFailPoint: "failCommand",
+ mode: {times: 10},
+ data: {
+ errorCode: ErrorCodes.NotMaster,
+ failCommands:
+ ["prepareTransaction", "abortTransaction", "commitTransaction"]
+ }
+ }));
+ assert.commandWorked(conn.adminCommand({
+ configureFailPoint:
+ "participantReturnNetworkErrorForPrepareAfterExecutingPrepareLogic",
+ mode: {times: 5}
+ }));
+ assert.commandWorked(conn.adminCommand({
+ configureFailPoint: "participantReturnNetworkErrorForAbortAfterExecutingAbortLogic",
+ mode: {times: 5}
+ }));
+ assert.commandWorked(conn.adminCommand({
+ configureFailPoint:
+ "participantReturnNetworkErrorForCommitAfterExecutingCommitLogic",
+ mode: {times: 5}
+ }));
+ });
+ };
+
+ const stopSimulatingNetworkFailures = function(connArray) {
+ connArray.forEach(function(conn) {
+ assert.commandWorked(conn.adminCommand({
+ configureFailPoint: "failCommand",
+ mode: "off",
+ }));
+ assert.commandWorked(conn.adminCommand({
+ configureFailPoint:
+ "participantReturnNetworkErrorForPrepareAfterExecutingPrepareLogic",
+ mode: "off"
+ }));
+ assert.commandWorked(conn.adminCommand({
+ configureFailPoint: "participantReturnNetworkErrorForAbortAfterExecutingAbortLogic",
+ mode: "off"
+ }));
+ assert.commandWorked(conn.adminCommand({
+ configureFailPoint:
+ "participantReturnNetworkErrorForCommitAfterExecutingCommitLogic",
+ mode: "off"
+ }));
+ });
+ };
+
+ const setUp = function() {
+ // Create a sharded collection with a chunk on each shard:
+ // shard0: [-inf, 0)
+ // shard1: [0, 10)
+ // shard2: [10, +inf)
+ assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+ assert.commandWorked(st.s.adminCommand({movePrimary: dbName, to: coordinator.shardName}));
+ assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
+ assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
+ assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 10}}));
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: ns, find: {_id: 0}, to: participant1.shardName}));
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: ns, find: {_id: 10}, to: participant2.shardName}));
+
+ assert.commandWorked(coordinator.adminCommand({_flushRoutingTableCacheUpdates: ns}));
+ assert.commandWorked(participant1.adminCommand({_flushRoutingTableCacheUpdates: ns}));
+ assert.commandWorked(participant2.adminCommand({_flushRoutingTableCacheUpdates: ns}));
+
+ // Start a new transaction by inserting a document onto each shard.
+ assert.commandWorked(st.s.getDB(dbName).runCommand({
insert: collName,
- documents: [{_id: -5}],
+ documents: [{_id: -5}, {_id: 5}, {_id: 15}],
lsid: lsid,
txnNumber: NumberLong(txnNumber),
stmtId: NumberInt(0),
startTransaction: true,
autocommit: false,
- coordinator: true,
}));
+ };
- // Simulate that some statement in the transaction touched shards 'shard1' and 'shard2'.
- assert.commandWorked(participant1.getDB(dbName).runCommand({
- insert: collName,
- documents: [{_id: 5}],
+ const testTwoPhaseAbort = function(simulateNetworkFailures) {
+ jsTest.log("Testing two-phase abort with simulateNetworkFailures: " +
+ simulateNetworkFailures);
+
+ txnNumber++;
+ setUp();
+
+ // Manually abort the transaction on one of the participants, so that the participant fails
+ // to prepare.
+ assert.commandWorked(participant2.adminCommand({
+ abortTransaction: 1,
lsid: lsid,
txnNumber: NumberLong(txnNumber),
stmtId: NumberInt(0),
- startTransaction: true,
autocommit: false,
}));
- assert.commandWorked(participant2.getDB(dbName).runCommand({
- insert: collName,
- documents: [{_id: 15}],
+
+ if (simulateNetworkFailures) {
+ startSimulatingNetworkFailures([participant1, participant2, coordinator]);
+ }
+ assert.commandFailedWithCode(st.s.adminCommand({
+ commitTransaction: 1,
lsid: lsid,
txnNumber: NumberLong(txnNumber),
stmtId: NumberInt(0),
- startTransaction: true,
autocommit: false,
- }));
+ }),
+ ErrorCodes.NoSuchTransaction);
+ if (simulateNetworkFailures) {
+ stopSimulatingNetworkFailures([participant1, participant2, coordinator]);
+ }
+
+ // Verify that the transaction was aborted on all shards.
+ assert.eq(0, st.s.getDB(dbName).getCollection(collName).find().itcount());
+ st.s.getDB(dbName).getCollection(collName).drop();
};
- /**
- * Calls 'coordinateCommitTransaction' on the coordinator with a participant list
- * containing all the shards.
- *
- * lsid should be an object with format {id: <UUID>}
- * txnNumber should be an integer
- */
- let coordinateCommit = function(lsid, txnNumber) {
- assert.commandWorked(coordinator.adminCommand({
- coordinateCommitTransaction: 1,
- participants: [
- {shardId: coordinator.shardName},
- {shardId: participant1.shardName},
- {shardId: participant2.shardName}
- ],
+ const testTwoPhaseCommit = function(simulateNetworkFailures) {
+ jsTest.log("Testing two-phase commit with simulateNetworkFailures: " +
+ simulateNetworkFailures);
+
+ txnNumber++;
+ setUp();
+
+ if (simulateNetworkFailures) {
+ startSimulatingNetworkFailures([participant1, participant2, coordinator]);
+ }
+ assert.commandWorked(st.s.adminCommand({
+ commitTransaction: 1,
lsid: lsid,
txnNumber: NumberLong(txnNumber),
stmtId: NumberInt(0),
autocommit: false,
}));
+ if (simulateNetworkFailures) {
+ stopSimulatingNetworkFailures([participant1, participant2, coordinator]);
+ }
+
+ // Verify that the transaction was committed on all shards.
+ // Use assert.soon(), because although coordinateCommitTransaction currently blocks until
+ // the commit process is fully complete, it will eventually be changed to only block until
+ // the decision is *written*, at which point the test can pass the operationTime returned by
+ // coordinateCommitTransaction as 'afterClusterTime' in the read to ensure the read sees the
+ // transaction's writes (TODO SERVER-37165).
+ assert.soon(function() {
+ return 3 === st.s.getDB(dbName).getCollection(collName).find().itcount();
+ });
+
+ st.s.getDB(dbName).getCollection(collName).drop();
};
- //
- // Test two-phase abort
- //
-
- let lsid = {id: UUID()};
- let txnNumber = 0;
-
- setUp(lsid, txnNumber);
-
- // Simulate that mongos sends 'prepare' with the coordinator's id to participant 1. This should
- // cause a voteCommit to be sent to the coordinator from participant 1, so that when the
- // coordinator receives voteAbort, it will also send abort to participant 1.
- assert.commandWorked(participant1.adminCommand({
- prepareTransaction: 1,
- coordinatorId: coordinator.shardName,
- lsid: lsid,
- txnNumber: NumberLong(txnNumber),
- stmtId: NumberInt(0),
- autocommit: false,
- }));
-
- // Simulate that participant 2 votes to abort.
- assert.commandWorked(coordinator.adminCommand({
- voteAbortTransaction: 1,
- shardId: participant2.shardName,
- lsid: lsid,
- txnNumber: NumberLong(txnNumber),
- stmtId: NumberInt(0),
- autocommit: false,
- }));
-
- // Manually abort the transaction on participant 2, since the coordinator will not send
- // abortTransaction to a participant that voted to abort.
- assert.commandWorked(participant2.adminCommand({
- abortTransaction: 1,
- lsid: lsid,
- txnNumber: NumberLong(txnNumber),
- stmtId: NumberInt(0),
- autocommit: false,
- }));
-
- // Simulate that mongos sends the participant list to the coordinator, which should have already
- // aborted locally. The coordinator object will no longer exist and coordinateCommit will thus
- // return NoSuchTransaction.
- let error = assert.throws(function() {
- coordinateCommit(lsid, txnNumber);
- }, [], "Expected NoSuchTransaction error");
-
- assert.eq(error.code, 251 /*NoSuchTransaction*/, tojson(error));
-
- // Verify that the transaction was aborted on all shards.
- assert.eq(0, st.s.getDB(dbName).getCollection(collName).find().itcount());
-
- //
- // Test two-phase commit
- //
-
- txnNumber++;
-
- setUp(lsid, txnNumber);
-
- // Simulate that mongos sends 'prepare' with the coordinator's id to the non-coordinator
- // participants.
- assert.commandWorked(participant1.adminCommand({
- prepareTransaction: 1,
- coordinatorId: coordinator.shardName,
- lsid: lsid,
- txnNumber: NumberLong(txnNumber),
- stmtId: NumberInt(0),
- autocommit: false,
- }));
- assert.commandWorked(participant2.adminCommand({
- prepareTransaction: 1,
- coordinatorId: coordinator.shardName,
- lsid: lsid,
- txnNumber: NumberLong(txnNumber),
- stmtId: NumberInt(0),
- autocommit: false,
- }));
-
- // Simulate that mongos sends the participant list to the coordinator.
- coordinateCommit(lsid, txnNumber);
-
- // Verify that the transaction was committed on all shards.
- // Use assert.soon(), because although coordinateCommitTransaction currently blocks until the
- // commit process is fully complete, it will eventually be changed to only block until the
- // decision is *written*, at which point the test can pass the operationTime returned by
- // coordinateCommitTransaction as 'afterClusterTime' in the read to ensure the read sees the
- // transaction's writes (TODO SERVER-37165).
- assert.soon(function() {
- return 3 === st.s.getDB(dbName).getCollection(collName).find().itcount();
- });
+ testTwoPhaseAbort(false);
+ testTwoPhaseCommit(false);
+ testTwoPhaseAbort(true);
+ testTwoPhaseCommit(true);
st.stop();
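
The rewritten test now drives commit through mongos and injects failures with the generic failCommand failpoint rather than hand-sending prepare and vote messages. A condensed sketch of the injection pattern it uses (connection name and counts are illustrative; failCommand is test-only and requires enableTestCommands):

    // Fail the next 3 matching commands with NotMaster, then behave normally.
    assert.commandWorked(conn.adminCommand({
        configureFailPoint: "failCommand",
        mode: {times: 3},
        data: {errorCode: ErrorCodes.NotMaster, failCommands: ["commitTransaction"]}
    }));
    // ... run the workload that should ride out the injected errors ...
    assert.commandWorked(conn.adminCommand({configureFailPoint: "failCommand", mode: "off"}));
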
diff --git a/jstests/sharding/txn_coordinator_commands_basic_requirements.js b/jstests/sharding/txn_coordinator_commands_basic_requirements.js
index 6586fa2e595..1e48f4f5ad5 100644
--- a/jstests/sharding/txn_coordinator_commands_basic_requirements.js
+++ b/jstests/sharding/txn_coordinator_commands_basic_requirements.js
@@ -15,28 +15,7 @@
const checkCoordinatorCommandsRejected = function(conn, expectedErrorCode) {
assert.commandFailedWithCode(conn.adminCommand({
coordinateCommitTransaction: 1,
- participants: [{shardId: "voteCommitDummy"}, {shardId: "voteAbortDummy"}],
- lsid: lsid,
- txnNumber: NumberLong(txnNumber),
- stmtId: NumberInt(1),
- autocommit: false
- }),
- expectedErrorCode);
-
- assert.commandFailedWithCode(conn.adminCommand({
- voteCommitTransaction: 1,
- shardId: "voteCommitDummy",
- prepareTimestamp: Timestamp(0, 0),
- lsid: lsid,
- txnNumber: NumberLong(txnNumber),
- stmtId: NumberInt(1),
- autocommit: false
- }),
- expectedErrorCode);
-
- assert.commandFailedWithCode(conn.adminCommand({
- voteAbortTransaction: 1,
- shardId: "voteAbortDummy",
+ participants: [{shardId: "dummy1"}, {shardId: "dummy2"}],
lsid: lsid,
txnNumber: NumberLong(txnNumber),
stmtId: NumberInt(1),
@@ -47,31 +26,9 @@
const checkCoordinatorCommandsAgainstNonAdminDbRejected = function(conn) {
const testDB = conn.getDB(dbName);
-
assert.commandFailedWithCode(testDB.runCommand({
coordinateCommitTransaction: 1,
- participants: [{shardId: "voteCommitDummy"}, {shardId: "voteAbortDummy"}],
- lsid: lsid,
- txnNumber: NumberLong(txnNumber),
- stmtId: NumberInt(0),
- autocommit: false
- }),
- ErrorCodes.Unauthorized);
-
- assert.commandFailedWithCode(testDB.runCommand({
- voteCommitTransaction: 1,
- shardId: "voteCommitDummy",
- prepareTimestamp: Timestamp(0, 0),
- lsid: lsid,
- txnNumber: NumberLong(txnNumber),
- stmtId: NumberInt(0),
- autocommit: false
- }),
- ErrorCodes.Unauthorized);
-
- assert.commandFailedWithCode(testDB.runCommand({
- voteAbortTransaction: 1,
- shardId: "voteAbortDummy",
+ participants: [{shardId: "dummy1"}, {shardId: "dummy2"}],
lsid: lsid,
txnNumber: NumberLong(txnNumber),
stmtId: NumberInt(0),
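
With voteCommitTransaction and voteAbortTransaction deleted from the server, only coordinateCommitTransaction keeps these requirement checks. Presumably (an assumption, not verified in this commit), invoking the removed commands now fails at command lookup:

    // Assumed behavior after removal: the commands no longer exist at all.
    assert.commandFailedWithCode(conn.adminCommand({voteCommitTransaction: 1}),
                                 ErrorCodes.CommandNotFound);
    assert.commandFailedWithCode(conn.adminCommand({voteAbortTransaction: 1}),
                                 ErrorCodes.CommandNotFound);
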
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 6b2ff3d0762..0d79c3a1618 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -333,3 +333,5 @@ error_class("ConnectionFatalMessageParseError", ["IllegalOpMsgFlag",
error_class("ExceededTimeLimitError", ["ExceededTimeLimit", "MaxTimeMSExpired", "NetworkInterfaceExceededTimeLimit"])
error_class("SnapshotError", ["SnapshotTooOld", "SnapshotUnavailable", "StaleChunkHistory", "MigrationConflict"])
+
+error_class("VoteAbortError", ["NoSuchTransaction", "TransactionTooOld"])
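
The new VoteAbortError class groups the two codes a participant can legitimately return from prepare once it has given up on the transaction, letting the coordinator treat either as an abort vote. On the test side that decision surfaces as the router's commitTransaction failing, as in the abort test above; a condensed sketch using that test's names:

    // A participant that already aborted (NoSuchTransaction) or moved on to a
    // newer txnNumber (TransactionTooOld) makes the coordinator abort, so
    // mongos reports NoSuchTransaction for the commit.
    assert.commandFailedWithCode(
        st.s.adminCommand(
            {commitTransaction: 1, lsid: lsid, txnNumber: NumberLong(txnNumber), autocommit: false}),
        ErrorCodes.NoSuchTransaction);
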
diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp
index 99ce548cd23..c043a38dba8 100644
--- a/src/mongo/db/commands/txn_cmds.cpp
+++ b/src/mongo/db/commands/txn_cmds.cpp
@@ -46,6 +46,9 @@
namespace mongo {
namespace {
+MONGO_FAIL_POINT_DEFINE(participantReturnNetworkErrorForAbortAfterExecutingAbortLogic);
+MONGO_FAIL_POINT_DEFINE(participantReturnNetworkErrorForCommitAfterExecutingCommitLogic);
+
class CmdCommitTxn : public BasicCommand {
public:
CmdCommitTxn() : BasicCommand("commitTransaction") {}
@@ -94,6 +97,11 @@ public:
// commit oplog entry.
auto& replClient = repl::ReplClientInfo::forClient(opCtx->getClient());
replClient.setLastOpToSystemLastOpTime(opCtx);
+ if (MONGO_FAIL_POINT(participantReturnNetworkErrorForCommitAfterExecutingCommitLogic)) {
+ uasserted(ErrorCodes::SocketException,
+ "returning network error because failpoint is on");
+ }
+
return true;
}
@@ -109,6 +117,10 @@ public:
// commitUnpreparedTransaction will throw if the transaction is prepared.
txnParticipant->commitUnpreparedTransaction(opCtx);
}
+ if (MONGO_FAIL_POINT(participantReturnNetworkErrorForCommitAfterExecutingCommitLogic)) {
+ uasserted(ErrorCodes::SocketException,
+ "returning network error because failpoint is on");
+ }
return true;
}
@@ -172,6 +184,11 @@ public:
<< exceptionToStatus());
throw;
}
+ if (MONGO_FAIL_POINT(participantReturnNetworkErrorForAbortAfterExecutingAbortLogic)) {
+ uasserted(ErrorCodes::SocketException,
+ "returning network error because failpoint is on");
+ }
+
return true;
}
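
The two failpoints defined above fire only after the commit or abort logic has already run, so the client sees a network-style error for work that actually succeeded server-side; that is what lets the new two-phase-commit test exercise the coordinator's retry path. Tests enable them the usual way, as in the suite change earlier in this diff:

    // Make the next 5 commits succeed server-side but return SocketException
    // to the caller; the coordinator must retry until it gets a real response.
    assert.commandWorked(conn.adminCommand({
        configureFailPoint: "participantReturnNetworkErrorForCommitAfterExecutingCommitLogic",
        mode: {times: 5}
    }));
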
diff --git a/src/mongo/db/commands/txn_two_phase_commit_cmds.idl b/src/mongo/db/commands/txn_two_phase_commit_cmds.idl
index d90cc4fa49a..0b77fc9827f 100644
--- a/src/mongo/db/commands/txn_two_phase_commit_cmds.idl
+++ b/src/mongo/db/commands/txn_two_phase_commit_cmds.idl
@@ -45,10 +45,6 @@ commands:
description: "Parser for the 'prepareTransaction' command."
strict: true
namespace: ignored
- fields:
- coordinatorId:
- description: "The coordinator shard for this transaction."
- type: shard_id
voteCommitTransaction:
description: "Parser for the 'voteCommitTransaction' command."
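
Because the prepareTransaction spec is declared strict, dropping the field means a request that still carries coordinatorId is now rejected at parse time (the exact "unknown field" error code is an assumption about the IDL parser, not shown in this commit):

    // Assumed parse-time rejection of the removed field under strict IDL parsing.
    assert.commandFailed(sessionDB.adminCommand(
        {prepareTransaction: 1, coordinatorId: "dummy", txnNumber: NumberLong(0), autocommit: false}));
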
diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
index cb4766efc4e..e535c31fc5b 100644
--- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
+++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
@@ -48,7 +48,7 @@
namespace mongo {
namespace {
-MONGO_FAIL_POINT_DEFINE(skipShardingPartsOfPrepareTransaction);
+MONGO_FAIL_POINT_DEFINE(participantReturnNetworkErrorForPrepareAfterExecutingPrepareLogic);
class PrepareTransactionCmd : public TypedCommand<PrepareTransactionCmd> {
public:
@@ -71,13 +71,9 @@ public:
using InvocationBase::InvocationBase;
Response typedRun(OperationContext* opCtx) {
- // In production, only config servers or initialized shard servers can participate in a
- // sharded transaction. However, many test suites test the replication and storage parts
- // of prepareTransaction against a standalone replica set, so allow skipping the check.
- if (!MONGO_FAIL_POINT(skipShardingPartsOfPrepareTransaction)) {
- if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) {
- uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands());
- }
+ if (!getTestCommandsEnabled() &&
+ serverGlobalParams.clusterRole != ClusterRole::ConfigServer) {
+ uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands());
}
auto txnParticipant = TransactionParticipant::get(opCtx);
@@ -99,8 +95,6 @@ public:
"Transaction isn't in progress",
txnParticipant->inMultiDocumentTransaction());
- const auto& cmd = request();
-
if (txnParticipant->transactionIsPrepared()) {
auto& replClient = repl::ReplClientInfo::forClient(opCtx->getClient());
auto prepareOpTime = txnParticipant->getPrepareOpTime();
@@ -118,96 +112,24 @@ public:
<< " participant prepareOpTime: "
<< prepareOpTime.toString());
- // A participant should re-send its vote if it re-received prepare.
- _sendVoteCommit(opCtx, prepareOpTime.getTimestamp(), cmd.getCoordinatorId());
-
+ if (MONGO_FAIL_POINT(
+ participantReturnNetworkErrorForPrepareAfterExecutingPrepareLogic)) {
+ uasserted(ErrorCodes::SocketException,
+ "returning network error because failpoint is on");
+ }
return PrepareTimestamp(prepareOpTime.getTimestamp());
}
- // TODO (SERVER-36839): Pass coordinatorId into prepareTransaction() so that the
- // coordinatorId can be included in the write to config.transactions.
const auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx, {});
- _sendVoteCommit(opCtx, prepareTimestamp, cmd.getCoordinatorId());
-
- return PrepareTimestamp(prepareTimestamp);
- }
-
- private:
- void _sendVoteCommit(OperationContext* opCtx,
- Timestamp prepareTimestamp,
- ShardId coordinatorId) {
- // In a production cluster, a participant should always send its vote to the coordinator
- // as part of prepareTransaction. However, many test suites test the replication and
- // storage parts of prepareTransaction against a standalone replica set, so allow
- // skipping sending a vote.
- if (MONGO_FAIL_POINT(skipShardingPartsOfPrepareTransaction)) {
- return;
- }
-
- VoteCommitTransaction voteCommit;
- voteCommit.setDbName("admin");
- voteCommit.setShardId(ShardingState::get(opCtx)->shardId());
- voteCommit.setPrepareTimestamp(prepareTimestamp);
- BSONObj voteCommitObj = voteCommit.toBSON(
- BSON("lsid" << opCtx->getLogicalSessionId()->toBSON() << "txnNumber"
- << *opCtx->getTxnNumber()
- << "autocommit"
- << false));
- _sendVote(opCtx, voteCommitObj, coordinatorId);
- }
-
- void _sendVoteAbort(OperationContext* opCtx, ShardId coordinatorId) {
- // In a production cluster, a participant should always send its vote to the coordinator
- // as part of prepareTransaction. However, many test suites test the replication and
- // storage parts of prepareTransaction against a standalone replica set, so allow
- // skipping sending a vote.
- if (MONGO_FAIL_POINT(skipShardingPartsOfPrepareTransaction)) {
- return;
- }
-
- VoteAbortTransaction voteAbort;
- voteAbort.setDbName("admin");
- voteAbort.setShardId(ShardingState::get(opCtx)->shardId());
- BSONObj voteAbortObj = voteAbort.toBSON(
- BSON("lsid" << opCtx->getLogicalSessionId()->toBSON() << "txnNumber"
- << *opCtx->getTxnNumber()
- << "autocommit"
- << false));
- _sendVote(opCtx, voteAbortObj, coordinatorId);
- }
-
- void _sendVote(OperationContext* opCtx, const BSONObj& voteObj, ShardId coordinatorId) {
- try {
- // TODO (SERVER-37328): Participant should wait for writeConcern before sending its
- // vote.
-
- LOG(3) << "Participant shard sending " << voteObj << " to " << coordinatorId;
-
- const auto coordinatorPrimaryHost = [&] {
- auto coordinatorShard = uassertStatusOK(
- Grid::get(opCtx)->shardRegistry()->getShard(opCtx, coordinatorId));
- return uassertStatusOK(coordinatorShard->getTargeter()->findHostNoWait(
- ReadPreferenceSetting{ReadPreference::PrimaryOnly}));
- }();
-
- const executor::RemoteCommandRequest request(
- coordinatorPrimaryHost,
- NamespaceString::kAdminDb.toString(),
- voteObj,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly}.toContainingBSON(),
- opCtx,
- executor::RemoteCommandRequest::kNoTimeout);
-
- auto noOp = [](const executor::TaskExecutor::RemoteCommandCallbackArgs&) {};
- uassertStatusOK(
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()->scheduleRemoteCommand(
- request, noOp));
- } catch (const DBException& ex) {
- LOG(3) << "Participant shard failed to send " << voteObj << " to " << coordinatorId
- << causedBy(ex.toStatus());
+ if (MONGO_FAIL_POINT(
+ participantReturnNetworkErrorForPrepareAfterExecutingPrepareLogic)) {
+ uasserted(ErrorCodes::SocketException,
+ "returning network error because failpoint is on");
}
+ return PrepareTimestamp(std::move(prepareTimestamp));
}
+ private:
bool supportsWriteConcern() const override {
return true;
}
@@ -233,122 +155,6 @@ public:
}
} prepareTransactionCmd;
-class VoteCommitTransactionCmd : public TypedCommand<VoteCommitTransactionCmd> {
-public:
- using Request = VoteCommitTransaction;
- class Invocation final : public InvocationBase {
- public:
- using InvocationBase::InvocationBase;
-
- void typedRun(OperationContext* opCtx) {
- // Only config servers or initialized shard servers can act as transaction coordinators.
- if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) {
- uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands());
- }
-
- uassert(
- ErrorCodes::CommandNotSupported,
- "'voteCommitTransaction' is only supported in feature compatibility version 4.2",
- (serverGlobalParams.featureCompatibility.getVersion() ==
- ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42));
-
- const auto& cmd = request();
-
- LOG(3) << "Coordinator shard received voteCommit from " << cmd.getShardId()
- << " with prepare timestamp " << cmd.getPrepareTimestamp() << " for transaction "
- << opCtx->getTxnNumber() << " on session "
- << opCtx->getLogicalSessionId()->toBSON();
-
- TransactionCoordinatorService::get(opCtx)->voteCommit(
- opCtx,
- opCtx->getLogicalSessionId().get(),
- opCtx->getTxnNumber().get(),
- cmd.getShardId(),
- cmd.getPrepareTimestamp());
- }
-
- private:
- bool supportsWriteConcern() const override {
- return false;
- }
-
- NamespaceString ns() const override {
- return NamespaceString(request().getDbName(), "");
- }
-
- void doCheckAuthorization(OperationContext* opCtx) const override {}
- };
-
- virtual bool adminOnly() const {
- return true;
- }
-
- std::string help() const override {
- return "Votes to commit a transaction; sent by a transaction participant to the "
- "transaction commit coordinator for a cross-shard transaction";
- }
-
- AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
- return AllowedOnSecondary::kNever;
- }
-} voteCommitTransactionCmd;
-
-class VoteAbortTransactionCmd : public TypedCommand<VoteAbortTransactionCmd> {
-public:
- using Request = VoteAbortTransaction;
- class Invocation final : public InvocationBase {
- public:
- using InvocationBase::InvocationBase;
-
- void typedRun(OperationContext* opCtx) {
- // Only config servers or initialized shard servers can act as transaction coordinators.
- if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) {
- uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands());
- }
-
- uassert(ErrorCodes::CommandNotSupported,
- "'voteAbortTransaction' is only supported in feature compatibility version 4.2",
- (serverGlobalParams.featureCompatibility.getVersion() ==
- ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42));
-
- const auto& cmd = request();
-
- LOG(3) << "Coordinator shard received voteAbort from " << cmd.getShardId()
- << " for transaction " << opCtx->getTxnNumber() << " on session "
- << opCtx->getLogicalSessionId()->toBSON();
-
- TransactionCoordinatorService::get(opCtx)->voteAbort(opCtx,
- opCtx->getLogicalSessionId().get(),
- opCtx->getTxnNumber().get(),
- cmd.getShardId());
- }
-
- private:
- bool supportsWriteConcern() const override {
- return false;
- }
-
- NamespaceString ns() const override {
- return NamespaceString(request().getDbName(), "");
- }
-
- void doCheckAuthorization(OperationContext* opCtx) const override {}
- };
-
- virtual bool adminOnly() const {
- return true;
- }
-
- std::string help() const override {
- return "Votes to abort a transaction; sent by a transaction participant to the transaction "
- "commit coordinator for a cross-shard transaction";
- }
-
- AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
- return AllowedOnSecondary::kNever;
- }
-} voteAbortTransactionCmd;
-
// TODO (SERVER-37440): Make coordinateCommit idempotent.
class CoordinateCommitTransactionCmd : public TypedCommand<CoordinateCommitTransactionCmd> {
public:
@@ -397,61 +203,16 @@ public:
opCtx->getTxnNumber().get(),
participantList);
- // If the commit decision is already available before we prepare locally, it means the
- // transaction has completed and we should skip preparing locally.
- //
- // TODO (SERVER-37440): Reconsider when coordinateCommit is made idempotent.
- if (!commitDecisionFuture.isReady()) {
- // Execute the 'prepare' logic on the local participant (the router does not send a
- // separate 'prepare' message to the coordinator shard).
- _callPrepareOnLocalParticipant(opCtx);
- }
-
// Block waiting for the commit decision.
auto commitDecision = commitDecisionFuture.get(opCtx);
// If the decision was abort, propagate NoSuchTransaction exception back to mongos.
uassert(ErrorCodes::NoSuchTransaction,
"Transaction was aborted",
- commitDecision != TransactionCoordinatorService::CommitDecision::kAbort);
+ commitDecision != TransactionCoordinator::CommitDecision::kAbort);
}
private:
- void _callPrepareOnLocalParticipant(OperationContext* opCtx) {
- auto localParticipantPrepareTimestamp = [&]() -> Timestamp {
- OperationSessionInfoFromClient sessionInfo;
- sessionInfo.setAutocommit(false);
- sessionInfo.setCoordinator(false);
- OperationContextSessionMongod checkOutSession(opCtx, true, sessionInfo);
-
- auto txnParticipant = TransactionParticipant::get(opCtx);
-
- txnParticipant->unstashTransactionResources(opCtx, "prepareTransaction");
- ScopeGuard guard = MakeGuard([&txnParticipant, opCtx]() {
- txnParticipant->abortActiveUnpreparedOrStashPreparedTransaction(opCtx);
- });
-
- auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx, {});
-
- txnParticipant->stashTransactionResources(opCtx);
- guard.Dismiss();
- return prepareTimestamp;
- }();
-
- LOG(3) << "Participant shard delivering voteCommit with prepareTimestamp "
- << localParticipantPrepareTimestamp << " to local coordinator for transaction "
- << opCtx->getTxnNumber() << " on session "
- << opCtx->getLogicalSessionId()->toBSON();
-
- // Deliver the local participant's vote to the coordinator.
- TransactionCoordinatorService::get(opCtx)->voteCommit(
- opCtx,
- opCtx->getLogicalSessionId().get(),
- opCtx->getTxnNumber().get(),
- ShardingState::get(opCtx)->shardId(),
- localParticipantPrepareTimestamp);
- }
-
bool supportsWriteConcern() const override {
return true;
}
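
With the vote commands gone, the coordinateCommit path above reduces to: record the participant list, then block on the decision future and surface an abort decision to mongos as NoSuchTransaction. A minimal sketch of that blocking pattern, using std::future in place of mongo::Future; CommitDecision and coordinateCommitSketch here are illustrative stand-ins, not the real types:

#include <future>
#include <stdexcept>

enum class CommitDecision { kCommit, kAbort };

// Block on the decision future, as the command body above does, and surface an
// abort decision as an error rather than a normal reply.
void coordinateCommitSketch(std::future<CommitDecision> commitDecisionFuture) {
    CommitDecision decision = commitDecisionFuture.get();
    if (decision == CommitDecision::kAbort) {
        throw std::runtime_error("NoSuchTransaction: Transaction was aborted");
    }
}

int main() {
    std::promise<CommitDecision> decisionPromise;
    decisionPromise.set_value(CommitDecision::kCommit);
    coordinateCommitSketch(decisionPromise.get_future());  // commit path: returns normally
}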
diff --git a/src/mongo/db/transaction_coordinator.cpp b/src/mongo/db/transaction_coordinator.cpp
index 03bf534a104..cb12b61c719 100644
--- a/src/mongo/db/transaction_coordinator.cpp
+++ b/src/mongo/db/transaction_coordinator.cpp
@@ -28,14 +28,15 @@
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kTransaction
#include "mongo/platform/basic.h"
+#include "mongo/db/logical_clock.h"
+#include "mongo/db/service_context.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/transaction_coordinator.h"
-
-#include "mongo/db/session.h"
+#include "mongo/util/log.h"
namespace mongo {
@@ -43,46 +44,114 @@ using Action = TransactionCoordinator::StateMachine::Action;
using Event = TransactionCoordinator::StateMachine::Event;
using State = TransactionCoordinator::StateMachine::State;
+//
+// Pre-decision
+//
+
Action TransactionCoordinator::recvCoordinateCommit(const std::set<ShardId>& participants) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
_participantList.recordFullList(participants);
return _stateMachine.onEvent(std::move(lk), Event::kRecvParticipantList);
}
-Action TransactionCoordinator::recvVoteCommit(const ShardId& shardId, Timestamp prepareTimestamp) {
+Action TransactionCoordinator::madeParticipantListDurable() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- _participantList.recordVoteCommit(shardId, prepareTimestamp);
-
- auto event = (_participantList.allParticipantsVotedCommit()) ? Event::kRecvFinalVoteCommit
- : Event::kRecvVoteCommit;
- return _stateMachine.onEvent(std::move(lk), event);
+ return _stateMachine.onEvent(std::move(lk), Event::kMadeParticipantListDurable);
}
+//
+// Abort path
+//
+
Action TransactionCoordinator::recvVoteAbort(const ShardId& shardId) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
_participantList.recordVoteAbort(shardId);
return _stateMachine.onEvent(std::move(lk), Event::kRecvVoteAbort);
}
-Action TransactionCoordinator::recvTryAbort() {
+Action TransactionCoordinator::madeAbortDecisionDurable() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- return _stateMachine.onEvent(std::move(lk), Event::kRecvTryAbort);
+ return _stateMachine.onEvent(std::move(lk), Event::kMadeAbortDecisionDurable);
}
-void TransactionCoordinator::recvCommitAck(const ShardId& shardId) {
+Action TransactionCoordinator::recvAbortAck(const ShardId& shardId) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _participantList.recordCommitAck(shardId);
- if (_participantList.allParticipantsAckedCommit()) {
- _stateMachine.onEvent(std::move(lk), Event::kRecvFinalCommitAck);
+ _participantList.recordAbortAck(shardId);
+ auto event = _participantList.allParticipantsAckedAbort() ? Event::kRecvFinalAbortAck
+ : Event::kRecvAbortAck;
+ return _stateMachine.onEvent(std::move(lk), event);
+}
+
+//
+// Commit path
+//
+
+Action TransactionCoordinator::recvVoteCommit(const ShardId& shardId, Timestamp prepareTimestamp) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _participantList.recordVoteCommit(shardId, prepareTimestamp);
+ auto event = (_participantList.allParticipantsVotedCommit()) ? Event::kRecvFinalVoteCommit
+ : Event::kRecvVoteCommit;
+ if (event == Event::kRecvFinalVoteCommit) {
+ const auto maxPrepareTs = _participantList.getHighestPrepareTimestamp();
+ _commitTimestamp = Timestamp(maxPrepareTs.getSecs(), maxPrepareTs.getInc() + 1);
+ Status s = LogicalClock::get(getGlobalServiceContext())
+ ->advanceClusterTime(LogicalTime(_commitTimestamp.get()));
+ if (!s.isOK()) {
+ log() << "Coordinator shard failed to advance cluster time to commitTimestamp "
+ << causedBy(s);
+ }
}
+ return _stateMachine.onEvent(std::move(lk), event);
+}
+
+Action TransactionCoordinator::madeCommitDecisionDurable() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ return _stateMachine.onEvent(std::move(lk), Event::kMadeCommitDecisionDurable);
}
+Action TransactionCoordinator::recvCommitAck(const ShardId& shardId) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _participantList.recordCommitAck(shardId);
+ auto event = _participantList.allParticipantsAckedCommit() ? Event::kRecvFinalCommitAck
+ : Event::kRecvCommitAck;
+ return _stateMachine.onEvent(std::move(lk), event);
+}
+
+//
+// Any time
+//
+
Future<TransactionCoordinator::StateMachine::State> TransactionCoordinator::waitForCompletion() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
return _stateMachine.waitForTransitionTo({State::kCommitted, State::kAborted});
}
+Future<TransactionCoordinator::CommitDecision> TransactionCoordinator::waitForDecision() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ return _stateMachine
+ .waitForTransitionTo({State::kWaitingForAbortAcks,
+ State::kWaitingForCommitAcks,
+ State::kCommitted,
+ State::kAborted})
+ .then([](auto state) {
+ switch (state) {
+ case TransactionCoordinator::StateMachine::State::kWaitingForAbortAcks:
+ case TransactionCoordinator::StateMachine::State::kAborted:
+ return TransactionCoordinator::CommitDecision::kAbort;
+ case TransactionCoordinator::StateMachine::State::kWaitingForCommitAcks:
+ case TransactionCoordinator::StateMachine::State::kCommitted:
+ return TransactionCoordinator::CommitDecision::kCommit;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ });
+}
+
+Action TransactionCoordinator::recvTryAbort() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ return _stateMachine.onEvent(std::move(lk), Event::kRecvTryAbort);
+}
+
//
// StateMachine
//
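
recvVoteCommit above derives the commit timestamp once the final vote arrives: take the highest prepare timestamp across all participants and add a single increment. A self-contained sketch of that rule, with a simplified Timestamp standing in for mongo::Timestamp:

#include <algorithm>
#include <cassert>
#include <cstdint>
#include <vector>

struct Timestamp {
    uint32_t secs;
    uint32_t inc;
    bool operator<(const Timestamp& o) const {
        return secs < o.secs || (secs == o.secs && inc < o.inc);
    }
};

Timestamp computeCommitTimestamp(const std::vector<Timestamp>& prepareTimestamps) {
    assert(!prepareTimestamps.empty());
    Timestamp maxPrepareTs =
        *std::max_element(prepareTimestamps.begin(), prepareTimestamps.end());
    // commitTimestamp = highest prepare timestamp + one increment, as above.
    return Timestamp{maxPrepareTs.secs, maxPrepareTs.inc + 1};
}

int main() {
    // Three participants voted commit with these prepare timestamps...
    std::vector<Timestamp> votes{{5, 1}, {5, 3}, {4, 9}};
    Timestamp commitTs = computeCommitTimestamp(votes);
    assert(commitTs.secs == 5 && commitTs.inc == 4);  // (5, 3) plus one increment
}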
@@ -101,39 +170,70 @@ Future<TransactionCoordinator::StateMachine::State> TransactionCoordinator::wait
const std::map<State, std::map<Event, TransactionCoordinator::StateMachine::Transition>>
TransactionCoordinator::StateMachine::transitionTable = {
// clang-format off
- {State::kWaitingForParticipantList, {
- {Event::kRecvVoteAbort, {Action::kSendAbort, State::kAborted}},
- {Event::kRecvVoteCommit, {}},
- {Event::kRecvParticipantList, {State::kWaitingForVotes}},
- {Event::kRecvTryAbort, {Action::kSendAbort, State::kAborted}},
+ {State::kUninitialized, {
+ {Event::kRecvParticipantList, {Action::kWriteParticipantList, State::kMakingParticipantListDurable}},
+ {Event::kRecvTryAbort, {{}, State::kAborted}},
+ }},
+ {State::kMakingParticipantListDurable, {
+ {Event::kRecvParticipantList, {}},
+ {Event::kMadeParticipantListDurable, {Action::kSendPrepare, State::kWaitingForVotes}},
+ {Event::kRecvTryAbort, {}},
}},
{State::kWaitingForVotes, {
- {Event::kRecvVoteAbort, {Action::kSendAbort, State::kAborted}},
+ {Event::kRecvParticipantList, {}},
+ {Event::kRecvVoteAbort, {Action::kWriteAbortDecision, State::kMakingAbortDecisionDurable}},
{Event::kRecvVoteCommit, {}},
+ {Event::kRecvFinalVoteCommit, {Action::kWriteCommitDecision, State::kMakingCommitDecisionDurable}},
+ {Event::kRecvTryAbort, {}},
+ }},
+
+ // Abort path
+ // Note: Can continue to receive votes after abort decision has been made, because an abort
+ // decision only requires a single voteAbort.
+ {State::kMakingAbortDecisionDurable, {
+ {Event::kRecvParticipantList, {}},
+ {Event::kRecvVoteAbort, {}},
+ {Event::kRecvVoteCommit, {}},
+ {Event::kMadeAbortDecisionDurable, {Action::kSendAbort, State::kWaitingForAbortAcks}},
+ {Event::kRecvTryAbort, {}},
+ }},
+ {State::kWaitingForAbortAcks, {
{Event::kRecvParticipantList, {}},
- {Event::kRecvFinalVoteCommit, {Action::kSendCommit, State::kWaitingForCommitAcks}},
- {Event::kRecvTryAbort, {Action::kSendAbort, State::kAborted}},
+ {Event::kRecvVoteAbort, {}},
+ {Event::kRecvVoteCommit, {}},
+ {Event::kRecvAbortAck, {}},
+ {Event::kRecvFinalAbortAck, {Action::kDone, State::kAborted}},
+ {Event::kRecvTryAbort, {}},
}},
{State::kAborted, {
- {Event::kRecvVoteAbort, {}},
- {Event::kRecvVoteCommit, {Action::kSendAbort}},
{Event::kRecvParticipantList, {}},
+ {Event::kRecvVoteAbort, {}},
+ {Event::kRecvVoteCommit, {}},
{Event::kRecvTryAbort, {}},
}},
+
+ // Commit path
+ // Note: Cannot continue to receive votes after commit decision has been made, because a
+ // commit decision requires all voteCommits.
+ {State::kMakingCommitDecisionDurable, {
+ {Event::kRecvParticipantList, {}},
+ {Event::kMadeCommitDecisionDurable, {Action::kSendCommit, State::kWaitingForCommitAcks}},
+ {Event::kRecvTryAbort, {}},
+ }},
{State::kWaitingForCommitAcks, {
- {Event::kRecvVoteCommit, {}},
{Event::kRecvParticipantList, {}},
- {Event::kRecvFinalVoteCommit, {Action::kSendCommit}},
- {Event::kRecvFinalCommitAck, {State::kCommitted}},
+ {Event::kRecvCommitAck, {}},
+ {Event::kRecvFinalCommitAck, {Action::kDone, State::kCommitted}},
{Event::kRecvTryAbort, {}},
}},
{State::kCommitted, {
- {Event::kRecvVoteCommit, {}},
{Event::kRecvParticipantList, {}},
- {Event::kRecvFinalVoteCommit, {}},
- {Event::kRecvFinalCommitAck, {}},
{Event::kRecvTryAbort, {}},
}},
+
{State::kBroken, {}},
// clang-format on
};
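
The table above is pure data: onEvent looks up the (state, event) pair, fails on an illegal event, and otherwise returns the action and optionally advances the state. A cut-down model covering only the happy commit path; the enums and onEvent here are abbreviated stand-ins for the real ones:

#include <cassert>
#include <map>
#include <optional>

enum class State { kUninitialized, kMakingParticipantListDurable, kWaitingForVotes,
                   kMakingCommitDecisionDurable, kWaitingForCommitAcks, kCommitted };
enum class Event { kRecvParticipantList, kMadeParticipantListDurable,
                   kRecvFinalVoteCommit, kMadeCommitDecisionDurable, kRecvFinalCommitAck };
enum class Action { kNone, kWriteParticipantList, kSendPrepare,
                    kWriteCommitDecision, kSendCommit, kDone };

struct Transition {
    Action action = Action::kNone;
    std::optional<State> nextState;
};

const std::map<State, std::map<Event, Transition>> table = {
    {State::kUninitialized,
     {{Event::kRecvParticipantList, {Action::kWriteParticipantList, State::kMakingParticipantListDurable}}}},
    {State::kMakingParticipantListDurable,
     {{Event::kMadeParticipantListDurable, {Action::kSendPrepare, State::kWaitingForVotes}}}},
    {State::kWaitingForVotes,
     {{Event::kRecvFinalVoteCommit, {Action::kWriteCommitDecision, State::kMakingCommitDecisionDurable}}}},
    {State::kMakingCommitDecisionDurable,
     {{Event::kMadeCommitDecisionDurable, {Action::kSendCommit, State::kWaitingForCommitAcks}}}},
    {State::kWaitingForCommitAcks,
     {{Event::kRecvFinalCommitAck, {Action::kDone, State::kCommitted}}}},
};

Action onEvent(State& state, Event event) {
    const Transition& t = table.at(state).at(event);  // throws on an illegal event
    if (t.nextState) state = *t.nextState;
    return t.action;
}

int main() {
    // Walk the happy commit path end to end.
    State s = State::kUninitialized;
    assert(onEvent(s, Event::kRecvParticipantList) == Action::kWriteParticipantList);
    assert(onEvent(s, Event::kMadeParticipantListDurable) == Action::kSendPrepare);
    assert(onEvent(s, Event::kRecvFinalVoteCommit) == Action::kWriteCommitDecision);
    assert(onEvent(s, Event::kMadeCommitDecisionDurable) == Action::kSendCommit);
    assert(onEvent(s, Event::kRecvFinalCommitAck) == Action::kDone);
    assert(s == State::kCommitted);
}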
@@ -175,6 +275,7 @@ void TransactionCoordinator::StateMachine::_signalAllPromisesWaitingForState(
Action TransactionCoordinator::StateMachine::onEvent(stdx::unique_lock<stdx::mutex> lk,
Event event) {
const auto legalTransitions = transitionTable.find(_state)->second;
if (!legalTransitions.count(event)) {
std::string errmsg = str::stream() << "Transaction coordinator received illegal event '"
@@ -184,9 +285,20 @@ Action TransactionCoordinator::StateMachine::onEvent(stdx::unique_lock<stdx::mut
}
const auto transition = legalTransitions.find(event)->second;
+
if (transition.nextState) {
+ StringBuilder ss;
+ ss << "TransactionCoordinator received event " << event << " while in state " << _state
+ << ", returning " << transition.action << " and transitioning to "
+ << *transition.nextState;
+ LOG(3) << ss.str();
_state = *transition.nextState;
_signalAllPromisesWaitingForState(std::move(lk), _state);
+ } else {
+ StringBuilder ss;
+ ss << "TransactionCoordinator received event " << event << " while in state " << _state
+ << ", returning " << transition.action << " without transitioning to a new state";
+ LOG(3) << ss.str();
}
return transition.action;
@@ -288,12 +400,13 @@ void TransactionCoordinator::ParticipantList::recordVoteAbort(const ShardId& sha
participant.vote != Participant::Vote::kCommit);
participant.vote = Participant::Vote::kAbort;
+ participant.ack = Participant::Ack::kAbort;
}
void TransactionCoordinator::ParticipantList::recordCommitAck(const ShardId& shardId) {
auto it = _participants.find(shardId);
uassert(
- ErrorCodes::InternalError,
+ 50989,
str::stream() << "Transaction commit coordinator processed 'commit' ack from participant "
<< shardId.toString()
<< " not in participant list",
@@ -301,6 +414,17 @@ void TransactionCoordinator::ParticipantList::recordCommitAck(const ShardId& sha
it->second.ack = Participant::Ack::kCommit;
}
+void TransactionCoordinator::ParticipantList::recordAbortAck(const ShardId& shardId) {
+ auto it = _participants.find(shardId);
+ uassert(
+ 50990,
+ str::stream() << "Transaction commit coordinator processed 'abort' ack from participant "
+ << shardId.toString()
+ << " not in participant list",
+ it != _participants.end());
+ it->second.ack = Participant::Ack::kAbort;
+}
+
bool TransactionCoordinator::ParticipantList::allParticipantsVotedCommit() const {
return _fullListReceived && std::all_of(_participants.begin(),
_participants.end(),
@@ -309,6 +433,13 @@ bool TransactionCoordinator::ParticipantList::allParticipantsVotedCommit() const
});
}
+bool TransactionCoordinator::ParticipantList::allParticipantsAckedAbort() const {
+ return std::all_of(
+ _participants.begin(), _participants.end(), [](const std::pair<ShardId, Participant>& i) {
+ return i.second.ack == Participant::Ack::kAbort;
+ });
+}
+
bool TransactionCoordinator::ParticipantList::allParticipantsAckedCommit() const {
invariant(_fullListReceived);
return std::all_of(
@@ -329,6 +460,14 @@ Timestamp TransactionCoordinator::ParticipantList::getHighestPrepareTimestamp()
return highestPrepareTimestamp;
}
+std::set<ShardId> TransactionCoordinator::ParticipantList::getParticipants() const {
+ std::set<ShardId> participants;
+ for (const auto& kv : _participants) {
+ participants.insert(kv.first);
+ }
+ return participants;
+}
+
std::set<ShardId> TransactionCoordinator::ParticipantList::getNonAckedCommitParticipants() const {
std::set<ShardId> nonAckedCommitParticipants;
for (const auto& kv : _participants) {
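
The new abort-ack bookkeeping above mirrors the commit-ack code: each participant records an Ack, and the all-acked predicates are std::all_of scans over the participant map. An illustrative model, where ParticipantMap and the assert stand in for the real map type and the code-50990 uassert:

#include <algorithm>
#include <cassert>
#include <map>
#include <string>

enum class Ack { kNone, kAbort, kCommit };

struct Participant { Ack ack = Ack::kNone; };

using ParticipantMap = std::map<std::string, Participant>;  // keyed by shard id

void recordAbortAck(ParticipantMap& participants, const std::string& shardId) {
    auto it = participants.find(shardId);
    assert(it != participants.end());  // the real code uasserts with code 50990
    it->second.ack = Ack::kAbort;
}

bool allParticipantsAckedAbort(const ParticipantMap& participants) {
    return std::all_of(participants.begin(), participants.end(),
                       [](const auto& kv) { return kv.second.ack == Ack::kAbort; });
}

int main() {
    ParticipantMap participants{{"shard0000", {}}, {"shard0001", {}}};
    recordAbortAck(participants, "shard0000");
    assert(!allParticipantsAckedAbort(participants));
    recordAbortAck(participants, "shard0001");
    assert(allParticipantsAckedAbort(participants));
}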
diff --git a/src/mongo/db/transaction_coordinator.h b/src/mongo/db/transaction_coordinator.h
index f0e6f0612eb..510a3339a9e 100644
--- a/src/mongo/db/transaction_coordinator.h
+++ b/src/mongo/db/transaction_coordinator.h
@@ -33,10 +33,12 @@
#include <boost/optional.hpp>
#include <list>
#include <map>
+#include <memory>
#include <set>
#include "mongo/base/disallow_copying.h"
#include "mongo/bson/timestamp.h"
+#include "mongo/db/logical_session_id.h"
#include "mongo/s/shard_id.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/assert_util.h"
@@ -53,26 +55,37 @@ class Session;
* A state machine that coordinates a distributed transaction commit with the transaction
* participants.
*/
-class TransactionCoordinator {
+class TransactionCoordinator : public std::enable_shared_from_this<TransactionCoordinator> {
MONGO_DISALLOW_COPYING(TransactionCoordinator);
public:
TransactionCoordinator() = default;
~TransactionCoordinator() = default;
+ enum class CommitDecision {
+ kCommit,
+ kAbort,
+ };
+
/**
* The internal state machine, or "brain", used by the TransactionCoordinator to determine what
* to do in response to an "event" (receiving a request or hearing back a response).
*/
class StateMachine {
- friend class TransactionCoordinator;
-
public:
~StateMachine();
enum class State {
- kWaitingForParticipantList,
+ kUninitialized,
+ kMakingParticipantListDurable,
kWaitingForVotes,
+
+ // Abort path
+ kMakingAbortDecisionDurable,
+ kWaitingForAbortAcks,
kAborted,
+
+ // Commit path
+ kMakingCommitDecisionDurable,
kWaitingForCommitAcks,
kCommitted,
@@ -85,16 +98,36 @@ public:
// State machine inputs
enum class Event {
+ kRecvParticipantList,
+ kMadeParticipantListDurable,
+
+ // Abort path
kRecvVoteAbort,
+ kMadeAbortDecisionDurable,
+ kRecvAbortAck,
+ kRecvFinalAbortAck,
+
+ // Commit path
kRecvVoteCommit,
- kRecvParticipantList,
kRecvFinalVoteCommit,
+ kMadeCommitDecisionDurable,
+ kRecvCommitAck,
kRecvFinalCommitAck,
+
kRecvTryAbort,
};
// State machine outputs
- enum class Action { kNone, kSendCommit, kSendAbort };
+ enum class Action {
+ kNone,
+ kWriteParticipantList,
+ kSendPrepare,
+ kWriteAbortDecision,
+ kSendAbort,
+ kWriteCommitDecision,
+ kSendCommit,
+ kDone
+ };
// IMPORTANT: If there is a state transition, this will release the lock in order to signal
// any promises that may be waiting on a state change, and will not reacquire it.
@@ -137,7 +170,7 @@ public:
};
static const std::map<State, std::map<Event, Transition>> transitionTable;
- State _state{State::kWaitingForParticipantList};
+ State _state{State::kUninitialized};
std::list<StateTransitionPromise> _stateTransitionPromises;
};
@@ -145,38 +178,77 @@ public:
* The coordinateCommit command contains the full participant list that this node is responsible
* for coordinating the commit across.
*
- * Stores the participant list.
+ * Stores the participant list and returns the next action to take.
*
* Throws if any participants that this node has already heard a vote from are not in the list.
*/
StateMachine::Action recvCoordinateCommit(const std::set<ShardId>& participants);
/**
- * A participant sends a voteCommit command with its prepareTimestamp if it succeeded in
- * preparing the transaction.
+ * Advances the state machine and returns the next action to take.
+ */
+ StateMachine::Action madeParticipantListDurable();
+
+ //
+ // Abort path
+ //
+
+ /**
+ * A participant responds to prepare with failure if it failed to prepare the transaction, has
+ * timed out and already aborted the transaction, or has received a higher transaction number.
*
- * Stores the participant's vote.
+ * Stores the participant's vote and returns the next action to take.
*
* Throws if the full participant list has been received and this shard is not one of the
* participants.
*/
- StateMachine::Action recvVoteCommit(const ShardId& shardId, Timestamp prepareTimestamp);
+ StateMachine::Action recvVoteAbort(const ShardId& shardId);
+
+ /**
+ * Advances the state machine and returns the next action to take.
+ */
+ StateMachine::Action madeAbortDecisionDurable();
/**
- * A participant sends a voteAbort command if it failed to prepare the transaction.
+ * If this is the final abort ack, advances the state machine. Returns the next action to take.
+ */
+ StateMachine::Action recvAbortAck(const ShardId& shardId);
+
+ //
+ // Commit path
+ //
+
+ /**
+ * A participant responds to prepare with success and its prepare timestamp if it succeeded in
+ * preparing the transaction.
*
- * Stores the participant's vote and causes the coordinator to decide to abort the transaction.
+ * Stores the participant's vote and prepare timestamp and returns the next action to take.
*
* Throws if the full participant list has been received and this shard is not one of the
* participants.
*/
- StateMachine::Action recvVoteAbort(const ShardId& shardId);
+ StateMachine::Action recvVoteCommit(const ShardId& shardId, Timestamp prepareTimestamp);
/**
- * A tryAbort event is received by the coordinator when a transaction is implicitly aborted when
- * a new transaction is received for the same session with a higher transaction number.
+ * Advances the state machine and returns the next action to take.
*/
- StateMachine::Action recvTryAbort();
+ StateMachine::Action madeCommitDecisionDurable();
+
+ /**
+ * Marks this participant as having completed committing the transaction.
+ */
+ StateMachine::Action recvCommitAck(const ShardId& shardId);
+
+ //
+ // Any time
+ //
+
+ /**
+ * Returns a Future which will be signaled when the TransactionCoordinator has successfully
+ * persisted a commit or abort decision. The resulting Future will contain the
+ * coordinator's decision.
+ */
+ Future<TransactionCoordinator::CommitDecision> waitForDecision();
/**
* Returns a Future which will be signaled when the TransactionCoordinator either commits
@@ -185,9 +257,15 @@ public:
Future<TransactionCoordinator::StateMachine::State> waitForCompletion();
/**
- * Marks this participant as having completed committing the transaction.
+ * A tryAbort event is received by the coordinator when a transaction is implicitly aborted
+ * because a new transaction is received for the same session with a higher transaction number.
*/
- void recvCommitAck(const ShardId& shardId);
+ StateMachine::Action recvTryAbort();
+
+ std::set<ShardId> getParticipants() const {
+ invariant(_stateMachine.state() != StateMachine::State::kUninitialized);
+ return _participantList.getParticipants();
+ }
std::set<ShardId> getNonAckedCommitParticipants() const {
return _participantList.getNonAckedCommitParticipants();
@@ -197,8 +275,8 @@ public:
return _participantList.getNonVotedAbortParticipants();
}
- Timestamp getCommitTimestamp() const {
- return _participantList.getHighestPrepareTimestamp();
+ boost::optional<Timestamp> getCommitTimestamp() const {
+ return _commitTimestamp;
}
StateMachine::State state() const {
@@ -218,14 +296,23 @@ public:
bool allParticipantsAckedCommit() const;
Timestamp getHighestPrepareTimestamp() const;
-
+ std::set<ShardId> getParticipants() const;
std::set<ShardId> getNonAckedCommitParticipants() const;
std::set<ShardId> getNonVotedAbortParticipants() const;
class Participant {
public:
+ /**
+ * This participant's vote, that is, whether the participant responded with success to
+ * prepareTransaction.
+ */
enum class Vote { kUnknown, kAbort, kCommit };
- enum class Ack { kNone, kCommit };
+
+ /**
+ * Whether this participant has acked the decision.
+ * TODO (SERVER-37924): Remove this enum and just track the ack as a bool.
+ */
+ enum class Ack { kNone, kAbort, kCommit };
Vote vote{Vote::kUnknown};
Ack ack{Ack::kNone};
@@ -244,23 +331,27 @@ private:
stdx::mutex _mutex;
ParticipantList _participantList;
StateMachine _stateMachine;
+ boost::optional<Timestamp> _commitTimestamp;
};
inline StringBuilder& operator<<(StringBuilder& sb,
const TransactionCoordinator::StateMachine::State& state) {
using State = TransactionCoordinator::StateMachine::State;
+ // clang-format off
switch (state) {
- // clang-format off
- case State::kWaitingForParticipantList: return sb << "kWaitingForParticipantlist";
+ case State::kUninitialized: return sb << "kUninitialized";
+ case State::kMakingParticipantListDurable: return sb << "kMakingParticipantListDurable";
case State::kWaitingForVotes: return sb << "kWaitingForVotes";
+ case State::kMakingAbortDecisionDurable: return sb << "kMakingAbortDecisionDurable";
+ case State::kWaitingForAbortAcks: return sb << "kWaitingForAbortAcks";
case State::kAborted: return sb << "kAborted";
+ case State::kMakingCommitDecisionDurable: return sb << "kMakingCommitDecisionDurable";
case State::kWaitingForCommitAcks: return sb << "kWaitingForCommitAcks";
case State::kCommitted: return sb << "kCommitted";
case State::kBroken: return sb << "kBroken";
- // clang-format on
- default:
- MONGO_UNREACHABLE;
};
+ // clang-format on
+ MONGO_UNREACHABLE;
}
inline std::ostream& operator<<(std::ostream& os,
@@ -273,17 +364,23 @@ inline std::ostream& operator<<(std::ostream& os,
inline StringBuilder& operator<<(StringBuilder& sb,
const TransactionCoordinator::StateMachine::Event& event) {
using Event = TransactionCoordinator::StateMachine::Event;
+ // clang-format off
switch (event) {
- // clang-format off
- case Event::kRecvVoteAbort: return sb << "kRecvVoteAbort";
- case Event::kRecvVoteCommit: return sb << "kRecvVoteCommit";
- case Event::kRecvParticipantList: return sb << "kRecvParticipantList";
- case Event::kRecvFinalVoteCommit: return sb << "kRecvFinalVoteCommit";
- case Event::kRecvFinalCommitAck: return sb << "kRecvFinalCommitAck";
- // clang-format on
- default:
- MONGO_UNREACHABLE;
+ case Event::kRecvParticipantList: return sb << "kRecvParticipantList";
+ case Event::kMadeParticipantListDurable: return sb << "kMadeParticipantListDurable";
+ case Event::kRecvVoteAbort: return sb << "kRecvVoteAbort";
+ case Event::kMadeAbortDecisionDurable: return sb << "kMadeAbortDecisionDurable";
+ case Event::kRecvAbortAck: return sb << "kRecvAbortAck";
+ case Event::kRecvFinalAbortAck: return sb << "kRecvFinalAbortAck";
+ case Event::kRecvVoteCommit: return sb << "kRecvVoteCommit";
+ case Event::kRecvFinalVoteCommit: return sb << "kRecvFinalVoteCommit";
+ case Event::kMadeCommitDecisionDurable: return sb << "kMadeCommitDecisionDurable";
+ case Event::kRecvCommitAck: return sb << "kRecvCommitAck";
+ case Event::kRecvFinalCommitAck: return sb << "kRecvFinalCommitAck";
+ case Event::kRecvTryAbort: return sb << "kRecvTryAbort";
};
+ // clang-format on
+ MONGO_UNREACHABLE;
}
inline std::ostream& operator<<(std::ostream& os,
@@ -298,9 +395,14 @@ inline StringBuilder& operator<<(StringBuilder& sb,
using Action = TransactionCoordinator::StateMachine::Action;
// clang-format off
switch (action) {
- case Action::kSendCommit: return sb << "kSendCommit";
- case Action::kSendAbort: return sb << "kSendAbort";
- case Action::kNone: return sb << "kNone";
+ case Action::kNone: return sb << "kNone";
+ case Action::kWriteParticipantList: return sb << "kWriteParticipantList";
+ case Action::kSendPrepare: return sb << "kSendPrepare";
+ case Action::kWriteAbortDecision: return sb << "kWriteAbortDecision";
+ case Action::kSendAbort: return sb << "kSendAbort";
+ case Action::kWriteCommitDecision: return sb << "kWriteCommitDecision";
+ case Action::kSendCommit: return sb << "kSendCommit";
+ case Action::kDone: return sb << "kDone";
};
// clang-format on
MONGO_UNREACHABLE;
@@ -312,4 +414,5 @@ inline std::ostream& operator<<(std::ostream& os,
sb << action;
return os << sb.str();
}
+
} // namespace mongo
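
waitForDecision and waitForCompletion above both rest on waitForTransitionTo: a waiter registers a promise together with a set of target states, and each state transition fulfills every promise whose set contains the new state. A simplified single-threaded model, using std::promise in place of the internal promise type and eliding the locking described in the header:

#include <cassert>
#include <future>
#include <list>
#include <set>

enum class State { kUninitialized, kWaitingForCommitAcks, kCommitted, kAborted };

struct StateTransitionPromise {
    std::set<State> targetStates;
    std::promise<State> promise;
};

class StateMachineModel {
public:
    // Register a waiter that is signaled on entering any of the given states.
    std::future<State> waitForTransitionTo(std::set<State> states) {
        _waiters.push_back({std::move(states), {}});
        return _waiters.back().promise.get_future();
    }

    void transitionTo(State next) {
        _state = next;
        // Fulfill and retire every waiter whose target set contains the new state.
        for (auto it = _waiters.begin(); it != _waiters.end();) {
            if (it->targetStates.count(next)) {
                it->promise.set_value(next);
                it = _waiters.erase(it);
            } else {
                ++it;
            }
        }
    }

    State state() const { return _state; }

private:
    State _state = State::kUninitialized;
    std::list<StateTransitionPromise> _waiters;
};

int main() {
    StateMachineModel machine;
    auto decision = machine.waitForTransitionTo({State::kWaitingForCommitAcks, State::kAborted});
    machine.transitionTo(State::kWaitingForCommitAcks);  // decision is now ready
    assert(decision.get() == State::kWaitingForCommitAcks);
}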
diff --git a/src/mongo/db/transaction_coordinator_catalog.cpp b/src/mongo/db/transaction_coordinator_catalog.cpp
index 79e7de153b2..74b29971677 100644
--- a/src/mongo/db/transaction_coordinator_catalog.cpp
+++ b/src/mongo/db/transaction_coordinator_catalog.cpp
@@ -78,22 +78,22 @@ std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::create(Lo
return newCoordinator;
}
-boost::optional<std::shared_ptr<TransactionCoordinator>> TransactionCoordinatorCatalog::get(
- LogicalSessionId lsid, TxnNumber txnNumber) {
+std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::get(LogicalSessionId lsid,
+ TxnNumber txnNumber) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
const auto& coordinatorsForSessionIter = _coordinatorsBySession.find(lsid);
if (coordinatorsForSessionIter == _coordinatorsBySession.end()) {
- return boost::none;
+ return nullptr;
}
const auto& coordinatorsForSession = coordinatorsForSessionIter->second;
const auto& coordinatorForTxnIter = coordinatorsForSession.find(txnNumber);
if (coordinatorForTxnIter == coordinatorsForSession.end()) {
- return boost::none;
+ return nullptr;
}
return coordinatorForTxnIter->second;
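
The catalog lookup above now expresses "missing" as a null shared_ptr rather than boost::none wrapping a pointer, so callers test a single level of emptiness. The shape of the new contract, with an illustrative stub type:

#include <cassert>
#include <map>
#include <memory>

struct TransactionCoordinatorStub {};
using CoordinatorPtr = std::shared_ptr<TransactionCoordinatorStub>;

// Null on a miss, as the rewritten get() above now does.
CoordinatorPtr get(const std::map<int, CoordinatorPtr>& catalog, int txnNumber) {
    auto it = catalog.find(txnNumber);
    return it == catalog.end() ? nullptr : it->second;
}

int main() {
    std::map<int, CoordinatorPtr> catalog{{1, std::make_shared<TransactionCoordinatorStub>()}};
    assert(get(catalog, 1) != nullptr);
    assert(get(catalog, 2) == nullptr);  // miss: nullptr, not boost::none
}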
diff --git a/src/mongo/db/transaction_coordinator_catalog.h b/src/mongo/db/transaction_coordinator_catalog.h
index bf28bf57e1f..85fb7609ab2 100644
--- a/src/mongo/db/transaction_coordinator_catalog.h
+++ b/src/mongo/db/transaction_coordinator_catalog.h
@@ -68,10 +68,9 @@ public:
/**
* Returns the coordinator with the given session id and transaction number, if it exists. If it
- * does not exist, return boost::none.
+ * does not exist, returns nullptr.
*/
- boost::optional<std::shared_ptr<TransactionCoordinator>> get(LogicalSessionId lsid,
- TxnNumber txnNumber);
+ std::shared_ptr<TransactionCoordinator> get(LogicalSessionId lsid, TxnNumber txnNumber);
/**
* Returns the coordinator with the highest transaction number with the given session id, if it
diff --git a/src/mongo/db/transaction_coordinator_catalog_test.cpp b/src/mongo/db/transaction_coordinator_catalog_test.cpp
index 69d7e2b7d37..5a16a26a4b9 100644
--- a/src/mongo/db/transaction_coordinator_catalog_test.cpp
+++ b/src/mongo/db/transaction_coordinator_catalog_test.cpp
@@ -83,7 +83,7 @@ TEST_F(TransactionCoordinatorCatalogTest, GetOnSessionThatDoesNotExistReturnsNon
TxnNumber txnNumber = 1;
auto coordinator = coordinatorCatalog().get(lsid, txnNumber);
- ASSERT_EQ(coordinator, boost::none);
+ ASSERT(coordinator == nullptr);
}
TEST_F(TransactionCoordinatorCatalogTest,
@@ -92,7 +92,7 @@ TEST_F(TransactionCoordinatorCatalogTest,
TxnNumber txnNumber = 1;
createCoordinatorInCatalog(lsid, txnNumber);
auto coordinatorInCatalog = coordinatorCatalog().get(lsid, txnNumber + 1);
- ASSERT_EQ(coordinatorInCatalog, boost::none);
+ ASSERT(coordinatorInCatalog == nullptr);
}
@@ -101,7 +101,7 @@ TEST_F(TransactionCoordinatorCatalogTest, CreateFollowedByGetReturnsCoordinator)
TxnNumber txnNumber = 1;
createCoordinatorInCatalog(lsid, txnNumber);
auto coordinatorInCatalog = coordinatorCatalog().get(lsid, txnNumber);
- ASSERT_NOT_EQUALS(coordinatorInCatalog, boost::none);
+ ASSERT(coordinatorInCatalog != nullptr);
}
TEST_F(TransactionCoordinatorCatalogTest, SecondCreateForSessionDoesNotOverwriteFirstCreate) {
@@ -112,7 +112,7 @@ TEST_F(TransactionCoordinatorCatalogTest, SecondCreateForSessionDoesNotOverwrite
auto coordinator2 = createCoordinatorInCatalog(lsid, txnNumber2);
auto coordinator1InCatalog = coordinatorCatalog().get(lsid, txnNumber1);
- ASSERT_NOT_EQUALS(coordinator1InCatalog, boost::none);
+ ASSERT(coordinator1InCatalog != nullptr);
}
DEATH_TEST_F(TransactionCoordinatorCatalogTest,
@@ -142,37 +142,41 @@ TEST_F(TransactionCoordinatorCatalogTest,
ASSERT_EQ(latestTxnNumAndCoordinator->first, txnNumber);
}
-TEST_F(TransactionCoordinatorCatalogTest,
- CoordinatorsRemoveThemselvesFromCatalogWhenTheyReachCommittedState) {
- using CoordinatorState = TransactionCoordinator::StateMachine::State;
-
- LogicalSessionId lsid = makeLogicalSessionIdForTest();
- TxnNumber txnNumber = 1;
- auto coordinator = createCoordinatorInCatalog(lsid, txnNumber);
-
- coordinator->recvCoordinateCommit({ShardId("shard0000")});
- coordinator->recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
- coordinator->recvCommitAck(ShardId("shard0000"));
- ASSERT_EQ(coordinator->state(), CoordinatorState::kCommitted);
-
- auto latestTxnNumAndCoordinator = coordinatorCatalog().getLatestOnSession(lsid);
- ASSERT_FALSE(latestTxnNumAndCoordinator);
-}
-
-TEST_F(TransactionCoordinatorCatalogTest,
- CoordinatorsRemoveThemselvesFromCatalogWhenTheyReachAbortedState) {
- using CoordinatorState = TransactionCoordinator::StateMachine::State;
-
- LogicalSessionId lsid = makeLogicalSessionIdForTest();
- TxnNumber txnNumber = 1;
- auto coordinator = createCoordinatorInCatalog(lsid, txnNumber);
-
- coordinator->recvVoteAbort(ShardId("shard0000"));
- ASSERT_EQ(coordinator->state(), CoordinatorState::kAborted);
+// TODO (SERVER-XXXX): Re-enable once coordinators are also participants and decision recovery
+// works correctly.
+// TEST_F(TransactionCoordinatorCatalogTest,
+// CoordinatorsRemoveThemselvesFromCatalogWhenTheyReachCommittedState) {
+// using CoordinatorState = TransactionCoordinator::StateMachine::State;
+//
+// LogicalSessionId lsid = makeLogicalSessionIdForTest();
+// TxnNumber txnNumber = 1;
+// auto coordinator = createCoordinatorInCatalog(lsid, txnNumber);
+//
+// coordinator->recvCoordinateCommit({ShardId("shard0000")});
+// coordinator->recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
+// coordinator->recvCommitAck(ShardId("shard0000"));
+// ASSERT_EQ(coordinator->state(), CoordinatorState::kCommitted);
+//
+// auto latestTxnNumAndCoordinator = coordinatorCatalog().getLatestOnSession(lsid);
+// ASSERT_FALSE(latestTxnNumAndCoordinator);
+// }
- auto latestTxnNumAndCoordinator = coordinatorCatalog().getLatestOnSession(lsid);
- ASSERT_FALSE(latestTxnNumAndCoordinator);
-}
+// TODO (SERVER-XXXX): Re-enable once coordinators are also participants and decision recovery
+// works correctly.
+// TEST_F(TransactionCoordinatorCatalogTest,
+// CoordinatorsRemoveThemselvesFromCatalogWhenTheyReachAbortedState) {
+// using CoordinatorState = TransactionCoordinator::StateMachine::State;
+//
+// LogicalSessionId lsid = makeLogicalSessionIdForTest();
+// TxnNumber txnNumber = 1;
+// auto coordinator = createCoordinatorInCatalog(lsid, txnNumber);
+//
+// coordinator->recvVoteAbort(ShardId("shard0000"));
+// ASSERT_EQ(coordinator->state(), CoordinatorState::kAborted);
+//
+// auto latestTxnNumAndCoordinator = coordinatorCatalog().getLatestOnSession(lsid);
+// ASSERT_FALSE(latestTxnNumAndCoordinator);
+// }
TEST_F(TransactionCoordinatorCatalogTest,
TwoCreatesFollowedByGetLatestOnSessionReturnsCoordinatorWithHighestTxnNumber) {
diff --git a/src/mongo/db/transaction_coordinator_commands_impl.cpp b/src/mongo/db/transaction_coordinator_commands_impl.cpp
index ce63dd84ef0..12896e1eb83 100644
--- a/src/mongo/db/transaction_coordinator_commands_impl.cpp
+++ b/src/mongo/db/transaction_coordinator_commands_impl.cpp
@@ -36,70 +36,72 @@
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/commands/txn_cmds_gen.h"
-#include "mongo/db/operation_context_session_mongod.h"
+#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/s/async_requests_sender.h"
#include "mongo/s/grid.h"
+#include "mongo/util/concurrency/notification.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
namespace mongo {
namespace {
+using Action = TransactionCoordinator::StateMachine::Action;
+using State = TransactionCoordinator::StateMachine::State;
+using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
+
/**
* Finds the host and port for a shard.
*/
-StatusWith<HostAndPort> targetHost(OperationContext* opCtx,
- const ShardId& shardId,
- const ReadPreferenceSetting& readPref) {
+StatusWith<HostAndPort> targetHost(const ShardId& shardId, const ReadPreferenceSetting& readPref) {
auto shard = Grid::get(getGlobalServiceContext())->shardRegistry()->getShardNoReload(shardId);
if (!shard) {
return Status(ErrorCodes::ShardNotFound,
str::stream() << "Could not find shard " << shardId);
}
- auto targeter = shard->getTargeter();
- return targeter->findHost(opCtx, readPref);
+ return shard->getTargeter()->findHostNoWait(readPref);
}
-using CallbackFn = stdx::function<void(Status status, const ShardId& shardID)>;
+using CallbackFn = stdx::function<void(
+ const RemoteCommandCallbackArgs& args, const ShardId& shardId, const BSONObj& commandObj)>;
/**
* Sends the given command object to the given shard ID. If scheduling and running the command is
* successful, calls the callback with the status of the command response and the shard ID.
*/
-void sendAsyncCommandToShard(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+void sendAsyncCommandToShard(executor::TaskExecutor* executor,
const ShardId& shardId,
const BSONObj& commandObj,
- CallbackFn callbackOnCommandResponse) {
+ const CallbackFn& callbackOnCommandResponse) {
auto readPref = ReadPreferenceSetting(ReadPreference::PrimaryOnly);
- auto swShardHostAndPort = targetHost(opCtx, shardId, readPref);
- if (!swShardHostAndPort.isOK()) {
+ auto swShardHostAndPort = targetHost(shardId, readPref);
+ while (!swShardHostAndPort.isOK()) {
LOG(3) << "Coordinator shard failed to target primary host of participant shard for "
<< commandObj << causedBy(swShardHostAndPort.getStatus());
- return;
+ swShardHostAndPort = targetHost(shardId, readPref);
}
executor::RemoteCommandRequest request(
swShardHostAndPort.getValue(), "admin", commandObj, readPref.toContainingBSON(), nullptr);
auto swCallbackHandle = executor->scheduleRemoteCommand(
- request,
- [commandObj, shardId, callbackOnCommandResponse](
- const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
-
- auto status = (!args.response.isOK()) ? args.response.status
- : getStatusFromCommandResult(args.response.data);
+ request, [ commandObjOwned = commandObj.getOwned(),
+ shardId,
+ callbackOnCommandResponse ](const RemoteCommandCallbackArgs& args) {
+ LOG(3) << "Coordinator shard got response " << args.response.data << " for "
+ << commandObjOwned << " to " << shardId;
- LOG(3) << "Coordinator shard got response " << status << " for " << commandObj << " to "
- << shardId;
-
- // Only call callback if command successfully executed and got a response.
- if (args.response.isOK()) {
- callbackOnCommandResponse(status, shardId);
- }
+ callbackOnCommandResponse(args, shardId, commandObjOwned);
});
if (!swCallbackHandle.isOK()) {
@@ -107,7 +109,8 @@ void sendAsyncCommandToShard(OperationContext* opCtx,
<< " to shard " << shardId << causedBy(swCallbackHandle.getStatus());
}
- // Do not wait for the callback to run.
+ // Do not wait for the callback to run. The callback will reschedule the remote request on the
+ // same executor if necessary.
}
/**
@@ -115,67 +118,216 @@ void sendAsyncCommandToShard(OperationContext* opCtx,
* scheduling and running the command is successful, calls the callback with the status of the
* command response and the shard ID.
*/
-void sendAsyncCommandToShards(OperationContext* opCtx,
- const std::set<ShardId>& shardIds,
- const BSONObj& commandObj,
- CallbackFn callbackOnCommandResponse) {
+void sendCommandToShards(OperationContext* opCtx,
+ const std::set<ShardId>& shardIds,
+ const BSONObj& commandObj,
+ const CallbackFn& callbackOnCommandResponse) {
// TODO (SERVER-36638): Change to arbitrary task executor? Unit test only supports fixed
// executor.
auto exec = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
StringBuilder ss;
ss << "[";
- // For each non-acked participant, launch an async task to target its shard
- // and then asynchronously send the command.
for (const auto& shardId : shardIds) {
- sendAsyncCommandToShard(opCtx, exec, shardId, commandObj, callbackOnCommandResponse);
+ sendAsyncCommandToShard(exec, shardId, commandObj, callbackOnCommandResponse);
ss << shardId << " ";
}
-
ss << "]";
LOG(3) << "Coordinator shard sending " << commandObj << " to " << ss.str();
}
+void driveCoordinatorUntilDone(OperationContext* opCtx,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ Action action) {
+ while (true) {
+ switch (action) {
+ case Action::kWriteParticipantList:
+ action = coordinator->madeParticipantListDurable();
+ break;
+ case Action::kSendPrepare:
+ action = txn::sendPrepare(
+ opCtx, lsid, txnNumber, coordinator, coordinator->getParticipants());
+ break;
+ case Action::kWriteAbortDecision:
+ action = coordinator->madeAbortDecisionDurable();
+ break;
+ case Action::kSendAbort:
+ action = txn::sendAbort(opCtx,
+ lsid,
+ txnNumber,
+ coordinator,
+ coordinator->getNonVotedAbortParticipants());
+ break;
+ case Action::kWriteCommitDecision:
+ action = coordinator->madeCommitDecisionDurable();
+ break;
+ case Action::kSendCommit:
+ action = txn::sendCommit(opCtx,
+ lsid,
+ txnNumber,
+ coordinator,
+ coordinator->getNonAckedCommitParticipants(),
+ coordinator->getCommitTimestamp().get());
+ break;
+ case Action::kDone:
+ return;
+ case Action::kNone:
+ // This means an event was delivered to the coordinator outside the expected order
+ // of events.
+ MONGO_UNREACHABLE;
+ }
+ }
+}
} // namespace
namespace txn {
-void sendCommit(OperationContext* opCtx,
- std::shared_ptr<TransactionCoordinator> coordinator,
- const std::set<ShardId>& nonAckedParticipants,
- Timestamp commitTimestamp) {
- invariant(coordinator);
+Action sendPrepare(OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const std::set<ShardId>& participants) {
+ PrepareTransaction prepareCmd;
+ prepareCmd.setDbName("admin");
+ auto prepareObj = prepareCmd.toBSON(
+ BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false
+ << WriteConcernOptions::kWriteConcernField
+ << WriteConcernOptions::Majority));
+
+ auto actionNotification = std::make_shared<Notification<Action>>();
+ CallbackFn prepareCallback;
+ prepareCallback = [coordinator, actionNotification, &prepareCallback](
+ const RemoteCommandCallbackArgs& args, const ShardId& shardId, const BSONObj& commandObj) {
+ auto status = (!args.response.isOK()) ? args.response.status
+ : getStatusFromCommandResult(args.response.data);
+
+ boost::optional<Action> action;
+
+ if (status.isOK()) {
+ if (args.response.data["prepareTimestamp"].eoo() ||
+ args.response.data["prepareTimestamp"].timestamp().isNull()) {
+ LOG(0) << "Coordinator shard received an OK response to prepareTransaction without "
+ "a prepareTimestamp from shard "
+ << shardId
+ << ", which is not expected behavior. Interpreting the response from "
+ << shardId << " as a vote to abort";
+ action = coordinator->recvVoteAbort(shardId);
+ } else {
+ action = coordinator->recvVoteCommit(
+ shardId, args.response.data["prepareTimestamp"].timestamp());
+ }
+ } else if (ErrorCodes::isVoteAbortError(status.code())) {
+ action = coordinator->recvVoteAbort(shardId);
+ }
+
+ if (action) {
+ if (*action != Action::kNone) {
+ actionNotification->set(*action);
+ }
+ return;
+ }
+
+ if (coordinator->state() != State::kWaitingForVotes) {
+ LOG(3) << "Coordinator shard not retrying prepare against " << shardId
+ << " because the coordinator is no longer waiting for votes";
+ } else {
+ LOG(3) << "Coordinator shard retrying " << commandObj << " against " << shardId;
+ sendAsyncCommandToShard(args.executor, shardId, commandObj, prepareCallback);
+ }
+ };
+
+ sendCommandToShards(opCtx, participants, prepareObj, prepareCallback);
+ return actionNotification->get(opCtx);
+}
+
+Action sendCommit(OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const std::set<ShardId>& nonAckedParticipants,
+ Timestamp commitTimestamp) {
CommitTransaction commitTransaction;
commitTransaction.setCommitTimestamp(commitTimestamp);
commitTransaction.setDbName("admin");
- BSONObj commitObj = commitTransaction.toBSON(BSON(
- "lsid" << opCtx->getLogicalSessionId()->toBSON() << "txnNumber" << *opCtx->getTxnNumber()
- << "autocommit"
- << false));
-
- sendAsyncCommandToShards(opCtx,
- nonAckedParticipants,
- commitObj,
- [coordinator](Status commandResponseStatus, const ShardId& shardId) {
- // TODO (SERVER-36642): Also interpret TransactionTooOld as
- // acknowledgment.
- if (commandResponseStatus.isOK()) {
- coordinator->recvCommitAck(shardId);
- }
- });
+ BSONObj commitObj = commitTransaction.toBSON(
+ BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false));
+
+ auto actionNotification = std::make_shared<Notification<Action>>();
+ CallbackFn commitCallback;
+ commitCallback = [coordinator, actionNotification, &commitCallback](
+ const RemoteCommandCallbackArgs& args, const ShardId& shardId, const BSONObj& commandObj) {
+ auto status = (!args.response.isOK()) ? args.response.status
+ : getStatusFromCommandResult(args.response.data);
+
+ if (status.isOK() || ErrorCodes::isVoteAbortError(status.code())) {
+ auto action = coordinator->recvCommitAck(shardId);
+ if (action != Action::kNone) {
+ actionNotification->set(action);
+ }
+ return;
+ }
+
+ LOG(3) << "Coordinator shard retrying " << commandObj << " against " << shardId;
+ sendAsyncCommandToShard(args.executor, shardId, commandObj, commitCallback);
+ };
+ sendCommandToShards(opCtx, nonAckedParticipants, commitObj, commitCallback);
+
+ return actionNotification->get(opCtx);
}
-void sendAbort(OperationContext* opCtx, const std::set<ShardId>& nonVotedAbortParticipants) {
+Action sendAbort(OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const std::set<ShardId>& nonVotedAbortParticipants) {
// TODO (SERVER-36584) Use IDL to create command BSON.
- BSONObj abortObj = BSON(
- "abortTransaction" << 1 << "lsid" << opCtx->getLogicalSessionId()->toBSON() << "txnNumber"
- << *opCtx->getTxnNumber()
- << "autocommit"
- << false);
-
- sendAsyncCommandToShards(
- opCtx, nonVotedAbortParticipants, abortObj, [](Status, const ShardId&) {});
+ BSONObj abortObj =
+ BSON("abortTransaction" << 1 << "lsid" << lsid.toBSON() << "txnNumber" << txnNumber
+ << "autocommit"
+ << false);
+
+ auto actionNotification = std::make_shared<Notification<Action>>();
+
+ CallbackFn abortCallback;
+ abortCallback = [coordinator, actionNotification, &abortCallback](
+ const RemoteCommandCallbackArgs& args, const ShardId& shardId, const BSONObj& commandObj) {
+ auto status = (!args.response.isOK()) ? args.response.status
+ : getStatusFromCommandResult(args.response.data);
+
+ if (status.isOK() || ErrorCodes::isVoteAbortError(status.code())) {
+ auto action = coordinator->recvAbortAck(shardId);
+ if (action != Action::kNone) {
+ actionNotification->set(action);
+ }
+ return;
+ }
+
+ LOG(3) << "Coordinator shard retrying " << commandObj << " against " << shardId;
+ sendAsyncCommandToShard(args.executor, shardId, commandObj, abortCallback);
+ };
+ sendCommandToShards(opCtx, nonVotedAbortParticipants, abortObj, abortCallback);
+ return actionNotification->get(opCtx);
+}
+
+void launchCoordinateCommitTask(ThreadPool& threadPool,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ TransactionCoordinator::StateMachine::Action initialAction) {
+ auto ch = threadPool.schedule([coordinator, lsid, txnNumber, initialAction]() {
+ try {
+ // The opCtx destructor handles unsetting itself from the Client
+ auto opCtx = Client::getCurrent()->makeOperationContext();
+ driveCoordinatorUntilDone(opCtx.get(), coordinator, lsid, txnNumber, initialAction);
+ } catch (const DBException& e) {
+ log() << "Exception was thrown while coordinating commit: " << causedBy(e.toStatus());
+ }
+ });
}
} // namespace txn
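
sendPrepare, sendCommit, and sendAbort above share one pattern: the response callback either records the response on the coordinator, signaling a notification once that produces a real action, or reschedules the same command against the same shard (note how each callback captures itself by reference for the retry). A sketch of that retry-until-acknowledged loop, with a condition_variable-backed Notification standing in for mongo::Notification and a synchronous fake send:

#include <condition_variable>
#include <functional>
#include <mutex>

enum class Action { kNone, kDone };

template <typename T>
class Notification {
public:
    void set(T value) {
        std::lock_guard<std::mutex> lk(_mutex);
        _value = value;
        _set = true;
        _cv.notify_all();
    }
    T get() {
        std::unique_lock<std::mutex> lk(_mutex);
        _cv.wait(lk, [&] { return _set; });
        return _value;
    }

private:
    std::mutex _mutex;
    std::condition_variable _cv;
    bool _set = false;
    T _value{};
};

// Pretend remote send: invokes the callback with whether the command succeeded.
void sendCommandToShard(int attempt, const std::function<void(bool ok, int attempt)>& cb) {
    cb(/*ok=*/attempt >= 2, attempt);  // fail the first attempt to force a retry
}

int main() {
    Notification<Action> actionNotification;
    std::function<void(bool, int)> callback = [&](bool ok, int attempt) {
        if (ok) {
            actionNotification.set(Action::kDone);  // final ack received
            return;
        }
        sendCommandToShard(attempt + 1, callback);  // retry the same command
    };
    sendCommandToShard(1, callback);
    return actionNotification.get() == Action::kDone ? 0 : 1;
}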
diff --git a/src/mongo/db/transaction_coordinator_commands_impl.h b/src/mongo/db/transaction_coordinator_commands_impl.h
index 5a211e424fc..da679b30dde 100644
--- a/src/mongo/db/transaction_coordinator_commands_impl.h
+++ b/src/mongo/db/transaction_coordinator_commands_impl.h
@@ -31,25 +31,54 @@
#pragma once
#include "mongo/db/operation_context.h"
+#include "mongo/db/transaction_commit_decision_gen.h"
#include "mongo/db/transaction_coordinator.h"
namespace mongo {
+class ThreadPool;
+
namespace txn {
+void launchCoordinateCommitTask(ThreadPool& threadPool,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ TransactionCoordinator::StateMachine::Action initialAction);
+
+/**
+ * Schedules prepare to be sent asynchronously to all participants and blocks on being signaled that
+ * a voteAbort or the final voteCommit has been received.
+ */
+TransactionCoordinator::StateMachine::Action sendPrepare(
+ OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const std::set<ShardId>& participants);
+
/**
- * Asynchronously sends commit to all participants provided and calls recvCommitAck on the
- * coordinator if the commit command succeeds.
+ * Schedules commit to be sent asynchronously to all participants and blocks on being signaled that
+ * the final commit ack has been received.
*/
-void sendCommit(OperationContext* opCtx,
- std::shared_ptr<TransactionCoordinator> coordinator,
- const std::set<ShardId>& nonAckedParticipants,
- Timestamp commitTimestamp);
+TransactionCoordinator::StateMachine::Action sendCommit(
+ OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const std::set<ShardId>& nonAckedParticipants,
+ Timestamp commitTimestamp);
/**
- * Asynchronously sends abort to all participants provided.
+ * Schedules abort to be sent asynchronously to all participants and blocks on being signaled that
+ * the final abort ack has been received.
*/
-void sendAbort(OperationContext* opCtx, const std::set<ShardId>& nonVotedAbortParticipants);
+TransactionCoordinator::StateMachine::Action sendAbort(
+ OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const std::set<ShardId>& nonVotedAbortParticipants);
} // namespace txn
} // namespace mongo
diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp
index a1084c1d7c8..7767775296d 100644
--- a/src/mongo/db/transaction_coordinator_service.cpp
+++ b/src/mongo/db/transaction_coordinator_service.cpp
@@ -38,41 +38,48 @@
#include "mongo/db/service_context.h"
#include "mongo/db/transaction_coordinator.h"
#include "mongo/db/transaction_coordinator_commands_impl.h"
-#include "mongo/executor/task_executor.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/grid.h"
#include "mongo/s/shard_id.h"
#include "mongo/util/log.h"
namespace mongo {
namespace {
+
const auto transactionCoordinatorServiceDecoration =
ServiceContext::declareDecoration<TransactionCoordinatorService>();
-void doCoordinatorAction(OperationContext* opCtx,
- std::shared_ptr<TransactionCoordinator> coordinator,
- TransactionCoordinator::StateMachine::Action action) {
- switch (action) {
- case TransactionCoordinator::StateMachine::Action::kSendCommit: {
- txn::sendCommit(opCtx,
- coordinator,
- coordinator->getNonAckedCommitParticipants(),
- coordinator->getCommitTimestamp());
- break;
- }
- case TransactionCoordinator::StateMachine::Action::kSendAbort: {
- txn::sendAbort(opCtx, coordinator->getNonVotedAbortParticipants());
- break;
- }
- case TransactionCoordinator::StateMachine::Action::kNone:
- break;
- }
-}
+using Action = TransactionCoordinator::StateMachine::Action;
+using State = TransactionCoordinator::StateMachine::State;
+
+/**
+ * Constructs the default options for the thread pool used to run commit.
+ */
+ThreadPool::Options makeDefaultThreadPoolOptions() {
+ ThreadPool::Options options;
+ options.poolName = "TransactionCoordinatorService";
+ options.minThreads = 0;
+ options.maxThreads = 20;
+
+ // Ensure all threads have a client
+ options.onCreateThread = [](const std::string& threadName) {
+ Client::initThread(threadName.c_str());
+ };
+ return options;
}
+} // namespace
+
TransactionCoordinatorService::TransactionCoordinatorService()
- : _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()) {}
+ : _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()),
+ _threadPool(makeDefaultThreadPoolOptions()) {
+ _threadPool.startup();
+}
-TransactionCoordinatorService::~TransactionCoordinatorService() = default;
+void TransactionCoordinatorService::shutdown() {
+ _threadPool.shutdown();
+}
TransactionCoordinatorService* TransactionCoordinatorService::get(OperationContext* opCtx) {
return get(opCtx->getServiceContext());
@@ -100,72 +107,42 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx,
"Cannot start a new transaction with the same session ID and transaction "
"number as a transaction that has already begun two-phase commit.",
latestCoordinator->state() ==
- TransactionCoordinator::StateMachine::State::kWaitingForParticipantList);
+ TransactionCoordinator::StateMachine::State::kUninitialized);
return;
}
// Call tryAbort on previous coordinator.
- auto actionToTake = latestCoordinator.get()->recvTryAbort();
- doCoordinatorAction(opCtx, latestCoordinator, actionToTake);
+ latestCoordinator.get()->recvTryAbort();
}
_coordinatorCatalog->create(lsid, txnNumber);
// TODO (SERVER-37024): Schedule abort task on executor to execute at commitDeadline.
- // TODO (SERVER-37025): Schedule poke task on executor.
}
-Future<TransactionCoordinatorService::CommitDecision>
-TransactionCoordinatorService::coordinateCommit(OperationContext* opCtx,
- LogicalSessionId lsid,
- TxnNumber txnNumber,
- const std::set<ShardId>& participantList) {
+Future<TransactionCoordinator::CommitDecision> TransactionCoordinatorService::coordinateCommit(
+ OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ const std::set<ShardId>& participantList) {
auto coordinator = _coordinatorCatalog->get(lsid, txnNumber);
if (!coordinator) {
- return TransactionCoordinatorService::CommitDecision::kAbort;
+ // TODO (SERVER-37440): Return decision "kForgotten", which indicates that a decision was
+ // already made and forgotten. The caller can recover the decision from the local
+ // participant if a higher transaction has not been started on the session and the session
+ // has not been reaped.
+ // This is currently MONGO_UNREACHABLE because no test should cause the router to re-send
+ // coordinateCommitTransaction.
+ MONGO_UNREACHABLE;
}
- auto actionToTake = coordinator.get()->recvCoordinateCommit(participantList);
- doCoordinatorAction(opCtx, coordinator.get(), actionToTake);
-
- return coordinator.get()->waitForCompletion().then([](auto finalState) {
- switch (finalState) {
- case TransactionCoordinator::StateMachine::State::kAborted:
- return TransactionCoordinatorService::CommitDecision::kAbort;
- case TransactionCoordinator::StateMachine::State::kCommitted:
- return TransactionCoordinatorService::CommitDecision::kCommit;
- default:
- MONGO_UNREACHABLE;
- }
- });
-}
-
-void TransactionCoordinatorService::voteCommit(OperationContext* opCtx,
- LogicalSessionId lsid,
- TxnNumber txnNumber,
- const ShardId& shardId,
- Timestamp prepareTimestamp) {
- auto coordinator = _coordinatorCatalog->get(lsid, txnNumber);
- if (!coordinator) {
- txn::sendAbort(opCtx, {shardId});
- return;
+ Action initialAction = coordinator->recvCoordinateCommit(participantList);
+ if (initialAction != Action::kNone) {
+ txn::launchCoordinateCommitTask(_threadPool, coordinator, lsid, txnNumber, initialAction);
}
- auto actionToTake = coordinator.get()->recvVoteCommit(shardId, prepareTimestamp);
- doCoordinatorAction(opCtx, coordinator.get(), actionToTake);
-}
-
-void TransactionCoordinatorService::voteAbort(OperationContext* opCtx,
- LogicalSessionId lsid,
- TxnNumber txnNumber,
- const ShardId& shardId) {
- auto coordinator = _coordinatorCatalog->get(lsid, txnNumber);
-
- if (coordinator) {
- auto actionToTake = coordinator.get()->recvVoteAbort(shardId);
- doCoordinatorAction(opCtx, coordinator.get(), actionToTake);
- }
+ return coordinator.get()->waitForDecision();
}
} // namespace mongo
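
The rewritten coordinateCommit no longer drives the two-phase commit inline: it delivers the participant list, launches the resulting action as a task on the service's thread pool, and immediately hands back the coordinator's decision future. Below is a minimal standalone sketch of that hand-off pattern using std:: primitives as stand-ins; Coordinator, runCommitProtocol, and CommitDecision are illustrative names, not the real MongoDB types (which use mongo::ThreadPool and mongo::Future).

```cpp
// Standalone sketch of the fire-task-and-return-future pattern. All names
// are illustrative; the real service uses mongo::ThreadPool and mongo::Future.
#include <future>
#include <iostream>
#include <thread>

enum class CommitDecision { kCommit, kAbort };

class Coordinator {
public:
    // Mirrors waitForDecision(): callers block on, or chain off, this future.
    std::future<CommitDecision> waitForDecision() {
        return _decision.get_future();
    }

    // Mirrors the task handed to the pool by launchCoordinateCommitTask():
    // the decision is produced asynchronously, off the caller's thread.
    void runCommitProtocol() {
        // ... send prepare, gather votes, send commit or abort ...
        _decision.set_value(CommitDecision::kCommit);
    }

private:
    std::promise<CommitDecision> _decision;
};

int main() {
    Coordinator coordinator;
    auto decisionFuture = coordinator.waitForDecision();

    // The service schedules the protocol on a worker and returns at once,
    // just as coordinateCommit() returns its Future immediately.
    std::thread worker([&coordinator] { coordinator.runCommitProtocol(); });

    std::cout << (decisionFuture.get() == CommitDecision::kCommit ? "commit" : "abort")
              << std::endl;
    worker.join();
    return 0;
}
```

Either way the caller consumes the future, the protocol itself runs off the caller's thread, which is what lets the service return before any participant has responded.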
diff --git a/src/mongo/db/transaction_coordinator_service.h b/src/mongo/db/transaction_coordinator_service.h
index 5eb8e6c7f66..c7c4ab77e46 100644
--- a/src/mongo/db/transaction_coordinator_service.h
+++ b/src/mongo/db/transaction_coordinator_service.h
@@ -34,7 +34,9 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/logical_session_id.h"
+#include "mongo/db/transaction_coordinator.h"
#include "mongo/db/transaction_coordinator_catalog.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/future.h"
namespace mongo {
@@ -47,13 +49,13 @@ class TransactionCoordinatorService final {
MONGO_DISALLOW_COPYING(TransactionCoordinatorService);
public:
- enum class CommitDecision {
- kCommit,
- kAbort,
- };
-
TransactionCoordinatorService();
- ~TransactionCoordinatorService();
+ ~TransactionCoordinatorService() = default;
+
+ /**
+ * Shuts down the thread pool used for executing commits.
+ */
+ void shutdown();
/**
* Retrieves the TransactionCoordinatorService associated with the service or operation context.
@@ -75,37 +77,17 @@ public:
* Delivers coordinateCommit to the TransactionCoordinator, asynchronously sends commit or
* abort to participants if necessary, and returns a Future that will contain the commit
* decision when the transaction finishes committing or aborting.
- *
- * TODO (SERVER-37364): On the commit path, this Future should instead be signaled as soon as
- * the coordinator is finished persisting the commit decision, rather than waiting until the
- * commit process has been completed entirely.
*/
- Future<CommitDecision> coordinateCommit(OperationContext* opCtx,
- LogicalSessionId lsid,
- TxnNumber txnNumber,
- const std::set<ShardId>& participantList);
-
- /**
- * Delivers voteCommit to the TransactionCoordinator and asynchronously sends commit or abort to
- * participants if necessary.
- */
- void voteCommit(OperationContext* opCtx,
- LogicalSessionId lsid,
- TxnNumber txnNumber,
- const ShardId& shardId,
- Timestamp prepareTimestamp);
-
- /**
- * Delivers voteAbort on the TransactionCoordinator and asynchronously sends commit or abort to
- * participants if necessary.
- */
- void voteAbort(OperationContext* opCtx,
- LogicalSessionId lsid,
- TxnNumber txnNumber,
- const ShardId& shardId);
+ Future<TransactionCoordinator::CommitDecision> coordinateCommit(
+ OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ const std::set<ShardId>& participantList);
private:
std::shared_ptr<TransactionCoordinatorCatalog> _coordinatorCatalog;
+
+ ThreadPool _threadPool;
};
} // namespace mongo
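
The header now gives the service a ThreadPool member and swaps the custom destructor for a defaulted one plus an explicit shutdown(). A plausible motivation, which is an assumption on my part beyond what the doc comment states, is destruction ordering: queued commit tasks may still reference other members such as _coordinatorCatalog, so the owner must drain and join the pool before those members go away. A hand-rolled single-worker sketch of that pattern follows; MiniPool stands in for mongo::ThreadPool.

```cpp
// Sketch of the explicit-shutdown pattern. Callers must invoke shutdown()
// before destroying anything the queued tasks depend on, mirroring
// TransactionCoordinatorService::shutdown().
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>

class MiniPool {
public:
    MiniPool() : _worker([this] { _run(); }) {}

    void schedule(std::function<void()> task) {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _tasks.push(std::move(task));
        }
        _cv.notify_one();
    }

    // Drains remaining tasks, then joins the worker.
    void shutdown() {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _done = true;
        }
        _cv.notify_one();
        if (_worker.joinable())
            _worker.join();
    }

private:
    void _run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lk(_mutex);
                _cv.wait(lk, [this] { return _done || !_tasks.empty(); });
                if (_tasks.empty()) {
                    if (_done)
                        return;
                    continue;
                }
                task = std::move(_tasks.front());
                _tasks.pop();
            }
            task();
        }
    }

    std::mutex _mutex;
    std::condition_variable _cv;
    std::queue<std::function<void()>> _tasks;
    bool _done = false;
    std::thread _worker;  // declared last so all other members exist first
};

int main() {
    MiniPool pool;
    pool.schedule([] { /* e.g. run a coordinateCommit task */ });
    pool.shutdown();  // must run before dependent state is destroyed
    return 0;
}
```

Making shutdown() an explicit call, rather than burying the join in the destructor, also lets the embedding service control exactly when the drain happens during teardown.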
diff --git a/src/mongo/db/transaction_coordinator_service_test.cpp b/src/mongo/db/transaction_coordinator_service_test.cpp
index 27d6a034821..3034e117bcc 100644
--- a/src/mongo/db/transaction_coordinator_service_test.cpp
+++ b/src/mongo/db/transaction_coordinator_service_test.cpp
@@ -1,4 +1,3 @@
-
/**
* Copyright (C) 2018-present MongoDB, Inc.
*
@@ -34,8 +33,10 @@
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/commands/txn_cmds_gen.h"
+#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/session_catalog.h"
+#include "mongo/db/transaction_coordinator_commands_impl.h"
#include "mongo/db/transaction_coordinator_service.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
#include "mongo/s/catalog/type_shard.h"
@@ -55,7 +56,9 @@ const std::set<ShardId> kThreeShardIdSet{{"s1"}, {"s2"}, {"s3"}};
const Timestamp kDummyTimestamp = Timestamp::min();
const Date_t kCommitDeadline = Date_t::max();
const StatusWith<BSONObj> kRetryableError = {ErrorCodes::HostUnreachable, ""};
+const StatusWith<BSONObj> kNoSuchTransaction = {ErrorCodes::NoSuchTransaction, ""};
const StatusWith<BSONObj> kOk = BSON("ok" << 1);
+const StatusWith<BSONObj> kPrepareOk = BSON("ok" << 1 << "prepareTimestamp" << Timestamp(1, 1));
HostAndPort makeHostAndPort(const ShardId& shardId) {
return HostAndPort(str::stream() << shardId << ":123");
@@ -66,9 +69,6 @@ public:
void setUp() override {
ShardServerTestFixture::setUp();
- operationContext()->setLogicalSessionId(_lsid);
- operationContext()->setTxnNumber(_txnNumber);
-
for (const auto& shardId : kThreeShardIdList) {
auto shardTargeter = RemoteCommandTargeterMock::get(
uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId))
@@ -89,6 +89,14 @@ public:
});
}
+ void assertPrepareSentAndRespondWithSuccess() {
+ assertCommandSentAndRespondWith(PrepareTransaction::kCommandName, kPrepareOk);
+ }
+
+ void assertPrepareSentAndRespondWithNoSuchTransaction() {
+ assertCommandSentAndRespondWith(PrepareTransaction::kCommandName, kNoSuchTransaction);
+ }
+
void assertAbortSentAndRespondWithSuccess() {
assertCommandSentAndRespondWith("abortTransaction", kOk);
}
@@ -125,9 +133,8 @@ public:
auto commitDecisionFuture = coordinatorService.coordinateCommit(
operationContext(), lsid, txnNumber, transactionParticipantShards);
- for (const auto& shardId : transactionParticipantShards) {
- coordinatorService.voteCommit(
- operationContext(), lsid, txnNumber, shardId, kDummyTimestamp);
+ for (size_t i = 0; i < transactionParticipantShards.size(); ++i) {
+ assertPrepareSentAndRespondWithSuccess();
}
for (size_t i = 0; i < transactionParticipantShards.size(); ++i) {
@@ -150,7 +157,13 @@ public:
auto commitDecisionFuture =
coordinatorService.coordinateCommit(operationContext(), lsid, txnNumber, shardIdSet);
- coordinatorService.voteAbort(operationContext(), lsid, txnNumber, abortingShard);
+ for (size_t i = 0; i < shardIdSet.size(); ++i) {
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+ }
+
+        // Abort is sent to the second participant as soon as the coordinator receives a
+        // not-okay response to prepare from the first participant.
+ assertAbortSentAndRespondWithSuccess();
// Wait for abort to complete.
commitDecisionFuture.get();
@@ -218,7 +231,6 @@ private:
std::unique_ptr<TransactionCoordinatorService> _coordinatorService;
};
-
} // namespace
TEST_F(TransactionCoordinatorServiceTest, CreateCoordinatorOnNewSessionSucceeds) {
@@ -253,31 +265,6 @@ TEST_F(TransactionCoordinatorServiceTest,
}
TEST_F(TransactionCoordinatorServiceTest,
- CreateCoordinatorWithHigherTxnNumberThanOngoingUncommittedTxnAbortsPreviousTxnAndSucceeds) {
-
- TransactionCoordinatorService coordinatorService;
- coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline);
-
- // This is currently the only way we have to get the commit decision.
- auto oldTxnCommitDecisionFuture = coordinatorService.coordinateCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
-
- // Create a coordinator for a higher transaction number in the same session.
- coordinatorService.createCoordinator(
- operationContext(), lsid(), txnNumber() + 1, kCommitDeadline);
-
- assertAbortSentAndRespondWithSuccess();
- assertAbortSentAndRespondWithSuccess();
-
- // We should have aborted the previous transaction.
- ASSERT_EQ(static_cast<int>(oldTxnCommitDecisionFuture.get()),
- static_cast<int>(TransactionCoordinatorService::CommitDecision::kAbort));
-
- // Make sure the newly created one works fine.
- commitTransaction(coordinatorService, lsid(), txnNumber() + 1, kTwoShardIdSet);
-}
-
-TEST_F(TransactionCoordinatorServiceTest,
CreateCoordinatorWithHigherTxnNumberThanOngoingCommittingTxnCommitsPreviousTxnAndSucceeds) {
TransactionCoordinatorService coordinatorService;
@@ -287,16 +274,18 @@ TEST_F(TransactionCoordinatorServiceTest,
// commit acks.
auto oldTxnCommitDecisionFuture = coordinatorService.coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
- coordinatorService.voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
- coordinatorService.voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[1], kDummyTimestamp);
+
+ // Simulate all participants acking prepare/voting to commit.
+ assertPrepareSentAndRespondWithSuccess();
+ assertPrepareSentAndRespondWithSuccess();
// Create a coordinator for a higher transaction number in the same session. This should
// "tryAbort" on the old coordinator which should NOT abort it since it's already waiting for
// commit acks.
coordinatorService.createCoordinator(
operationContext(), lsid(), txnNumber() + 1, kCommitDeadline);
+ auto newTxnCommitDecisionFuture = coordinatorService.coordinateCommit(
+ operationContext(), lsid(), txnNumber() + 1, kTwoShardIdSet);
// Finish committing the old transaction by sending it commit acks from both participants.
assertCommitSentAndRespondWithSuccess();
@@ -304,35 +293,44 @@ TEST_F(TransactionCoordinatorServiceTest,
// The old transaction should now be committed.
ASSERT_EQ(static_cast<int>(oldTxnCommitDecisionFuture.get()),
- static_cast<int>(TransactionCoordinatorService::CommitDecision::kCommit));
+ static_cast<int>(TransactionCoordinator::CommitDecision::kCommit));
// Make sure the newly created one works fine too.
- commitTransaction(coordinatorService, lsid(), txnNumber() + 1, kTwoShardIdSet);
+ assertPrepareSentAndRespondWithSuccess();
+ assertPrepareSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
}
TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- CoordinateCommitWithNoVotesReturnsNotReadyFuture) {
+ CoordinateCommitReturnsCorrectCommitDecisionOnAbort) {
auto commitDecisionFuture = coordinatorService()->coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
- ASSERT_FALSE(commitDecisionFuture.isReady());
- // To prevent invariant failure in TransactionCoordinator that all futures have been completed.
- abortTransaction(
- *coordinatorService(), lsid(), txnNumber(), kTwoShardIdSet, kTwoShardIdList[0]);
+ // Simulate a participant voting to abort.
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+ assertPrepareSentAndRespondWithSuccess();
+
+    // Abort is only sent to the node that voted to commit.
+ assertAbortSentAndRespondWithSuccess();
+
+ auto commitDecision = commitDecisionFuture.get();
+ ASSERT_EQ(static_cast<int>(commitDecision),
+ static_cast<int>(TransactionCoordinator::CommitDecision::kAbort));
}
TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- CoordinateCommitReturnsCorrectCommitDecisionOnAbort) {
+ CoordinateCommitWithNoVotesReturnsNotReadyFuture) {
auto commitDecisionFuture = coordinatorService()->coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
- coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kTwoShardIdList[0]);
-
- auto commitDecision = commitDecisionFuture.get();
- ASSERT_EQ(static_cast<int>(commitDecision),
- static_cast<int>(TransactionCoordinatorService::CommitDecision::kAbort));
+ ASSERT_FALSE(commitDecisionFuture.isReady());
+    // Abort here to avoid tripping the TransactionCoordinator invariant that all futures
+    // must be completed before destruction.
+ abortTransaction(
+ *coordinatorService(), lsid(), txnNumber(), kTwoShardIdSet, kTwoShardIdList[0]);
}
TEST_F(TransactionCoordinatorServiceTestSingleTxn,
@@ -341,31 +339,14 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn,
auto commitDecisionFuture = coordinatorService()->coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[1], kDummyTimestamp);
-
+ assertPrepareSentAndRespondWithSuccess();
+ assertPrepareSentAndRespondWithSuccess();
assertCommitSentAndRespondWithSuccess();
assertCommitSentAndRespondWithSuccess();
auto commitDecision = commitDecisionFuture.get();
ASSERT_EQ(static_cast<int>(commitDecision),
- static_cast<int>(TransactionCoordinatorService::CommitDecision::kCommit));
-}
-
-TEST_F(TransactionCoordinatorServiceTest,
- CoordinateCommitReturnsAbortDecisionWhenCoordinatorDoesNotExist) {
-
- TransactionCoordinatorService coordinatorService;
- auto commitDecisionFuture = coordinatorService.coordinateCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
- ASSERT_TRUE(commitDecisionFuture.isReady());
-
- auto commitDecision = commitDecisionFuture.get();
- ASSERT_EQ(static_cast<int>(commitDecision),
- static_cast<int>(TransactionCoordinatorService::CommitDecision::kAbort));
+ static_cast<int>(TransactionCoordinator::CommitDecision::kCommit));
}
TEST_F(TransactionCoordinatorServiceTest,
@@ -409,152 +390,4 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn,
static_cast<int>(commitDecisionFuture2.get()));
}
-TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- VoteCommitDoesNotSendCommitIfParticipantListNotYetReceived) {
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
-
- assertNoMessageSent();
- // To prevent invariant failure in TransactionCoordinator that all futures have been completed.
- abortTransaction(
- *coordinatorService(), lsid(), txnNumber(), kTwoShardIdSet, kTwoShardIdList[1]);
-}
-
-TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- ResentVoteCommitDoesNotSendCommitIfParticipantListNotYetReceived) {
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
-
- assertNoMessageSent();
-
- // To prevent invariant failure in TransactionCoordinator that all futures have been completed.
- abortTransaction(
- *coordinatorService(), lsid(), txnNumber(), kTwoShardIdSet, kTwoShardIdList[1]);
-}
-
-TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- ResentVoteCommitDoesNotSendCommitIfParticipantListHasBeenReceived) {
-
- auto commitDecisionFuture = coordinatorService()->coordinateCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
-
- assertNoMessageSent();
-
- // To prevent invariant failure in TransactionCoordinator that all futures have been completed.
- abortTransaction(
- *coordinatorService(), lsid(), txnNumber(), kTwoShardIdSet, kTwoShardIdList[1]);
- commitDecisionFuture.get();
-}
-
-TEST_F(TransactionCoordinatorServiceTestSingleTxn, FinalVoteCommitSendsCommit) {
- auto commitDecisionFuture = coordinatorService()->coordinateCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[1], kDummyTimestamp);
-
- assertCommitSentAndRespondWithSuccess();
- assertCommitSentAndRespondWithSuccess();
-}
-
-// This logic is obviously correct for a transaction which has been aborted prior to receiving
-// coordinateCommit, when the coordinator does not yet know all participants and so cannot send
-// abortTransaction to all participants. In this case, it can potentially receive voteCommit
-// messages from some participants even after the local TransactionCoordinator object has
-// transitioned to the aborted state and then removed from the service. We then must tell
-// the participant that sent the voteCommit message that it should abort.
-//
-// More subtly, it also works for voteCommit retries for transactions that have already committed,
-// because we'll send abort to the participant, and the abort command will just receive
-// NoSuchTransaction or TransactionTooOld (because the participant must have already committed if
-// the transaction coordinator finished committing).
-TEST_F(TransactionCoordinatorServiceTest,
- VoteCommitForCoordinatorThatDoesNotExistSendsVoteAbortToCallingParticipant) {
-
- TransactionCoordinatorService coordinatorService;
- coordinatorService.voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
-
- assertAbortSentAndRespondWithSuccess();
-}
-
-TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- ResentFinalVoteCommitOnlySendsCommitToNonAckedParticipants) {
-
- auto commitDecisionFuture = coordinatorService()->coordinateCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[1], kDummyTimestamp);
-
- assertCommitSentAndRespondWithSuccess();
- assertCommitSentAndRespondWithRetryableError();
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[1], kDummyTimestamp);
-
- assertCommitSentAndRespondWithSuccess();
-}
-
-TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- VoteAbortDoesNotSendAbortIfIsOnlyVoteReceivedSoFar) {
-
- coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kTwoShardIdList[0]);
-
- assertNoMessageSent();
-}
-
-TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- VoteAbortForCoordinatorThatDoesNotExistDoesNotSendAbort) {
-
- coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kTwoShardIdList[0]);
- // Coordinator no longer exists.
- coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kTwoShardIdList[0]);
-
- assertNoMessageSent();
-}
-
-TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- VoteAbortSendsAbortIfSomeParticipantsHaveVotedCommit) {
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
-
- coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kTwoShardIdList[1]);
-
- // This should be sent to the shard that voted commit (s1).
- assertAbortSentAndRespondWithSuccess();
-}
-
-TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- VoteAbortAfterReceivingParticipantListSendsAbortToAllParticipantsWhoHaventVotedAbort) {
-
- auto commitDecisionFuture = coordinatorService()->coordinateCommit(
- operationContext(), lsid(), txnNumber(), kThreeShardIdSet);
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kThreeShardIdList[0], kDummyTimestamp);
-
- coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kThreeShardIdList[1]);
-
- // Should send abort to shards s1 and s3 (the ones that did not vote abort).
- assertAbortSentAndRespondWithSuccess();
- assertAbortSentAndRespondWithSuccess();
-}
-
} // namespace mongo
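
The updated tests stop calling voteCommit/voteAbort directly; the coordinator now sends prepareTransaction itself, and each assertPrepareSentAndRespondWith...() helper pops the next outbound command off the mock network and feeds back a canned vote. Here is a standalone model of that expect-and-respond handshake; MockNetwork and Request are invented stand-ins for the fixture's mock NetworkInterface machinery.

```cpp
// Standalone model of the assertCommandSentAndRespondWith(...) helpers:
// the coordinator "sends" commands, and the test pops each one, asserts its
// name, and supplies the response that drives the protocol forward.
#include <cassert>
#include <queue>
#include <string>

struct Request {
    std::string cmdName;
};

class MockNetwork {
public:
    // Coordinator side: sending a command enqueues it.
    void send(Request request) {
        _outbound.push(std::move(request));
    }

    // Test side: assert the next command's name and "respond" (here the
    // response is reduced to a bool vote).
    bool assertSentAndRespond(const std::string& expectedName, bool voteCommit) {
        assert(!_outbound.empty());
        Request next = _outbound.front();
        _outbound.pop();
        assert(next.cmdName == expectedName);
        return voteCommit;
    }

private:
    std::queue<Request> _outbound;
};

int main() {
    MockNetwork network;
    // Coordinator fans out prepare to two participants...
    network.send({"prepareTransaction"});
    network.send({"prepareTransaction"});
    // ...and the test answers each one in turn, as in
    // assertPrepareSentAndRespondWithSuccess().
    network.assertSentAndRespond("prepareTransaction", /*voteCommit=*/true);
    network.assertSentAndRespond("prepareTransaction", /*voteCommit=*/true);
    return 0;
}
```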
diff --git a/src/mongo/db/transaction_coordinator_state_machine_test.cpp b/src/mongo/db/transaction_coordinator_state_machine_test.cpp
index 4fe4a0fcac2..da97c29f0da 100644
--- a/src/mongo/db/transaction_coordinator_state_machine_test.cpp
+++ b/src/mongo/db/transaction_coordinator_state_machine_test.cpp
@@ -70,15 +70,49 @@ void expectScheduleThrows(Schedule schedule) {
ASSERT_EQ(State::kBroken, coordinator.state());
}
+
+void doCommit(StateMachine& coordinator) {
+ runSchedule(coordinator,
+ {Event::kRecvParticipantList,
+ Event::kMadeParticipantListDurable,
+ Event::kRecvFinalVoteCommit,
+ Event::kMadeCommitDecisionDurable,
+ Event::kRecvFinalCommitAck});
+}
+
+void doAbort(StateMachine& coordinator) {
+ runSchedule(coordinator,
+ {Event::kRecvParticipantList,
+ Event::kMadeParticipantListDurable,
+ Event::kRecvVoteAbort,
+ Event::kMadeAbortDecisionDurable,
+ Event::kRecvFinalAbortAck});
+}
+
TEST(CoordinatorStateMachine, AbortSucceeds) {
- expectScheduleSucceeds({Event::kRecvVoteAbort}, State::kAborted);
- expectScheduleSucceeds({Event::kRecvVoteAbort, Event::kRecvVoteAbort}, State::kAborted);
+ expectScheduleSucceeds({Event::kRecvParticipantList,
+ Event::kMadeParticipantListDurable,
+ Event::kRecvVoteAbort,
+ Event::kMadeAbortDecisionDurable,
+ Event::kRecvFinalAbortAck},
+ State::kAborted);
+ // Check that it's okay to receive two vote aborts.
+ expectScheduleSucceeds({Event::kRecvParticipantList,
+ Event::kMadeParticipantListDurable,
+ Event::kRecvVoteAbort,
+ Event::kRecvVoteAbort,
+ Event::kMadeAbortDecisionDurable,
+ Event::kRecvFinalAbortAck},
+ State::kAborted);
}
TEST(CoordinatorStateMachine, CommitSucceeds) {
- expectScheduleSucceeds(
- {Event::kRecvParticipantList, Event::kRecvFinalVoteCommit, Event::kRecvFinalCommitAck},
- State::kCommitted);
+ expectScheduleSucceeds({Event::kRecvParticipantList,
+ Event::kMadeParticipantListDurable,
+ Event::kRecvFinalVoteCommit,
+ Event::kMadeCommitDecisionDurable,
+ Event::kRecvFinalCommitAck},
+ State::kCommitted);
}
TEST(CoordinatorStateMachine, RecvFinalVoteCommitAndRecvVoteAbortThrows) {
@@ -90,7 +124,7 @@ TEST(CoordinatorStateMachine, RecvFinalVoteCommitAndRecvVoteAbortThrows) {
TEST(CoordinatorStateMachine, WaitForTransitionToOnlyTerminalStatesReturnsCorrectStateOnAbort) {
StateMachine coordinator;
auto future = coordinator.waitForTransitionTo({State::kCommitted, State::kAborted});
- runSchedule(coordinator, {Event::kRecvVoteAbort});
+ doAbort(coordinator);
ASSERT_EQ(future.get(), State::kAborted);
}
@@ -100,15 +134,13 @@ TEST(CoordinatorStateMachine, WaitForTransitionToStatesThatHaventBeenReachedRetu
ASSERT_FALSE(future.isReady());
// We need to abort here because we require that all promises are triggered prior to coordinator
// destruction.
- runSchedule(coordinator, {Event::kRecvVoteAbort});
+ doAbort(coordinator);
}
TEST(CoordinatorStateMachine, WaitForTransitionToOnlyTerminalStatesReturnsCorrectStateOnCommit) {
StateMachine coordinator;
auto future = coordinator.waitForTransitionTo({State::kCommitted, State::kAborted});
- runSchedule(
- coordinator,
- {Event::kRecvParticipantList, Event::kRecvFinalVoteCommit, Event::kRecvFinalCommitAck});
+ doCommit(coordinator);
ASSERT_EQ(future.get(), State::kCommitted);
}
@@ -117,8 +149,9 @@ TEST(CoordinatorStateMachine,
StateMachine coordinator;
runSchedule(coordinator, {Event::kRecvParticipantList});
auto future = coordinator.waitForTransitionTo(
- {State::kWaitingForVotes, State::kCommitted, State::kAborted});
- ASSERT_EQ(future.get(), TransactionCoordinator::StateMachine::State::kWaitingForVotes);
+ {State::kMakingParticipantListDurable, State::kCommitted, State::kAborted});
+ ASSERT_EQ(future.get(),
+ TransactionCoordinator::StateMachine::State::kMakingParticipantListDurable);
}
TEST(CoordinatorStateMachine, WaitForTransitionToMultipleStatesReturnsFirstStateToBeHit) {
@@ -128,7 +161,7 @@ TEST(CoordinatorStateMachine, WaitForTransitionToMultipleStatesReturnsFirstState
State::kCommitted,
State::kAborted});
- runSchedule(coordinator, {Event::kRecvParticipantList, Event::kRecvFinalVoteCommit});
+ doCommit(coordinator);
ASSERT_EQ(future.get(), TransactionCoordinator::StateMachine::State::kWaitingForVotes);
}
@@ -140,7 +173,7 @@ TEST(CoordinatorStateMachine,
{State::kWaitingForVotes, State::kCommitted, State::kAborted});
auto future2 = coordinator.waitForTransitionTo(
{State::kWaitingForCommitAcks, State::kCommitted, State::kAborted});
- runSchedule(coordinator, {Event::kRecvParticipantList, Event::kRecvFinalVoteCommit});
+ doCommit(coordinator);
ASSERT_EQ(future1.get(), TransactionCoordinator::StateMachine::State::kWaitingForVotes);
ASSERT_EQ(future2.get(), TransactionCoordinator::StateMachine::State::kWaitingForCommitAcks);
@@ -163,7 +196,7 @@ TEST(CoordinatorStateMachine,
coordinator.waitForTransitionTo(
{State::kWaitingForCommitAcks, State::kCommitted, State::kAborted})};
- runSchedule(coordinator, {Event::kRecvParticipantList, Event::kRecvFinalVoteCommit});
+ doCommit(coordinator);
for (auto& future1 : futures1) {
ASSERT_EQ(future1.get(), TransactionCoordinator::StateMachine::State::kWaitingForVotes);
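
Every schedule in these tests now threads two durability checkpoints through the protocol: the participant list is made durable before votes are awaited, and the decision is made durable before it is broadcast. A compact sketch of the commit path with those checkpoints follows; the enums mirror the names used above but are illustrative copies, not the real StateMachine.

```cpp
// Illustrative walk of the expanded commit schedule. State/Event values copy
// the names from the tests above; the transition logic is a simplification.
#include <cassert>
#include <initializer_list>

enum class Event {
    kRecvParticipantList,
    kMadeParticipantListDurable,
    kRecvFinalVoteCommit,
    kMadeCommitDecisionDurable,
    kRecvFinalCommitAck,
};

enum class State {
    kUninitialized,
    kMakingParticipantListDurable,
    kWaitingForVotes,
    kMakingCommitDecisionDurable,
    kWaitingForCommitAcks,
    kCommitted,
};

State onEvent(State s, Event e) {
    switch (e) {
        case Event::kRecvParticipantList:
            assert(s == State::kUninitialized);
            return State::kMakingParticipantListDurable;
        case Event::kMadeParticipantListDurable:
            assert(s == State::kMakingParticipantListDurable);
            return State::kWaitingForVotes;
        case Event::kRecvFinalVoteCommit:
            assert(s == State::kWaitingForVotes);
            return State::kMakingCommitDecisionDurable;
        case Event::kMadeCommitDecisionDurable:
            assert(s == State::kMakingCommitDecisionDurable);
            return State::kWaitingForCommitAcks;
        case Event::kRecvFinalCommitAck:
            assert(s == State::kWaitingForCommitAcks);
            return State::kCommitted;
    }
    return s;
}

int main() {
    State s = State::kUninitialized;
    for (Event e : {Event::kRecvParticipantList,
                    Event::kMadeParticipantListDurable,
                    Event::kRecvFinalVoteCommit,
                    Event::kMadeCommitDecisionDurable,
                    Event::kRecvFinalCommitAck}) {
        s = onEvent(s, e);
    }
    assert(s == State::kCommitted);
    return 0;
}
```

The abort path in doAbort() is symmetric, substituting kRecvVoteAbort, kMadeAbortDecisionDurable, and kRecvFinalAbortAck for the last three events.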
diff --git a/src/mongo/db/transaction_coordinator_test.cpp b/src/mongo/db/transaction_coordinator_test.cpp
index 6f154b3268b..458ff30244e 100644
--- a/src/mongo/db/transaction_coordinator_test.cpp
+++ b/src/mongo/db/transaction_coordinator_test.cpp
@@ -34,135 +34,107 @@
#include <future>
+#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
+namespace {
+
using State = TransactionCoordinator::StateMachine::State;
+using Coordinator = ServiceContextMongoDTest;
const Timestamp dummyTimestamp = Timestamp::min();
-TEST(Coordinator, SomeParticipantVotesAbortLeadsToAbort) {
- TransactionCoordinator coordinator;
- coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
- coordinator.recvVoteAbort(ShardId("shard0000"));
- coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp);
- ASSERT_EQ(State::kAborted, coordinator.state());
+void doCommit(TransactionCoordinator& coordinator) {
+ coordinator.recvCoordinateCommit({ShardId("shard0000")});
+ coordinator.madeParticipantListDurable();
+ coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
+ coordinator.madeCommitDecisionDurable();
+ coordinator.recvCommitAck(ShardId("shard0000"));
}
-TEST(Coordinator, SomeParticipantsVoteAbortBeforeCoordinatorReceivesParticipantListLeadsToAbort) {
- TransactionCoordinator coordinator;
+void doAbort(TransactionCoordinator& coordinator) {
+ coordinator.recvCoordinateCommit({ShardId("shard0000")});
+ coordinator.madeParticipantListDurable();
coordinator.recvVoteAbort(ShardId("shard0000"));
- coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
- coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp);
- ASSERT_EQ(State::kAborted, coordinator.state());
+ coordinator.madeAbortDecisionDurable();
+ coordinator.recvAbortAck(ShardId("shard0000"));
+}
}
-TEST(Coordinator, AllParticipantsVoteCommitLeadsToCommit) {
+TEST_F(Coordinator, SomeParticipantVotesAbortLeadsToAbort) {
TransactionCoordinator coordinator;
coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
- coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
+ coordinator.madeParticipantListDurable();
+ coordinator.recvVoteAbort(ShardId("shard0000"));
coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp);
- coordinator.recvCommitAck(ShardId("shard0000"));
- coordinator.recvCommitAck(ShardId("shard0001"));
- ASSERT_EQ(State::kCommitted, coordinator.state());
+ coordinator.madeAbortDecisionDurable();
+ coordinator.recvAbortAck(ShardId("shard0001"));
+ ASSERT_EQ(State::kAborted, coordinator.state());
}
-TEST(
- Coordinator,
- AllParticipantsVoteCommitSomeParticipantsVoteBeforeCoordinatorReceivesParticipantListLeadsToCommit) {
+TEST_F(Coordinator, AllParticipantsVoteCommitLeadsToCommit) {
TransactionCoordinator coordinator;
- coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
+ coordinator.madeParticipantListDurable();
+ coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp);
+ coordinator.madeCommitDecisionDurable();
coordinator.recvCommitAck(ShardId("shard0000"));
coordinator.recvCommitAck(ShardId("shard0001"));
ASSERT_EQ(State::kCommitted, coordinator.state());
}
-TEST(Coordinator, NotHearingSomeParticipantsVoteOtherParticipantsVotedCommitLeadsToStillWaiting) {
+TEST_F(Coordinator, NotHearingSomeParticipantsVoteOtherParticipantsVotedCommitLeadsToStillWaiting) {
TransactionCoordinator coordinator;
coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
+ coordinator.madeParticipantListDurable();
coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
ASSERT_EQ(State::kWaitingForVotes, coordinator.state());
}
-TEST(Coordinator, NotHearingSomeParticipantsVoteAnotherParticipantVotedAbortLeadsToAbort) {
+TEST_F(Coordinator, NotHearingSomeParticipantsVoteAnotherParticipantVotedAbortLeadsToAbort) {
TransactionCoordinator coordinator;
coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
+ coordinator.madeParticipantListDurable();
coordinator.recvVoteAbort(ShardId("shard0000"));
- ASSERT_EQ(State::kAborted, coordinator.state());
+ ASSERT_EQ(State::kMakingAbortDecisionDurable, coordinator.state());
}
-TEST(Coordinator, NotHearingSomeParticipantsCommitAckLeadsToStillWaiting) {
+TEST_F(Coordinator, NotHearingSomeParticipantsCommitAckLeadsToStillWaiting) {
TransactionCoordinator coordinator;
coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
+ coordinator.madeParticipantListDurable();
coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp);
+ coordinator.madeCommitDecisionDurable();
coordinator.recvCommitAck(ShardId("shard0000"));
ASSERT_EQ(State::kWaitingForCommitAcks, coordinator.state());
}
-TEST(Coordinator, TryAbortWhileWaitingForParticipantListSuccessfullyAborts) {
- TransactionCoordinator coordinator;
- coordinator.recvTryAbort();
- ASSERT_EQ(State::kAborted, coordinator.state());
-}
-
-TEST(Coordinator, TryAbortWhileWaitingForVotesSuccessfullyAborts) {
- TransactionCoordinator coordinator;
- coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
- coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
- coordinator.recvTryAbort();
- ASSERT_EQ(State::kAborted, coordinator.state());
-}
-
-TEST(Coordinator, TryAbortWhileWaitingForCommitAcksDoesNotCancelCommit) {
- TransactionCoordinator coordinator;
- coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
- coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
- coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp);
- ASSERT_EQ(State::kWaitingForCommitAcks, coordinator.state());
- coordinator.recvTryAbort();
- ASSERT_EQ(State::kWaitingForCommitAcks, coordinator.state());
- coordinator.recvCommitAck(ShardId("shard0000"));
- coordinator.recvCommitAck(ShardId("shard0001"));
- ASSERT_EQ(State::kCommitted, coordinator.state());
-}
-
-TEST(Coordinator, VoteCommitToAbortedCoordinatorRespondsWithAbort) {
- TransactionCoordinator coordinator;
- coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
- coordinator.recvVoteAbort(ShardId("shard0000"));
- ASSERT_EQ(State::kAborted, coordinator.state());
- auto action = coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp);
- ASSERT_EQ(TransactionCoordinator::StateMachine::Action::kSendAbort, action);
-}
-
-TEST(Coordinator, WaitForCompletionReturnsOnChangeToCommitted) {
+TEST_F(Coordinator, WaitForCompletionReturnsOnChangeToCommitted) {
TransactionCoordinator coordinator;
auto future = coordinator.waitForCompletion();
- coordinator.recvCoordinateCommit({ShardId("shard0000")});
- coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
- coordinator.recvCommitAck(ShardId("shard0000"));
+ doCommit(coordinator);
auto finalState = future.get();
ASSERT_EQ(finalState, TransactionCoordinator::StateMachine::State::kCommitted);
}
-TEST(Coordinator, WaitForCompletionReturnsOnChangeToAborted) {
+TEST_F(Coordinator, WaitForCompletionReturnsOnChangeToAborted) {
TransactionCoordinator coordinator;
auto future = coordinator.waitForCompletion();
- coordinator.recvVoteAbort(ShardId("shard0000"));
+ doAbort(coordinator);
auto finalState = future.get();
ASSERT_EQ(finalState, TransactionCoordinator::StateMachine::State::kAborted);
}
-TEST(Coordinator, RepeatedCallsToWaitForCompletionAllReturn) {
+TEST_F(Coordinator, RepeatedCallsToWaitForCompletionAllReturn) {
TransactionCoordinator coordinator;
auto futures = {coordinator.waitForCompletion(),
coordinator.waitForCompletion(),
coordinator.waitForCompletion()};
- coordinator.recvVoteAbort(ShardId("shard0000"));
+ doAbort(coordinator);
for (auto& future : futures) {
auto finalState = future.get();
@@ -170,9 +142,9 @@ TEST(Coordinator, RepeatedCallsToWaitForCompletionAllReturn) {
}
}
-TEST(Coordinator, CallingWaitForCompletionAfterAlreadyCompleteReturns) {
+TEST_F(Coordinator, CallingWaitForCompletionAfterAlreadyCompleteReturns) {
TransactionCoordinator coordinator;
- coordinator.recvVoteAbort(ShardId("shard0000"));
+ doAbort(coordinator);
auto future = coordinator.waitForCompletion();
auto finalState = future.get();
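
The coordinator tests switch from TEST to TEST_F with `using Coordinator = ServiceContextMongoDTest;`, so each test now runs against a fresh service context, presumably because the coordinator's new durability steps need one (an assumption; the diff does not state the reason). A toy model of the fixture-per-test idea, using a plain base class instead of the mongo unittest framework:

```cpp
// Toy model of the TEST -> TEST_F conversion. The real alias is
// `using Coordinator = ServiceContextMongoDTest;`; here a plain base class
// plays the fixture so the sketch stays self-contained.
#include <iostream>

struct ServiceContextFixture {
    ServiceContextFixture() { std::cout << "set up service context\n"; }
    ~ServiceContextFixture() { std::cout << "tear down service context\n"; }
};

// Each TEST_F body runs inside a fresh fixture instance, so every test gets
// its own service context rather than sharing global state.
struct SomeParticipantVotesAbortLeadsToAbort : ServiceContextFixture {
    void run() {
        // ... construct a TransactionCoordinator and drive it as in doAbort() ...
    }
};

int main() {
    SomeParticipantVotesAbortLeadsToAbort test;
    test.run();
    return 0;
}
```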
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 07217ec089f..5b6ed2d3c02 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -28,7 +28,7 @@
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kTransaction
#include "mongo/platform/basic.h"
@@ -614,47 +614,23 @@ Shard::CommandResponse TransactionRouter::_commitSingleShardTransaction(Operatio
Shard::CommandResponse TransactionRouter::_commitMultiShardTransaction(OperationContext* opCtx) {
invariant(_coordinatorId);
-
- auto shardRegistry = Grid::get(opCtx)->shardRegistry();
-
- PrepareTransaction prepareCmd;
- prepareCmd.setDbName("admin");
- prepareCmd.setCoordinatorId(*_coordinatorId);
-
- auto prepareCmdObj = prepareCmd.toBSON(
- BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
+ auto coordinatorIter = _participants.find(*_coordinatorId);
+ invariant(coordinatorIter != _participants.end());
std::vector<CommitParticipant> participantList;
for (const auto& participantEntry : _participants) {
- ShardId shardId(participantEntry.first);
-
CommitParticipant commitParticipant;
- commitParticipant.setShardId(shardId);
+ commitParticipant.setShardId(participantEntry.first);
participantList.push_back(std::move(commitParticipant));
-
- if (participantEntry.second.isCoordinator()) {
- // coordinateCommit is sent to participant that is also a coordinator.
- invariant(shardId == *_coordinatorId);
- continue;
- }
-
- const auto& participant = participantEntry.second;
- auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId));
- shard->runFireAndForgetCommand(opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- "admin",
- participant.attachTxnFieldsIfNeeded(prepareCmdObj, false));
}
- auto coordinatorShard = uassertStatusOK(shardRegistry->getShard(opCtx, *_coordinatorId));
+ auto coordinatorShard =
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, *_coordinatorId));
CoordinateCommitTransaction coordinateCommitCmd;
coordinateCommitCmd.setDbName("admin");
coordinateCommitCmd.setParticipants(participantList);
- auto coordinatorIter = _participants.find(*_coordinatorId);
- invariant(coordinatorIter != _participants.end());
-
return uassertStatusOK(coordinatorShard->runCommandWithFixedRetryAttempts(
opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
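
With the prepare fan-out deleted, _commitMultiShardTransaction now only assembles the participant list and sends a single coordinateCommitTransaction to the coordinator shard, which takes over sending prepare itself. A rough sketch of the command shape the router builds; the struct types stand in for the IDL-generated CoordinateCommitTransaction and CommitParticipant, and the serialized form shown in the comment is approximate.

```cpp
// Rough shape of the single command the router now sends. The struct types
// are stand-ins for the IDL-generated command classes.
#include <initializer_list>
#include <iostream>
#include <string>
#include <vector>

struct CommitParticipant {
    std::string shardId;
};

struct CoordinateCommitTransaction {
    std::string dbName = "admin";
    std::vector<CommitParticipant> participants;
};

int main() {
    CoordinateCommitTransaction cmd;
    for (const char* shard : {"shard1", "shard2"}) {
        cmd.participants.push_back({shard});
    }
    // Serialized, this is approximately:
    //   { coordinateCommitTransaction: 1,
    //     participants: [{shardId: "shard1"}, {shardId: "shard2"}],
    //     $db: "admin", ...session/txn fields... }
    std::cout << "coordinateCommitTransaction -> coordinator shard, "
              << cmd.participants.size() << " participants\n";
    return 0;
}
```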
diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp
index 889c0c5ceed..bc14fb5b326 100644
--- a/src/mongo/s/transaction_router_test.cpp
+++ b/src/mongo/s/transaction_router_test.cpp
@@ -657,7 +657,7 @@ TEST_F(TransactionRouterTest, SendCommitDirectlyForSingleParticipants) {
future.timed_get(kFutureTimeout);
}
-TEST_F(TransactionRouterTest, SendPrepareAndCoordinateCommitForMultipleParticipants) {
+TEST_F(TransactionRouterTest, SendCoordinateCommitForMultipleParticipants) {
LogicalSessionId lsid(makeLogicalSessionIdForTest());
TxnNumber txnNum{3};
@@ -676,21 +676,6 @@ TEST_F(TransactionRouterTest, SendPrepareAndCoordinateCommitForMultipleParticipa
auto future = launchAsync([&] { txnRouter->commitTransaction(operationContext()); });
onCommand([&](const RemoteCommandRequest& request) {
- ASSERT_EQ(hostAndPort2, request.target);
- ASSERT_EQ("admin", request.dbname);
-
- auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
- ASSERT_EQ(cmdName, "prepareTransaction");
-
- auto coordinator = request.cmdObj["coordinatorId"].str();
- ASSERT_EQ(shard1.toString(), coordinator);
-
- checkSessionDetails(request.cmdObj, lsid, txnNum, boost::none);
-
- return BSON("ok" << 1);
- });
-
- onCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQ(hostAndPort1, request.target);
ASSERT_EQ("admin", request.dbname);
diff --git a/src/mongo/shell/servers.js b/src/mongo/shell/servers.js
index 2c30b10e632..2119e5cef2f 100644
--- a/src/mongo/shell/servers.js
+++ b/src/mongo/shell/servers.js
@@ -1121,16 +1121,6 @@ var MongoRunner, _startMongod, startMongoProgram, runMongoProgram, startMongoPro
}
}
- // New mongod-specific options in 4.1.x.
- if (!programMajorMinorVersion || programMajorMinorVersion >= 410) {
- if (jsTest.options().setSkipShardingPartsOfPrepareTransactionFailpoint &&
- jsTest.options().enableTestCommands) {
- argArray.push(
- ...["--setParameter",
- "failpoint.skipShardingPartsOfPrepareTransaction={mode:'alwaysOn'}"]);
- }
- }
-
// New mongod-specific options in 4.0.x
if (!programMajorMinorVersion || programMajorMinorVersion >= 400) {
if (jsTest.options().transactionLifetimeLimitSeconds !== undefined) {
diff --git a/src/mongo/shell/utils.js b/src/mongo/shell/utils.js
index e6477b0fa10..a98de90cdc1 100644
--- a/src/mongo/shell/utils.js
+++ b/src/mongo/shell/utils.js
@@ -318,8 +318,6 @@ jsTestOptions = function() {
mqlTestFile: TestData.mqlTestFile,
mqlRootPath: TestData.mqlRootPath,
disableImplicitSessions: TestData.disableImplicitSessions || false,
- setSkipShardingPartsOfPrepareTransactionFailpoint:
- TestData.setSkipShardingPartsOfPrepareTransactionFailpoint || false,
});
}
return _jsTestOptions;