summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Esha Maharishi <esha.maharishi@mongodb.com> 2018-11-15 21:07:42 -0500
committer Esha Maharishi <esha.maharishi@mongodb.com> 2018-11-29 17:29:07 -0500
commit 179748103a198727e76655932d317b27af758af7 (patch)
tree 77fdd8a4328d6503f8065725f6d447b74cb660e2
parent e54d65fa8e444dcfd5bba66f1f4c40203e5ebe16 (diff)
download mongo-179748103a198727e76655932d317b27af758af7.tar.gz
SERVER-37440 coordinateCommit should fall back to recovering decision from local participant
-rw-r--r-- buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml 1
-rw-r--r-- buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml 1
-rw-r--r-- buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 1
-rw-r--r-- jstests/sharding/transactions_recover_decision_from_local_participant.js 146
-rw-r--r-- src/mongo/db/SConscript 1
-rw-r--r-- src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp 103
-rw-r--r-- src/mongo/db/s/recover_transaction_decision_from_local_participant.h 57
-rw-r--r-- src/mongo/db/s/txn_two_phase_commit_cmds.cpp 88
-rw-r--r-- src/mongo/db/transaction_coordinator_service.cpp 45
-rw-r--r-- src/mongo/db/transaction_coordinator_service.h 20
-rw-r--r-- src/mongo/db/transaction_coordinator_service_test.cpp 141
-rw-r--r-- src/mongo/db/transaction_participant_test.cpp 380
12 files changed, 911 insertions, 73 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 836a2291c22..18c87ba2066 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -50,6 +50,7 @@ selector:
- jstests/sharding/transactions_implicit_abort.js
- jstests/sharding/transactions_multi_writes.js
- jstests/sharding/transactions_read_concerns.js
+ - jstests/sharding/transactions_recover_decision_from_local_participant.js
- jstests/sharding/transactions_reject_writes_for_moved_chunks.js
- jstests/sharding/transactions_snapshot_errors_first_statement.js
- jstests/sharding/transactions_snapshot_errors_subsequent_statements.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
index 0a270df65e4..65d61bbd16e 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
@@ -365,6 +365,7 @@ selector:
- jstests/sharding/transactions_implicit_abort.js
- jstests/sharding/transactions_multi_writes.js
- jstests/sharding/transactions_read_concerns.js
+ - jstests/sharding/transactions_recover_decision_from_local_participant.js
- jstests/sharding/transactions_reject_writes_for_moved_chunks.js
- jstests/sharding/transactions_snapshot_errors_first_statement.js
- jstests/sharding/transactions_snapshot_errors_subsequent_statements.js
diff --git a/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 b/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2
index 8ebf8f241bb..bfca3798112 100644
--- a/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2
+++ b/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2
@@ -59,6 +59,7 @@ selector:
- jstests/sharding/transactions_implicit_abort.js
- jstests/sharding/transactions_multi_writes.js
- jstests/sharding/transactions_read_concerns.js
+ - jstests/sharding/transactions_recover_decision_from_local_participant.js
- jstests/sharding/transactions_reject_writes_for_moved_chunks.js
- jstests/sharding/transactions_snapshot_errors_first_statement.js
- jstests/sharding/transactions_snapshot_errors_subsequent_statements.js
diff --git a/jstests/sharding/transactions_recover_decision_from_local_participant.js b/jstests/sharding/transactions_recover_decision_from_local_participant.js
new file mode 100644
index 00000000000..784d4e097ae
--- /dev/null
+++ b/jstests/sharding/transactions_recover_decision_from_local_participant.js
@@ -0,0 +1,146 @@
+/**
+ * Tests that the coordinateCommitTransaction command falls back to recovering the decision from
+ * the local participant.
+ *
+ * @tags: [uses_transactions, uses_prepare_transaction]
+ */
+(function() {
+ "use strict";
+
+ // The test modifies config.transactions, which must be done outside of a session.
+ TestData.disableImplicitSessions = true;
+
+ let st = new ShardingTest({shards: 2});
+
+ assert.commandWorked(st.s.adminCommand({enableSharding: 'test'}));
+ st.ensurePrimaryShard('test', st.shard0.name);
+ assert.commandWorked(st.s.adminCommand({shardCollection: 'test.user', key: {x: 1}}));
+ assert.commandWorked(st.s.adminCommand({split: 'test.user', middle: {x: 0}}));
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: 'test.user', find: {x: 0}, to: st.shard1.name}));
+
+ // Insert documents to prime mongos and shards with the latest sharding metadata.
+ let testDB = st.s.getDB('test');
+ assert.commandWorked(testDB.runCommand({insert: 'user', documents: [{x: -10}, {x: 10}]}));
+
+ let coordinatorConn = st.rs0.getPrimary();
+
+ const runCoordinateCommit = function(txnNumber, participantList) {
+ return coordinatorConn.adminCommand({
+ coordinateCommitTransaction: 1,
+ participants: participantList,
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ autocommit: false
+ });
+ };
+
+ const startNewTransactionThroughMongos = function() {
+ const updateDocumentOnShard0 = {
+ q: {x: -1},
+ u: {"$set": {lastTxnNumber: txnNumber}},
+ upsert: true
+ };
+ const updateDocumentOnShard1 = {
+ q: {x: 1},
+ u: {"$set": {lastTxnNumber: txnNumber}},
+ upsert: true
+ };
+ assert.commandWorked(testDB.runCommand({
+ update: 'user',
+ updates: [updateDocumentOnShard0, updateDocumentOnShard1],
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ autocommit: false,
+ startTransaction: true
+ }));
+ };
+
+ // TODO (SERVER-37364): Once coordinateCommit returns as soon as the decision is made durable,
+ // this test will pass but will be racy in terms of whether it's testing that coordinateCommit
+ // returns the TransactionCoordinator's decision or local TransactionParticipant's decision.
+ const runTest = function(participantList) {
+ jsTest.log("running test with participant list: " + tojson(participantList));
+
+ jsTest.log("coordinateCommit sent when local participant has never heard of the session.");
+ assert.commandFailedWithCode(runCoordinateCommit(txnNumber, participantList),
+ ErrorCodes.NoSuchTransaction);
+
+ jsTest.log(
+ "coordinateCommit sent after coordinator finished coordinating an abort decision.");
+ ++txnNumber;
+ startNewTransactionThroughMongos();
+ assert.commandWorked(st.rs0.getPrimary().adminCommand({
+ abortTransaction: 1,
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ autocommit: false
+ }));
+ assert.commandFailedWithCode(testDB.adminCommand({
+ commitTransaction: 1,
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ autocommit: false
+ }),
+ ErrorCodes.NoSuchTransaction);
+ assert.commandFailedWithCode(runCoordinateCommit(txnNumber, participantList),
+ ErrorCodes.NoSuchTransaction);
+
+ jsTest.log(
+ "coordinateCommit sent after coordinator finished coordinating a commit decision.");
+ ++txnNumber;
+ startNewTransactionThroughMongos();
+ assert.commandWorked(testDB.adminCommand({
+ commitTransaction: 1,
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ autocommit: false
+ }));
+ assert.commandWorked(runCoordinateCommit(txnNumber, participantList));
+
+ jsTest.log(
+ "coordinateCommit sent for lower transaction number than last number participant saw.");
+ assert.commandFailedWithCode(runCoordinateCommit(txnNumber - 1, participantList),
+ ErrorCodes.TransactionTooOld);
+
+ // The rejected stale request must not prevent recovering the current transaction's decision.
+ assert.commandWorked(runCoordinateCommit(txnNumber, participantList));
+
+ jsTest.log("coordinateCommit sent after session is reaped.");
+ assert.writeOK(
+ coordinatorConn.getDB("config").transactions.remove({}, false /* justOne */));
+ assert.commandFailedWithCode(runCoordinateCommit(txnNumber, participantList),
+ ErrorCodes.NoSuchTransaction);
+
+ jsTest.log(
+ "coordinateCommit sent for higher transaction number than participant has seen.");
+ ++txnNumber;
+ startNewTransactionThroughMongos();
+ assert.commandFailedWithCode(runCoordinateCommit(txnNumber + 1, participantList),
+ ErrorCodes.NoSuchTransaction);
+
+ // Expedite aborting the transaction on the non-coordinator shard by sending commitTransaction
+ // through mongos. The commit must fail with NoSuchTransaction because the coordinateCommit
+ // sent above with a higher transaction number should have caused the local participant on
+ // the coordinator to abort the previous transaction.
+ assert.commandFailedWithCode(testDB.adminCommand({
+ commitTransaction: 1,
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ autocommit: false
+ }),
+ ErrorCodes.NoSuchTransaction);
+ };
+
+ // Test with a real participant list, to simulate retrying through main router.
+ let lsid = {id: UUID()};
+ let txnNumber = 0;
+ runTest([{shardId: st.shard0.shardName}, {shardId: st.shard1.shardName}]);
+
+ // Test with an empty participant list, to simulate using a recovery router.
+ lsid = {id: UUID()};
+ txnNumber = 0;
+ runTest([]);
+
+ st.stop();
+})();
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index ec14f2c6113..977c85720c2 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -687,6 +687,7 @@ env.Library(
'transaction_history_iterator.cpp',
'transaction_metrics_observer.cpp',
'transaction_participant.cpp',
+ 's/recover_transaction_decision_from_local_participant.cpp',
env.Idlc('session_txn_record.idl')[0],
env.Idlc('transaction_commit_decision.idl')[0],
env.Idlc('transactions_stats.idl')[0],
diff --git a/src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp b/src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp
new file mode 100644
index 00000000000..35f90c58c51
--- /dev/null
+++ b/src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp
@@ -0,0 +1,103 @@
+
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/s/recover_transaction_decision_from_local_participant.h"
+
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/transaction_participant.h"
+
+namespace mongo {
+
+void recoverDecisionFromLocalParticipantOrAbortLocalParticipant(OperationContext* opCtx) {
+ ON_BLOCK_EXIT([opCtx] {
+ // At block exit, set the client's last OpTime to the system's so writeConcern covers the decision.
+ repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
+ });
+
+ MongoDOperationContextSession checkOutSession(opCtx);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+
+ try {
+ if (txnParticipant->getActiveTxnNumber() < *opCtx->getTxnNumber()) {
+ // TODO (SERVER-38133): Remove the if-block and just execute the else-block.
+ // Until SERVER-38133, calling beginOrContinue with startTransaction=boost::none with a
+ // higher txnNumber than the local participant's txnNumber will return NoSuchTransaction
+ // *without* marking the transaction at the higher txnNumber as aborted. As a stop-gap,
+ // if the request's txnNumber is higher than the local participant's txnNumber,
+ // explicitly start a transaction at the higher txnNumber here by calling
+ // beginOrContinue with startTransaction=true; the higher transaction will be aborted
+ // below.
+ // After SERVER-38133, the new transaction will be started *and* aborted by calling
+ // beginOrContinue with startTransaction=boost::none.
+ txnParticipant->beginOrContinue(
+ *opCtx->getTxnNumber(), false /* autocommit */, true /* startTransaction */);
+ } else {
+ // If the local participant's transaction number is *equal* to this request's and
+ // corresponds to a *retryable write*, throws NoSuchTransaction, else is a no-op.
+ // If the local participant's transaction number is *higher* than this request's, throws
+ // TransactionTooOld.
+ txnParticipant->beginOrContinue(
+ *opCtx->getTxnNumber(), false /* autocommit */, boost::none /* startTransaction */);
+ }
+
+ // beginOrContinue did not throw, so the local participant's txnNumber matches the request's
+ // txnNumber and that txnNumber corresponds to a transaction (not a retryable write).
+
+ if (txnParticipant->transactionIsCommitted()) {
+ return;
+ }
+
+ txnParticipant->unstashTransactionResources(opCtx, "coordinateCommitTransaction");
+ } catch (const DBException& e) {
+ // Report PreparedTransactionInProgress as anonymous error 51021; rethrow any other error.
+ uassert(51021,
+ "coordinateCommitTransaction command found local participant is prepared but no "
+ "active coordinator exists",
+ e.code() != ErrorCodes::PreparedTransactionInProgress);
+ throw;
+ }
+
+ // Abort the transaction. Since there was no active coordinator for this transaction on this
+ // node, either the coordinator has already timed out waiting for the participant list and
+ // the local participant's transaction will time out and abort anyway, or another node in
+ // this replica set has stepped up and received all the transaction statements and may
+ // commit. It's safe to abort the transaction even if the latter is the case, because this
+ // command will fail waiting for majority writeConcern.
+ txnParticipant->abortActiveTransaction(opCtx);
+ uassert(ErrorCodes::NoSuchTransaction,
+ "Transaction was aborted",
+ txnParticipant->transactionIsCommitted());
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/s/recover_transaction_decision_from_local_participant.h b/src/mongo/db/s/recover_transaction_decision_from_local_participant.h
new file mode 100644
index 00000000000..4962e4150a0
--- /dev/null
+++ b/src/mongo/db/s/recover_transaction_decision_from_local_participant.h
@@ -0,0 +1,57 @@
+
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+namespace mongo {
+
+class OperationContext;
+
+/**
+ * Examines the local participant's decision for the transaction number on the OperationContext and
+ * returns without throwing if the local participant's decision was commit. Otherwise, throws one of
+ * the following errors:
+ *
+ * - If the local participant has a higher transaction number, throws TransactionTooOld.
+ * - If the local participant is in prepare, throws an anonymous error code, because the request
+ * to recover the decision was either a delayed message or a byzantine message.
+ * - If the local participant has a lower transaction number, starts a transaction at the
+ * transaction number on the OperationContext, aborts it, and throws NoSuchTransaction.
+ * - If the local participant has the same transaction number and:
+ * -- the transaction number corresponds to a retryable write, throws NoSuchTransaction
+ * -- the transaction is already aborted, throws NoSuchTransaction
+ * -- the transaction is in progress, aborts the transaction and throws NoSuchTransaction
+ *
+ * Sets the Client last OpTime to the system last OpTime to ensure the caller waits for writeConcern
+ * of the decision.
+ */
+void recoverDecisionFromLocalParticipantOrAbortLocalParticipant(OperationContext* opCtx);
+
+} // namespace mongo
diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
index 270d3a31cf8..38a1b351a7b 100644
--- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
+++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/commands.h"
#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/s/recover_transaction_decision_from_local_participant.h"
#include "mongo/db/s/sharding_state.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/transaction_coordinator_service.h"
@@ -168,7 +169,6 @@ public:
}
} prepareTransactionCmd;
-// TODO (SERVER-37440): Make coordinateCommit idempotent.
class CoordinateCommitTransactionCmd : public TypedCommand<CoordinateCommitTransactionCmd> {
public:
using Request = CoordinateCommitTransaction;
@@ -190,39 +190,67 @@ public:
const auto& cmd = request();
- // Convert the participant list array into a set, and assert that all participants in
- // the list are unique.
- // TODO (PM-564): Propagate the 'readOnly' flag down into the TransactionCoordinator.
- std::set<ShardId> participantList;
- StringBuilder ss;
- ss << "[";
- for (const auto& participant : cmd.getParticipants()) {
- const auto shardId = participant.getShardId();
- uassert(ErrorCodes::InvalidOptions,
- str::stream() << "participant list contained duplicate shardId " << shardId,
- std::find(participantList.begin(), participantList.end(), shardId) ==
- participantList.end());
- participantList.insert(shardId);
- ss << shardId << " ";
+ boost::optional<Future<TransactionCoordinator::CommitDecision>> commitDecisionFuture;
+
+ if (!cmd.getParticipants().empty()) {
+ // Convert the participant list array into a set, and assert that all participants
+ // in the list are unique.
+ // TODO (PM-564): Propagate the 'readOnly' flag down into the
+ // TransactionCoordinator.
+ std::set<ShardId> participantList;
+ StringBuilder ss;
+ ss << "[";
+ for (const auto& participant : cmd.getParticipants()) {
+ const auto shardId = participant.getShardId();
+ uassert(ErrorCodes::InvalidOptions,
+ str::stream() << "participant list contained duplicate shardId "
+ << shardId,
+ std::find(participantList.begin(), participantList.end(), shardId) ==
+ participantList.end());
+ participantList.insert(shardId);
+ ss << shardId << " ";
+ }
+ ss << "]";
+ LOG(3) << "Coordinator shard received request to coordinate commit with "
+ "participant list "
+ << ss.str() << " for transaction " << opCtx->getTxnNumber() << " on session "
+ << opCtx->getLogicalSessionId()->toBSON();
+
+ commitDecisionFuture = TransactionCoordinatorService::get(opCtx)->coordinateCommit(
+ opCtx,
+ opCtx->getLogicalSessionId().get(),
+ opCtx->getTxnNumber().get(),
+ participantList);
+ } else {
+ LOG(3) << "Coordinator shard received request to recover commit decision for "
+ "transaction "
+ << opCtx->getTxnNumber() << " on session "
+ << opCtx->getLogicalSessionId()->toBSON();
+
+ commitDecisionFuture = TransactionCoordinatorService::get(opCtx)->recoverCommit(
+ opCtx, opCtx->getLogicalSessionId().get(), opCtx->getTxnNumber().get());
}
- ss << "]";
- LOG(3) << "Coordinator shard received participant list with shards " << ss.str()
- << " for transaction " << opCtx->getTxnNumber() << " on session "
- << opCtx->getLogicalSessionId()->toBSON();
- auto commitDecisionFuture = TransactionCoordinatorService::get(opCtx)->coordinateCommit(
- opCtx,
- opCtx->getLogicalSessionId().get(),
- opCtx->getTxnNumber().get(),
- participantList);
+ if (commitDecisionFuture) {
+ // The commit coordination is still ongoing. Block waiting for the decision.
+ auto commitDecision = commitDecisionFuture->get(opCtx);
+ switch (commitDecision) {
+ case TransactionCoordinator::CommitDecision::kAbort:
+ uasserted(ErrorCodes::NoSuchTransaction, "Transaction was aborted");
+ case TransactionCoordinator::CommitDecision::kCommit:
+ return;
+ }
+ }
- // Block waiting for the commit decision.
- auto commitDecision = commitDecisionFuture.get(opCtx);
+ // No coordinator was found in memory. Either the commit coordination already completed,
+ // the original primary on which the coordinator was created stepped down, or this
+ // coordinateCommit request was a byzantine message.
- // If the decision was abort, propagate NoSuchTransaction exception back to mongos.
- uassert(ErrorCodes::NoSuchTransaction,
- "Transaction was aborted",
- commitDecision != TransactionCoordinator::CommitDecision::kAbort);
+ LOG(3) << "Coordinator shard going to attempt to recover decision from local "
+ "participant for transaction "
+ << opCtx->getTxnNumber() << " on session "
+ << opCtx->getLogicalSessionId()->toBSON();
+ recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx);
}
private:
diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp
index 9b1e8b8f14a..77e388cb9a6 100644
--- a/src/mongo/db/transaction_coordinator_service.cpp
+++ b/src/mongo/db/transaction_coordinator_service.cpp
@@ -181,21 +181,15 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx,
// TODO (SERVER-37024): Schedule abort task on executor to execute at commitDeadline.
}
-Future<TransactionCoordinator::CommitDecision> TransactionCoordinatorService::coordinateCommit(
- OperationContext* opCtx,
- LogicalSessionId lsid,
- TxnNumber txnNumber,
- const std::set<ShardId>& participantList) {
+boost::optional<Future<TransactionCoordinator::CommitDecision>>
+TransactionCoordinatorService::coordinateCommit(OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ const std::set<ShardId>& participantList) {
auto coordinator = _coordinatorCatalog->get(lsid, txnNumber);
if (!coordinator) {
- // TODO (SERVER-37440): Return decision "kForgotten", which indicates that a decision was
- // already made and forgotten. The caller can recover the decision from the local
- // participant if a higher transaction has not been started on the session and the session
- // has not been reaped.
- // Currently is MONGO_UNREACHABLE because no tests should cause the router to re-send
- // coordinateCommitTransaction.
- MONGO_UNREACHABLE;
+ return boost::none;
}
Action initialAction = coordinator->recvCoordinateCommit(participantList);
@@ -203,7 +197,32 @@ Future<TransactionCoordinator::CommitDecision> TransactionCoordinatorService::co
launchCoordinateCommitTask(_threadPool, coordinator, lsid, txnNumber, initialAction);
}
- return coordinator.get()->waitForCompletion().then([](auto finalState) {
+ return coordinator->waitForCompletion().then([](auto finalState) {
+ switch (finalState) {
+ case TransactionCoordinator::StateMachine::State::kAborted:
+ return TransactionCoordinator::CommitDecision::kAbort;
+ case TransactionCoordinator::StateMachine::State::kCommitted:
+ return TransactionCoordinator::CommitDecision::kCommit;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ });
+ // TODO (SERVER-37364): Re-enable the coordinator returning the decision as soon as the decision
+ // is made durable. Currently the coordinator waits to hear acks because participants in prepare
+ // reject requests with a higher transaction number, causing tests to fail.
+ // return coordinator.get()->waitForDecision();
+}
+
+boost::optional<Future<TransactionCoordinator::CommitDecision>>
+TransactionCoordinatorService::recoverCommit(OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber) {
+ auto coordinator = _coordinatorCatalog->get(lsid, txnNumber);
+ if (!coordinator) {
+ return boost::none;
+ }
+
+ return coordinator->waitForCompletion().then([](auto finalState) {
switch (finalState) {
case TransactionCoordinator::StateMachine::State::kAborted:
return TransactionCoordinator::CommitDecision::kAbort;
diff --git a/src/mongo/db/transaction_coordinator_service.h b/src/mongo/db/transaction_coordinator_service.h
index c7c4ab77e46..9cc07b784f4 100644
--- a/src/mongo/db/transaction_coordinator_service.h
+++ b/src/mongo/db/transaction_coordinator_service.h
@@ -74,16 +74,28 @@ public:
Date_t commitDeadline);
/**
- * Delivers coordinateCommit to the TransactionCoordinator, asynchronously sends commit or
- * abort to participants if necessary, and returns a Future that will contain the commit
- * decision when the transaction finishes committing or aborting.
+ * If a coordinator for the (lsid, txnNumber) exists, delivers the participant list to the
+ * coordinator, which will cause the coordinator to start coordinating the commit if the
+ * coordinator had not yet received a list, and returns a Future that will contain the decision
+ * when the transaction finishes committing or aborting.
+ *
+ * If no coordinator for the (lsid, txnNumber) exists, returns boost::none.
*/
- Future<TransactionCoordinator::CommitDecision> coordinateCommit(
+ boost::optional<Future<TransactionCoordinator::CommitDecision>> coordinateCommit(
OperationContext* opCtx,
LogicalSessionId lsid,
TxnNumber txnNumber,
const std::set<ShardId>& participantList);
+ /**
+ * If a coordinator for the (lsid, txnNumber) exists, joins it without delivering a participant
+ * list and returns a Future containing the decision once the transaction commits or aborts.
+ *
+ * If no coordinator for the (lsid, txnNumber) exists, returns boost::none.
+ */
+ boost::optional<Future<TransactionCoordinator::CommitDecision>> recoverCommit(
+ OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber);
+
private:
std::shared_ptr<TransactionCoordinatorCatalog> _coordinatorCatalog;
diff --git a/src/mongo/db/transaction_coordinator_service_test.cpp b/src/mongo/db/transaction_coordinator_service_test.cpp
index d95e3197be5..f940f8a0ef9 100644
--- a/src/mongo/db/transaction_coordinator_service_test.cpp
+++ b/src/mongo/db/transaction_coordinator_service_test.cpp
@@ -205,7 +205,7 @@ public:
const LogicalSessionId& lsid,
const TxnNumber& txnNumber,
const std::set<ShardId>& transactionParticipantShards) {
- auto commitDecisionFuture = coordinatorService.coordinateCommit(
+ auto commitDecisionFuture = *coordinatorService.coordinateCommit(
operationContext(), lsid, txnNumber, transactionParticipantShards);
for (size_t i = 0; i < transactionParticipantShards.size(); ++i) {
@@ -230,7 +230,7 @@ public:
const std::set<ShardId>& shardIdSet,
const ShardId& abortingShard) {
auto commitDecisionFuture =
- coordinatorService.coordinateCommit(operationContext(), lsid, txnNumber, shardIdSet);
+ *coordinatorService.coordinateCommit(operationContext(), lsid, txnNumber, shardIdSet);
for (size_t i = 0; i < shardIdSet.size(); ++i) {
assertPrepareSentAndRespondWithNoSuchTransaction();
@@ -347,7 +347,7 @@ TEST_F(TransactionCoordinatorServiceTest,
// Progress the transaction up until the point where it has sent commit and is waiting for
// commit acks.
- auto oldTxnCommitDecisionFuture = coordinatorService.coordinateCommit(
+ auto oldTxnCommitDecisionFuture = *coordinatorService.coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
// Simulate all participants acking prepare/voting to commit.
@@ -375,7 +375,108 @@ TEST_F(TransactionCoordinatorServiceTest,
assertPrepareSentAndRespondWithSuccess();
assertCommitSentAndRespondWithSuccess();
assertCommitSentAndRespondWithSuccess();
- // commitTransaction(coordinatorService, lsid(), txnNumber() + 1, kTwoShardIdSet);
+}
+
+TEST_F(TransactionCoordinatorServiceTest, CoordinateCommitReturnsNoneIfNoCoordinatorEverExisted) {
+ TransactionCoordinatorService coordinatorService;
+ auto commitDecisionFuture = coordinatorService.coordinateCommit(
+ operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
+ ASSERT(boost::none == commitDecisionFuture);
+}
+
+TEST_F(TransactionCoordinatorServiceTest, CoordinateCommitReturnsNoneIfCoordinatorWasRemoved) {
+ TransactionCoordinatorService coordinatorService;
+
+ coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline);
+ commitTransaction(coordinatorService, lsid(), txnNumber(), kTwoShardIdSet);
+
+ auto commitDecisionFuture = coordinatorService.coordinateCommit(
+ operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
+ ASSERT(boost::none == commitDecisionFuture);
+}
+
+TEST_F(TransactionCoordinatorServiceTest,
+ CoordinateCommitWithSameParticipantListJoinsOngoingCoordinationThatLeadsToAbort) {
+ TransactionCoordinatorService coordinatorService;
+
+ coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline);
+
+ auto commitDecisionFuture1 = *coordinatorService.coordinateCommit(
+ operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
+
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+
+ auto commitDecisionFuture2 = *coordinatorService.coordinateCommit(
+ operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
+
+ assertPrepareSentAndRespondWithSuccess();
+ assertAbortSentAndRespondWithSuccess();
+
+ ASSERT_EQ(static_cast<int>(commitDecisionFuture1.get()),
+ static_cast<int>(commitDecisionFuture2.get()));
+}
+
+TEST_F(TransactionCoordinatorServiceTest,
+ CoordinateCommitWithSameParticipantListJoinsOngoingCoordinationThatLeadsToCommit) {
+ TransactionCoordinatorService coordinatorService;
+
+ coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline);
+
+ auto commitDecisionFuture1 = *coordinatorService.coordinateCommit(
+ operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
+
+ assertPrepareSentAndRespondWithSuccess();
+
+ auto commitDecisionFuture2 = *coordinatorService.coordinateCommit(
+ operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
+
+ assertPrepareSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+
+ ASSERT_EQ(static_cast<int>(commitDecisionFuture1.get()),
+ static_cast<int>(commitDecisionFuture2.get()));
+}
+
+TEST_F(TransactionCoordinatorServiceTest, RecoverCommitJoinsOngoingCoordinationThatLeadsToAbort) {
+ TransactionCoordinatorService coordinatorService;
+
+ coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline);
+
+ auto commitDecisionFuture1 = *coordinatorService.coordinateCommit(
+ operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
+
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+
+ auto commitDecisionFuture2 =
+ *coordinatorService.recoverCommit(operationContext(), lsid(), txnNumber());
+
+ assertPrepareSentAndRespondWithSuccess();
+ assertAbortSentAndRespondWithSuccess();
+
+ ASSERT_EQ(static_cast<int>(commitDecisionFuture1.get()),
+ static_cast<int>(commitDecisionFuture2.get()));
+}
+
+TEST_F(TransactionCoordinatorServiceTest, RecoverCommitJoinsOngoingCoordinationThatLeadsToCommit) {
+ TransactionCoordinatorService coordinatorService;
+
+ coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline);
+
+ auto commitDecisionFuture1 = *coordinatorService.coordinateCommit(
+ operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
+
+ assertPrepareSentAndRespondWithSuccess();
+
+ auto commitDecisionFuture2 =
+ *coordinatorService.recoverCommit(operationContext(), lsid(), txnNumber());
+
+ assertPrepareSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+
+ ASSERT_EQ(static_cast<int>(commitDecisionFuture1.get()),
+ static_cast<int>(commitDecisionFuture2.get()));
}
TEST_F(TransactionCoordinatorServiceTest, CoordinatorRetriesOnWriteConcernErrorToPrepare) {
@@ -383,7 +484,7 @@ TEST_F(TransactionCoordinatorServiceTest, CoordinatorRetriesOnWriteConcernErrorT
coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline);
// Coordinator sends prepare.
- auto commitDecisionFuture = coordinatorService.coordinateCommit(
+ auto commitDecisionFuture = *coordinatorService.coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
// One participant responds with writeConcern error.
@@ -414,7 +515,7 @@ TEST_F(TransactionCoordinatorServiceTest, CoordinatorRetriesOnWriteConcernErrorT
coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline);
// Coordinator sends prepare.
- auto commitDecisionFuture = coordinatorService.coordinateCommit(
+ auto commitDecisionFuture = *coordinatorService.coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
// One participant votes to abort.
@@ -441,7 +542,7 @@ TEST_F(TransactionCoordinatorServiceTest, CoordinatorRetriesOnWriteConcernErrorT
coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline);
// Coordinator sends prepare.
- auto commitDecisionFuture = coordinatorService.coordinateCommit(
+ auto commitDecisionFuture = *coordinatorService.coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
// Both participants vote to commit.
@@ -468,7 +569,7 @@ TEST_F(TransactionCoordinatorServiceTest, CoordinatorRetriesOnWriteConcernErrorT
TEST_F(TransactionCoordinatorServiceTestSingleTxn,
CoordinateCommitReturnsCorrectCommitDecisionOnAbort) {
- auto commitDecisionFuture = coordinatorService()->coordinateCommit(
+ auto commitDecisionFuture = *coordinatorService()->coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
// Simulate a participant voting to abort.
@@ -486,7 +587,7 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn,
TEST_F(TransactionCoordinatorServiceTestSingleTxn,
CoordinateCommitWithNoVotesReturnsNotReadyFuture) {
- auto commitDecisionFuture = coordinatorService()->coordinateCommit(
+ auto commitDecisionFuture = *coordinatorService()->coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
ASSERT_FALSE(commitDecisionFuture.isReady());
@@ -498,7 +599,7 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn,
TEST_F(TransactionCoordinatorServiceTestSingleTxn,
CoordinateCommitReturnsCorrectCommitDecisionOnCommit) {
- auto commitDecisionFuture = coordinatorService()->coordinateCommit(
+ auto commitDecisionFuture = *coordinatorService()->coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
assertPrepareSentAndRespondWithSuccess();
@@ -511,24 +612,12 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn,
static_cast<int>(TransactionCoordinator::CommitDecision::kCommit));
}
-TEST_F(TransactionCoordinatorServiceTest,
- CoordinateCommitRecoversCorrectCommitDecisionForTransactionThatAlreadyCommitted) {
- // TODO (SERVER-37440): Implement test when coordinateCommit is made to work correctly on
- // retries.
-}
-
-TEST_F(TransactionCoordinatorServiceTest,
- CoordinateCommitRecoversCorrectCommitDecisionForTransactionThatAlreadyAborted) {
- // TODO (SERVER-37440): Implement test when coordinateCommit is made to work correctly on
- // retries.
-}
-
TEST_F(TransactionCoordinatorServiceTestSingleTxn,
ConcurrentCallsToCoordinateCommitReturnSameDecisionOnCommit) {
- auto commitDecisionFuture1 = coordinatorService()->coordinateCommit(
+ auto commitDecisionFuture1 = *coordinatorService()->coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
- auto commitDecisionFuture2 = coordinatorService()->coordinateCommit(
+ auto commitDecisionFuture2 = *coordinatorService()->coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
commitTransaction(*coordinatorService(), lsid(), txnNumber(), kTwoShardIdSet);
@@ -540,9 +629,9 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn,
TEST_F(TransactionCoordinatorServiceTestSingleTxn,
ConcurrentCallsToCoordinateCommitReturnSameDecisionOnAbort) {
- auto commitDecisionFuture1 = coordinatorService()->coordinateCommit(
+ auto commitDecisionFuture1 = *coordinatorService()->coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
- auto commitDecisionFuture2 = coordinatorService()->coordinateCommit(
+ auto commitDecisionFuture2 = *coordinatorService()->coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
abortTransaction(
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index be01a7d2db2..b873ecb43d7 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/s/recover_transaction_decision_from_local_participant.h"
#include "mongo/db/server_transactions_metrics.h"
#include "mongo/db/service_context.h"
#include "mongo/db/session_catalog_mongod.h"
@@ -3814,6 +3815,385 @@ TEST_F(TxnParticipantTest, GetOldestNonMajorityCommittedOpTimeReturnsOldestEntry
ASSERT_EQ(nonMajorityCommittedOpTime, laterOpTime);
}
+class RecoverDecisionFromLocalParticipantTest : public TxnParticipantTest {  // Fixture: state setup + state assertions.
+private:
+ void _putParticipantInProgressWithoutSessionCheckout() {  // Unstash for "insert", assert txn is active, stash.
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->unstashTransactionResources(opCtx(), "insert");
+ ASSERT(txnParticipant->inMultiDocumentTransaction());
+ txnParticipant->stashTransactionResources(opCtx());
+ }
+ // Prepares the in-progress transaction, stashes its resources again, and returns the prepare timestamp.
+ Timestamp _putParticipantInPrepareWithoutSessionCheckout() {
+ _putParticipantInProgressWithoutSessionCheckout();
+
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
+ const auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
+ ASSERT(txnParticipant->transactionIsPrepared());
+ txnParticipant->stashTransactionResources(opCtx());
+ return prepareTimestamp;
+ }
+
+protected:
+ void putParticipantInProgress() {  // Same as the private helper, with checkOutSession()'s result held for the call.
+ auto sessionCheckout = checkOutSession();
+ _putParticipantInProgressWithoutSessionCheckout();
+ }
+ // Starts a transaction and aborts it without ever preparing it.
+ void putParticipantInAbortedWithoutPrepare() {
+ auto sessionCheckout = checkOutSession();
+
+ _putParticipantInProgressWithoutSessionCheckout();
+
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction");
+ txnParticipant->abortActiveTransaction(opCtx());
+ ASSERT(txnParticipant->transactionIsAborted());
+ // No need to stash transaction resources after abort.
+ }
+ // Starts a transaction and commits it through commitUnpreparedTransaction().
+ void putParticipantInCommittedWithoutPrepare() {
+ auto sessionCheckout = checkOutSession();
+
+ _putParticipantInProgressWithoutSessionCheckout();
+
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
+ txnParticipant->commitUnpreparedTransaction(opCtx());
+ ASSERT(txnParticipant->transactionIsCommitted());
+ // No need to stash transaction resources after commit.
+ }
+ // Holds checkOutSession()'s result, prepares the transaction, and returns the prepare timestamp.
+ Timestamp putParticipantInPrepare() {
+ auto sessionCheckout = checkOutSession();
+ return _putParticipantInPrepareWithoutSessionCheckout();
+ }
+ // Prepares the transaction and then aborts it.
+ void putParticipantInAbortedAfterPrepare() {
+ auto sessionCheckout = checkOutSession();
+
+ _putParticipantInPrepareWithoutSessionCheckout();
+
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction");
+ txnParticipant->abortActiveTransaction(opCtx());
+ ASSERT(txnParticipant->transactionIsAborted());
+ // No need to stash transaction resources after abort.
+ }
+ // Prepares the transaction, then commits it at (same seconds, increment + 1) of the prepare timestamp.
+ void putParticipantInCommittedAfterPrepare() {
+ auto sessionCheckout = checkOutSession();
+
+ const auto prepareTimestamp = _putParticipantInPrepareWithoutSessionCheckout();
+ const auto commitTS = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
+ txnParticipant->commitPreparedTransaction(opCtx(), commitTS);
+ ASSERT_TRUE(txnParticipant->transactionIsCommitted());
+ // No need to stash transaction resources after commit.
+ }
+ // Calls beginOrContinue() with both trailing arguments boost::none and asserts no transaction state is set.
+ void putParticipantInRetryableWrite() {
+ MongoDOperationContextSession checkOutSession(opCtx());
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), boost::none, boost::none);
+ ASSERT(!txnParticipant->inMultiDocumentTransaction() &&
+ !txnParticipant->transactionIsCommitted() &&
+ !txnParticipant->transactionIsAborted() && !txnParticipant->transactionIsPrepared());
+ }
+ // The assert* helpers below check out the session, compare getActiveTxnNumber(), then check the state flags.
+ void assertParticipantIsInProgressWithTxnNumber(const TxnNumber expectedTxnNumber) {
+ MongoDOperationContextSession checkOutSession(opCtx());
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT_EQUALS(expectedTxnNumber, txnParticipant->getActiveTxnNumber());
+ ASSERT(txnParticipant->inMultiDocumentTransaction() &&
+ !txnParticipant->transactionIsPrepared());
+ }
+
+ void assertParticipantIsPreparedWithTxnNumber(const TxnNumber expectedTxnNumber) {
+ MongoDOperationContextSession checkOutSession(opCtx());
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT_EQUALS(expectedTxnNumber, txnParticipant->getActiveTxnNumber());
+ ASSERT(txnParticipant->transactionIsPrepared());
+ }
+
+ void assertParticipantIsAbortedWithTxnNumber(const TxnNumber expectedTxnNumber) {
+ MongoDOperationContextSession checkOutSession(opCtx());
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT_EQUALS(expectedTxnNumber, txnParticipant->getActiveTxnNumber());
+ ASSERT(txnParticipant->transactionIsAborted());
+ }
+
+ void assertParticipantIsCommittedWithTxnNumber(const TxnNumber expectedTxnNumber) {
+ MongoDOperationContextSession checkOutSession(opCtx());
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT_EQUALS(expectedTxnNumber, txnParticipant->getActiveTxnNumber());
+ ASSERT(txnParticipant->transactionIsCommitted());
+ }
+
+ void assertParticipantIsInRetryableWriteWithTxnNumber(const TxnNumber expectedTxnNumber) {
+ MongoDOperationContextSession checkOutSession(opCtx());
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+ ASSERT_EQUALS(expectedTxnNumber, txnParticipant->getActiveTxnNumber());
+ ASSERT(!txnParticipant->inMultiDocumentTransaction() &&
+ !txnParticipant->transactionIsCommitted() &&
+ !txnParticipant->transactionIsAborted() && !txnParticipant->transactionIsPrepared());
+ }
+ // Runs the recovery function on a different opCtx set to lsid/txnNumber; expects TransactionTooOld.
+ void assertRecoverDecisionFromDifferentOpCtxThrowsTransactionTooOld(
+ const LogicalSessionId& lsid, const TxnNumber txnNumber) {
+ auto recoverDecisionThrowsTransactionTooOldFunc = [&](OperationContext* newOpCtx) {
+ newOpCtx->setLogicalSessionId(lsid);
+ newOpCtx->setTxnNumber(txnNumber);
+
+ ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(newOpCtx),
+ AssertionException,
+ ErrorCodes::TransactionTooOld);
+ };
+ runFunctionFromDifferentOpCtx(recoverDecisionThrowsTransactionTooOldFunc);
+ }
+ // Runs the recovery function on a different opCtx set to lsid/txnNumber; expects NoSuchTransaction.
+ void assertRecoverDecisionFromDifferentOpCtxThrowsNoSuchTransaction(
+ const LogicalSessionId& lsid, const TxnNumber txnNumber) {
+ auto recoverDecisionThrowsNoSuchTransactionFunc = [&](OperationContext* newOpCtx) {
+ newOpCtx->setLogicalSessionId(lsid);
+ newOpCtx->setTxnNumber(txnNumber);
+
+ ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(newOpCtx),
+ AssertionException,
+ ErrorCodes::NoSuchTransaction);
+ };
+ runFunctionFromDifferentOpCtx(recoverDecisionThrowsNoSuchTransactionFunc);
+ }
+};
+
+//
+// Local TransactionParticipant has *same* TxnNumber as recoverCommit request.
+//
+
+TEST_F(  // Same txnNumber, transaction in progress: recovery throws NoSuchTransaction and leaves it aborted.
+ RecoverDecisionFromLocalParticipantTest,
+ AbortsActiveTransactionAndThrowsNoSuchTransactionIfParticipantIsInProgressWithSameTxnNumber) {
+ putParticipantInProgress();
+ ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx()),
+ AssertionException,
+ ErrorCodes::NoSuchTransaction);
+ assertParticipantIsAbortedWithTxnNumber(*opCtx()->getTxnNumber());  // In progress before, aborted now.
+}
+
+TEST_F(RecoverDecisionFromLocalParticipantTest,  // Same txnNumber, aborted before prepare.
+ ThrowsNoSuchTransactionIfParticipantIsAbortedWithoutPrepareWithSameTxnNumber) {
+ putParticipantInAbortedWithoutPrepare();
+ ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx()),
+ AssertionException,
+ ErrorCodes::NoSuchTransaction);
+ assertParticipantIsAbortedWithTxnNumber(*opCtx()->getTxnNumber());  // State unchanged: still aborted.
+}
+
+TEST_F(RecoverDecisionFromLocalParticipantTest,  // Same txnNumber, committed without prepare.
+ DoesNotThrowIfParticipantIsCommittedWithoutPrepareWithSameTxnNumber) {
+ putParticipantInCommittedWithoutPrepare();
+ recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx());  // Must return without throwing.
+ assertParticipantIsCommittedWithTxnNumber(*opCtx()->getTxnNumber());  // State unchanged: still committed.
+}
+
+TEST_F(RecoverDecisionFromLocalParticipantTest,  // Same txnNumber, transaction still prepared.
+ ThrowsAnonymousErrorIfParticipantIsPreparedWithSameTxnNumber) {
+ putParticipantInPrepare();
+ ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx()),
+ AssertionException,
+ 51021);  // Raw numeric error code; no ErrorCodes:: name is used for it here.
+ assertParticipantIsPreparedWithTxnNumber(*opCtx()->getTxnNumber());  // State unchanged: still prepared.
+}
+
+TEST_F(RecoverDecisionFromLocalParticipantTest,  // Same txnNumber, aborted after prepare.
+ ThrowsNoSuchTransactionIfParticipantIsAbortedAfterPrepareWithSameTxnNumber) {
+ putParticipantInAbortedAfterPrepare();
+ ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx()),
+ AssertionException,
+ ErrorCodes::NoSuchTransaction);
+ assertParticipantIsAbortedWithTxnNumber(*opCtx()->getTxnNumber());  // State unchanged: still aborted.
+}
+
+TEST_F(RecoverDecisionFromLocalParticipantTest,  // Same txnNumber, committed after prepare.
+ DoesNotThrowIfParticipantIsCommittedAfterPrepareWithSameTxnNumber) {
+ putParticipantInCommittedAfterPrepare();
+ recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx());  // Must return without throwing.
+ assertParticipantIsCommittedWithTxnNumber(*opCtx()->getTxnNumber());  // State unchanged: still committed.
+}
+
+TEST_F(RecoverDecisionFromLocalParticipantTest,  // Same txnNumber, but no transaction state was ever set.
+ ThrowsNoSuchTransactionIfTxnNumberCorrespondsToRetryableWrite) {
+ putParticipantInRetryableWrite();
+ ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx()),
+ AssertionException,
+ ErrorCodes::NoSuchTransaction);
+ assertParticipantIsInRetryableWriteWithTxnNumber(*opCtx()->getTxnNumber());  // State unchanged.
+}
+
+//
+// Local TransactionParticipant has *higher* TxnNumber than recoverCommit request.
+//
+
+TEST_F(RecoverDecisionFromLocalParticipantTest,  // Request txnNumber is lower than the in-progress one.
+ ThrowsTransactionTooOldIfTransactionParticipantInProgressHasHigherTxnNumber) {
+ const auto lsid = *opCtx()->getLogicalSessionId();
+ const auto participantTxnNumber = *opCtx()->getTxnNumber();
+ const auto oldTxnNumber = participantTxnNumber - 1;  // The txnNumber the recovery request will carry.
+ // Set up state on the fixture's opCtx, then recover from a different opCtx using the older number.
+ putParticipantInProgress();
+ assertRecoverDecisionFromDifferentOpCtxThrowsTransactionTooOld(lsid, oldTxnNumber);
+ assertParticipantIsInProgressWithTxnNumber(participantTxnNumber);  // Participant is untouched.
+}
+
+TEST_F(RecoverDecisionFromLocalParticipantTest,  // Request txnNumber is lower than the prepared one.
+ ThrowsTransactionTooOldIfTransactionParticipantInPrepareHasHigherTxnNumber) {
+ const auto lsid = *opCtx()->getLogicalSessionId();
+ const auto participantTxnNumber = *opCtx()->getTxnNumber();
+ const auto oldTxnNumber = participantTxnNumber - 1;  // The txnNumber the recovery request will carry.
+ // Set up state on the fixture's opCtx, then recover from a different opCtx using the older number.
+ putParticipantInPrepare();
+ assertRecoverDecisionFromDifferentOpCtxThrowsTransactionTooOld(lsid, oldTxnNumber);
+ assertParticipantIsPreparedWithTxnNumber(participantTxnNumber);  // Participant is untouched.
+}
+
+TEST_F(RecoverDecisionFromLocalParticipantTest,  // Request txnNumber is lower than the aborted one.
+ ThrowsTransactionTooOldIfTransactionParticipantInAbortedHasHigherTxnNumber) {
+ const auto lsid = *opCtx()->getLogicalSessionId();
+ const auto participantTxnNumber = *opCtx()->getTxnNumber();
+ const auto oldTxnNumber = participantTxnNumber - 1;  // The txnNumber the recovery request will carry.
+ // Set up state on the fixture's opCtx, then recover from a different opCtx using the older number.
+ putParticipantInAbortedAfterPrepare();
+ assertRecoverDecisionFromDifferentOpCtxThrowsTransactionTooOld(lsid, oldTxnNumber);
+ assertParticipantIsAbortedWithTxnNumber(participantTxnNumber);  // Participant is untouched.
+}
+
+TEST_F(RecoverDecisionFromLocalParticipantTest,  // Request txnNumber is lower than the committed one.
+ ThrowsTransactionTooOldIfTransactionParticipantInCommittedHasHigherTxnNumber) {
+ const auto lsid = *opCtx()->getLogicalSessionId();
+ const auto participantTxnNumber = *opCtx()->getTxnNumber();
+ const auto oldTxnNumber = participantTxnNumber - 1;  // The txnNumber the recovery request will carry.
+ // Set up state on the fixture's opCtx, then recover from a different opCtx using the older number.
+ putParticipantInCommittedAfterPrepare();
+ assertRecoverDecisionFromDifferentOpCtxThrowsTransactionTooOld(lsid, oldTxnNumber);
+ assertParticipantIsCommittedWithTxnNumber(participantTxnNumber);  // Participant is untouched.
+}
+
+TEST_F(RecoverDecisionFromLocalParticipantTest,  // Request txnNumber is lower than the retryable write's.
+ ThrowsTransactionTooOldIfTransactionParticipantInRetryableWriteHasHigherTxnNumber) {
+ const auto lsid = *opCtx()->getLogicalSessionId();
+ const auto participantTxnNumber = *opCtx()->getTxnNumber();
+ const auto oldTxnNumber = participantTxnNumber - 1;  // The txnNumber the recovery request will carry.
+ // Set up state on the fixture's opCtx, then recover from a different opCtx using the older number.
+ putParticipantInRetryableWrite();
+ assertRecoverDecisionFromDifferentOpCtxThrowsTransactionTooOld(lsid, oldTxnNumber);
+ assertParticipantIsInRetryableWriteWithTxnNumber(participantTxnNumber);  // Participant is untouched.
+}
+
+//
+// Local TransactionParticipant has *lower* TxnNumber than recoverCommit request.
+//
+
+TEST_F(  // Request txnNumber is higher than the in-progress one: NoSuchTransaction, new number ends aborted.
+ RecoverDecisionFromLocalParticipantTest,
+ AbortsOlderAndNewerTransactionAndThrowsNoSuchTransactionIfParticipantIsInProgressWithLowerTxnNumber) {
+ const auto lsid = *opCtx()->getLogicalSessionId();
+ const auto participantTxnNumber = *opCtx()->getTxnNumber();
+ const auto newTxnNumber = participantTxnNumber + 1;  // The txnNumber the recovery request will carry.
+ // Set up state on the fixture's opCtx, then recover from a different opCtx using the newer number.
+ putParticipantInProgress();
+ assertRecoverDecisionFromDifferentOpCtxThrowsNoSuchTransaction(lsid, newTxnNumber);
+ assertParticipantIsAbortedWithTxnNumber(newTxnNumber);  // Active txnNumber advanced; state is aborted.
+}
+
+TEST_F(RecoverDecisionFromLocalParticipantTest, ThrowsIfParticipantIsInPrepareWithLowerTxnNumber) {
+ const auto lsid = *opCtx()->getLogicalSessionId();
+ const auto participantTxnNumber = *opCtx()->getTxnNumber();
+ const auto newTxnNumber = participantTxnNumber + 1;  // The txnNumber the recovery request will carry.
+ // Request txnNumber is higher than the prepared one: expect code 51021 and no change to the participant.
+ putParticipantInPrepare();
+ auto recoverDecisionThrowsAnonymousErrorFunc = [&](OperationContext* newOpCtx) {
+ newOpCtx->setLogicalSessionId(lsid);
+ newOpCtx->setTxnNumber(newTxnNumber);
+
+ ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(newOpCtx),
+ AssertionException,
+ 51021);  // Raw numeric error code; no ErrorCodes:: name is used for it here.
+ };
+ runFunctionFromDifferentOpCtx(recoverDecisionThrowsAnonymousErrorFunc);
+ assertParticipantIsPreparedWithTxnNumber(participantTxnNumber);  // Still prepared at the original number.
+}
+
+TEST_F(  // Request txnNumber is higher than the aborted one: NoSuchTransaction, new number ends aborted.
+ RecoverDecisionFromLocalParticipantTest,
+ StartsAndAbortsNewerTransactionAndThrowsNoSuchTransactionIfParticipantIsAbortedWithLowerTxnNumber) {
+ const auto lsid = *opCtx()->getLogicalSessionId();
+ const auto participantTxnNumber = *opCtx()->getTxnNumber();
+ const auto newTxnNumber = participantTxnNumber + 1;  // The txnNumber the recovery request will carry.
+ // Set up state on the fixture's opCtx, then recover from a different opCtx using the newer number.
+ putParticipantInAbortedAfterPrepare();
+ assertRecoverDecisionFromDifferentOpCtxThrowsNoSuchTransaction(lsid, newTxnNumber);
+ assertParticipantIsAbortedWithTxnNumber(newTxnNumber);  // Active txnNumber advanced; state is aborted.
+}
+
+TEST_F(  // Request txnNumber is higher than the committed one: NoSuchTransaction, new number ends aborted.
+ RecoverDecisionFromLocalParticipantTest,
+ StartsAndAbortsNewerTransactionAndThrowsNoSuchTransactionIfParticipantIsCommittedWithLowerTxnNumber) {
+ const auto lsid = *opCtx()->getLogicalSessionId();
+ const auto participantTxnNumber = *opCtx()->getTxnNumber();
+ const auto newTxnNumber = participantTxnNumber + 1;  // The txnNumber the recovery request will carry.
+ // Set up state on the fixture's opCtx, then recover from a different opCtx using the newer number.
+ putParticipantInCommittedAfterPrepare();
+ assertRecoverDecisionFromDifferentOpCtxThrowsNoSuchTransaction(lsid, newTxnNumber);
+ assertParticipantIsAbortedWithTxnNumber(newTxnNumber);  // Active txnNumber advanced; state is aborted.
+}
+
+TEST_F(  // Request txnNumber is higher than the retryable write's: NoSuchTransaction, new number ends aborted.
+ RecoverDecisionFromLocalParticipantTest,
+ StartsAndAbortsNewerTransactionAndThrowsNoSuchTransactionIfParticipantIsInRetryableWriteWithLowerTxnNumber) {
+ const auto lsid = *opCtx()->getLogicalSessionId();
+ const auto participantTxnNumber = *opCtx()->getTxnNumber();
+ const auto newTxnNumber = participantTxnNumber + 1;  // The txnNumber the recovery request will carry.
+ // Set up state on the fixture's opCtx, then recover from a different opCtx using the newer number.
+ putParticipantInRetryableWrite();
+ assertRecoverDecisionFromDifferentOpCtxThrowsNoSuchTransaction(lsid, newTxnNumber);
+ assertParticipantIsAbortedWithTxnNumber(newTxnNumber);  // Active txnNumber advanced; state is aborted.
+}
+
+//
+// Recovering the decision is retryable.
+//
+
+TEST_F(RecoverDecisionFromLocalParticipantTest,  // A recovery attempt that fails with LockTimeout can be retried.
+ RecoverDecisionCanBeRetriedIfFirstTryThrowsLockTimeout) {
+ const auto lsid = *opCtx()->getLogicalSessionId();
+ const auto participantTxnNumber = *opCtx()->getTxnNumber();
+ const auto newTxnNumber = participantTxnNumber + 1;  // The txnNumber both recovery attempts will carry.
+
+ putParticipantInCommittedAfterPrepare();
+
+ // First recoverDecision attempt throws LockTimeout.
+ {
+ Lock::GlobalLock lk(opCtx(), MODE_X);  // Held on the fixture's opCtx for this scope; presumably what blocks the attempt.
+ auto recoverDecisionThrowsLockTimeoutFunc = [&](OperationContext* newOpCtx) {
+ newOpCtx->setLogicalSessionId(lsid);
+ newOpCtx->setTxnNumber(newTxnNumber);
+
+ ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(newOpCtx),
+ AssertionException,
+ ErrorCodes::LockTimeout);
+ };
+ runFunctionFromDifferentOpCtx(recoverDecisionThrowsLockTimeoutFunc);
+ }
+ assertParticipantIsInProgressWithTxnNumber(newTxnNumber);  // Failed attempt left the new number in progress.
+
+ // Retry recoverDecision.
+ assertRecoverDecisionFromDifferentOpCtxThrowsNoSuchTransaction(lsid, newTxnNumber);
+
+ assertParticipantIsAbortedWithTxnNumber(newTxnNumber);  // Retry, made after the lock scope ended, aborts it.
+}
} // namespace
} // namespace mongo