diff options
author | Esha Maharishi <esha.maharishi@mongodb.com> | 2018-11-15 21:07:42 -0500 |
---|---|---|
committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2018-11-29 17:29:07 -0500 |
commit | 179748103a198727e76655932d317b27af758af7 (patch) | |
tree | 77fdd8a4328d6503f8065725f6d447b74cb660e2 /src | |
parent | e54d65fa8e444dcfd5bba66f1f4c40203e5ebe16 (diff) | |
download | mongo-179748103a198727e76655932d317b27af758af7.tar.gz |
SERVER-37440 coordinateCommit should fall back to recovering decision from local participant
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp | 103 | ||||
-rw-r--r-- | src/mongo/db/s/recover_transaction_decision_from_local_participant.h | 57 | ||||
-rw-r--r-- | src/mongo/db/s/txn_two_phase_commit_cmds.cpp | 88 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_service.cpp | 45 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_service.h | 20 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_service_test.cpp | 141 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_test.cpp | 380 |
8 files changed, 762 insertions, 73 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index ec14f2c6113..977c85720c2 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -687,6 +687,7 @@ env.Library( 'transaction_history_iterator.cpp', 'transaction_metrics_observer.cpp', 'transaction_participant.cpp', + 's/recover_transaction_decision_from_local_participant.cpp', env.Idlc('session_txn_record.idl')[0], env.Idlc('transaction_commit_decision.idl')[0], env.Idlc('transactions_stats.idl')[0], diff --git a/src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp b/src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp new file mode 100644 index 00000000000..35f90c58c51 --- /dev/null +++ b/src/mongo/db/s/recover_transaction_decision_from_local_participant.cpp @@ -0,0 +1,103 @@ + +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/s/recover_transaction_decision_from_local_participant.h" + +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/session_catalog_mongod.h" +#include "mongo/db/transaction_participant.h" + +namespace mongo { + +void recoverDecisionFromLocalParticipantOrAbortLocalParticipant(OperationContext* opCtx) { + ON_BLOCK_EXIT([opCtx] { + // Ensure waiting for the user-supplied writeConcern of the decision. + repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); + }); + + MongoDOperationContextSession checkOutSession(opCtx); + + auto txnParticipant = TransactionParticipant::get(opCtx); + + try { + if (txnParticipant->getActiveTxnNumber() < *opCtx->getTxnNumber()) { + // TODO (SERVER-38133): Remove the if-block and just execute the else-block. + // Until SERVER-38133, calling beginOrContinue with startTransaction=boost::none with a + // higher txnNumber than the local participant's txnNumber will return NoSuchTransaction + // *without* marking the transaction at the higher txnNumber as aborted. As a stop-gap, + // if the request's txnNumber is higher than the local participant's txnNumber, + // explicitly start a transaction at the higher txnNumber here by calling + // beginOrContinue with startTransaction=true; the higher transaction will be aborted + // below. + // After SERVER-38133, the new transaction will be started *and* aborted by calling + // beginOrContinue with startTransaction=boost::none. + txnParticipant->beginOrContinue( + *opCtx->getTxnNumber(), false /* autocommit */, true /* startTransaction */); + } else { + // If the local participant's transaction number is *equal* to this request's and + // corresponds to a *retryable write*, throws NoSuchTransaction, else is a no-op. + // If the local participant's transaction number is *higher* than this request's, throws + // TransactionTooOld. + txnParticipant->beginOrContinue( + *opCtx->getTxnNumber(), false /* autocommit */, boost::none /* startTransaction */); + } + + // The local participant's txnNumber matched the request's txnNumber, and the txnNumber + // corresponds to a transaction (not a retryable write). + + if (txnParticipant->transactionIsCommitted()) { + return; + } + + txnParticipant->unstashTransactionResources(opCtx, "coordinateCommitTransaction"); + } catch (const DBException& e) { + // Convert a PreparedTransactionInProgress error to an anonymous error code. + uassert(51021, + "coordinateCommitTransaction command found local participant is prepared but no " + "active coordinator exists", + e.code() != ErrorCodes::PreparedTransactionInProgress); + throw; + } + + // Abort the transaction. Since there was no active coordinator for this transaction on this + // node, either the coordinator has already timed out waiting for the participant list and + // the local participant's transaction will time out and abort anyway, or another node in + // this replica set has stepped up and received all the transaction statements and may + // commit. It's safe to abort the transaction even if the latter is the case, because this + // command will fail waiting for majority writeConcern. + txnParticipant->abortActiveTransaction(opCtx); + uassert(ErrorCodes::NoSuchTransaction, + "Transaction was aborted", + txnParticipant->transactionIsCommitted()); +} + +} // namespace mongo diff --git a/src/mongo/db/s/recover_transaction_decision_from_local_participant.h b/src/mongo/db/s/recover_transaction_decision_from_local_participant.h new file mode 100644 index 00000000000..4962e4150a0 --- /dev/null +++ b/src/mongo/db/s/recover_transaction_decision_from_local_participant.h @@ -0,0 +1,57 @@ + +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +namespace mongo { + +class OperationContext; + +/** + * Examines the local participant's decision for the transaction number on the OperationContext and + * returns without throwing if the local participant's decision was commit. Otherwise, throws one of + * the following errors: + * + * - If the local participant has a higher transaction number, throws TransactionTooOld. + * - If the local participant is in prepare, throws an anonymous error code, because either the + * request to recover the decision was a delayed message or a byzantine message. + * - If the local participant has a lower transaction number, starts a transaction at the + * transaction number on the OperationContext, aborts it, and throws NoSuchTransaction. + * - If the local participant has the same transaction number and: + * -- the transaction number corresponds to a retryable write, throws NoSuchTransaction + * -- is already aborted, throws NoSuchTransaction + * -- is in progress, aborts the transaction and throws NoSuchTransaction + * + * Sets the Client last OpTime to the system last OpTime to ensure the caller waits for writeConcern + * of the decision. + */ +void recoverDecisionFromLocalParticipantOrAbortLocalParticipant(OperationContext* opCtx); + +} // namespace mongo diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp index 270d3a31cf8..38a1b351a7b 100644 --- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp +++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp @@ -36,6 +36,7 @@ #include "mongo/db/commands.h" #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/s/recover_transaction_decision_from_local_participant.h" #include "mongo/db/s/sharding_state.h" #include "mongo/db/session_catalog_mongod.h" #include "mongo/db/transaction_coordinator_service.h" @@ -168,7 +169,6 @@ public: } } prepareTransactionCmd; -// TODO (SERVER-37440): Make coordinateCommit idempotent. class CoordinateCommitTransactionCmd : public TypedCommand<CoordinateCommitTransactionCmd> { public: using Request = CoordinateCommitTransaction; @@ -190,39 +190,67 @@ public: const auto& cmd = request(); - // Convert the participant list array into a set, and assert that all participants in - // the list are unique. - // TODO (PM-564): Propagate the 'readOnly' flag down into the TransactionCoordinator. - std::set<ShardId> participantList; - StringBuilder ss; - ss << "["; - for (const auto& participant : cmd.getParticipants()) { - const auto shardId = participant.getShardId(); - uassert(ErrorCodes::InvalidOptions, - str::stream() << "participant list contained duplicate shardId " << shardId, - std::find(participantList.begin(), participantList.end(), shardId) == - participantList.end()); - participantList.insert(shardId); - ss << shardId << " "; + boost::optional<Future<TransactionCoordinator::CommitDecision>> commitDecisionFuture; + + if (!cmd.getParticipants().empty()) { + // Convert the participant list array into a set, and assert that all participants + // in the list are unique. + // TODO (PM-564): Propagate the 'readOnly' flag down into the + // TransactionCoordinator. + std::set<ShardId> participantList; + StringBuilder ss; + ss << "["; + for (const auto& participant : cmd.getParticipants()) { + const auto shardId = participant.getShardId(); + uassert(ErrorCodes::InvalidOptions, + str::stream() << "participant list contained duplicate shardId " + << shardId, + std::find(participantList.begin(), participantList.end(), shardId) == + participantList.end()); + participantList.insert(shardId); + ss << shardId << " "; + } + ss << "]"; + LOG(3) << "Coordinator shard received request to coordinate commit with " + "participant list " + << ss.str() << " for transaction " << opCtx->getTxnNumber() << " on session " + << opCtx->getLogicalSessionId()->toBSON(); + + commitDecisionFuture = TransactionCoordinatorService::get(opCtx)->coordinateCommit( + opCtx, + opCtx->getLogicalSessionId().get(), + opCtx->getTxnNumber().get(), + participantList); + } else { + LOG(3) << "Coordinator shard received request to recover commit decision for " + "transaction " + << opCtx->getTxnNumber() << " on session " + << opCtx->getLogicalSessionId()->toBSON(); + + commitDecisionFuture = TransactionCoordinatorService::get(opCtx)->recoverCommit( + opCtx, opCtx->getLogicalSessionId().get(), opCtx->getTxnNumber().get()); } - ss << "]"; - LOG(3) << "Coordinator shard received participant list with shards " << ss.str() - << " for transaction " << opCtx->getTxnNumber() << " on session " - << opCtx->getLogicalSessionId()->toBSON(); - auto commitDecisionFuture = TransactionCoordinatorService::get(opCtx)->coordinateCommit( - opCtx, - opCtx->getLogicalSessionId().get(), - opCtx->getTxnNumber().get(), - participantList); + if (commitDecisionFuture) { + // The commit coordination is still ongoing. Block waiting for the decision. + auto commitDecision = commitDecisionFuture->get(opCtx); + switch (commitDecision) { + case TransactionCoordinator::CommitDecision::kAbort: + uasserted(ErrorCodes::NoSuchTransaction, "Transaction was aborted"); + case TransactionCoordinator::CommitDecision::kCommit: + return; + } + } - // Block waiting for the commit decision. - auto commitDecision = commitDecisionFuture.get(opCtx); + // No coordinator was found in memory. Either the commit coordination already completed, + // the original primary on which the coordinator was created stepped down, or this + // coordinateCommit request was a byzantine message. - // If the decision was abort, propagate NoSuchTransaction exception back to mongos. - uassert(ErrorCodes::NoSuchTransaction, - "Transaction was aborted", - commitDecision != TransactionCoordinator::CommitDecision::kAbort); + LOG(3) << "Coordinator shard going to attempt to recover decision from local " + "participant for transaction " + << opCtx->getTxnNumber() << " on session " + << opCtx->getLogicalSessionId()->toBSON(); + recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx); } private: diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp index 9b1e8b8f14a..77e388cb9a6 100644 --- a/src/mongo/db/transaction_coordinator_service.cpp +++ b/src/mongo/db/transaction_coordinator_service.cpp @@ -181,21 +181,15 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx, // TODO (SERVER-37024): Schedule abort task on executor to execute at commitDeadline. } -Future<TransactionCoordinator::CommitDecision> TransactionCoordinatorService::coordinateCommit( - OperationContext* opCtx, - LogicalSessionId lsid, - TxnNumber txnNumber, - const std::set<ShardId>& participantList) { +boost::optional<Future<TransactionCoordinator::CommitDecision>> +TransactionCoordinatorService::coordinateCommit(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber, + const std::set<ShardId>& participantList) { auto coordinator = _coordinatorCatalog->get(lsid, txnNumber); if (!coordinator) { - // TODO (SERVER-37440): Return decision "kForgotten", which indicates that a decision was - // already made and forgotten. The caller can recover the decision from the local - // participant if a higher transaction has not been started on the session and the session - // has not been reaped. - // Currently is MONGO_UNREACHABLE because no tests should cause the router to re-send - // coordinateCommitTransaction. - MONGO_UNREACHABLE; + return boost::none; } Action initialAction = coordinator->recvCoordinateCommit(participantList); @@ -203,7 +197,32 @@ Future<TransactionCoordinator::CommitDecision> TransactionCoordinatorService::co launchCoordinateCommitTask(_threadPool, coordinator, lsid, txnNumber, initialAction); } - return coordinator.get()->waitForCompletion().then([](auto finalState) { + return coordinator->waitForCompletion().then([](auto finalState) { + switch (finalState) { + case TransactionCoordinator::StateMachine::State::kAborted: + return TransactionCoordinator::CommitDecision::kAbort; + case TransactionCoordinator::StateMachine::State::kCommitted: + return TransactionCoordinator::CommitDecision::kCommit; + default: + MONGO_UNREACHABLE; + } + }); + // TODO (SERVER-37364): Re-enable the coordinator returning the decision as soon as the decision + // is made durable. Currently the coordinator waits to hear acks because participants in prepare + // reject requests with a higher transaction number, causing tests to fail. + // return coordinator.get()->waitForDecision(); +} + +boost::optional<Future<TransactionCoordinator::CommitDecision>> +TransactionCoordinatorService::recoverCommit(OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber) { + auto coordinator = _coordinatorCatalog->get(lsid, txnNumber); + if (!coordinator) { + return boost::none; + } + + return coordinator->waitForCompletion().then([](auto finalState) { switch (finalState) { case TransactionCoordinator::StateMachine::State::kAborted: return TransactionCoordinator::CommitDecision::kAbort; diff --git a/src/mongo/db/transaction_coordinator_service.h b/src/mongo/db/transaction_coordinator_service.h index c7c4ab77e46..9cc07b784f4 100644 --- a/src/mongo/db/transaction_coordinator_service.h +++ b/src/mongo/db/transaction_coordinator_service.h @@ -74,16 +74,28 @@ public: Date_t commitDeadline); /** - * Delivers coordinateCommit to the TransactionCoordinator, asynchronously sends commit or - * abort to participants if necessary, and returns a Future that will contain the commit - * decision when the transaction finishes committing or aborting. + * If a coordinator for the (lsid, txnNumber) exists, delivers the participant list to the + * coordinator, which will cause the coordinator to start coordinating the commit if the + * coordinator had not yet received a list, and returns a Future that will contain the decision + * when the transaction finishes committing or aborting. + * + * If no coordinator for the (lsid, txnNumber) exists, returns boost::none. */ - Future<TransactionCoordinator::CommitDecision> coordinateCommit( + boost::optional<Future<TransactionCoordinator::CommitDecision>> coordinateCommit( OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber, const std::set<ShardId>& participantList); + /** + * If a coordinator for the (lsid, txnNumber) exists, returns a Future that will contain the + * decision when the transaction finishes committing or aborting. + * + * If no coordinator for the (lsid, txnNumber) exists, returns boost::none. + */ + boost::optional<Future<TransactionCoordinator::CommitDecision>> recoverCommit( + OperationContext* opCtx, LogicalSessionId lsid, TxnNumber txnNumber); + private: std::shared_ptr<TransactionCoordinatorCatalog> _coordinatorCatalog; diff --git a/src/mongo/db/transaction_coordinator_service_test.cpp b/src/mongo/db/transaction_coordinator_service_test.cpp index d95e3197be5..f940f8a0ef9 100644 --- a/src/mongo/db/transaction_coordinator_service_test.cpp +++ b/src/mongo/db/transaction_coordinator_service_test.cpp @@ -205,7 +205,7 @@ public: const LogicalSessionId& lsid, const TxnNumber& txnNumber, const std::set<ShardId>& transactionParticipantShards) { - auto commitDecisionFuture = coordinatorService.coordinateCommit( + auto commitDecisionFuture = *coordinatorService.coordinateCommit( operationContext(), lsid, txnNumber, transactionParticipantShards); for (size_t i = 0; i < transactionParticipantShards.size(); ++i) { @@ -230,7 +230,7 @@ public: const std::set<ShardId>& shardIdSet, const ShardId& abortingShard) { auto commitDecisionFuture = - coordinatorService.coordinateCommit(operationContext(), lsid, txnNumber, shardIdSet); + *coordinatorService.coordinateCommit(operationContext(), lsid, txnNumber, shardIdSet); for (size_t i = 0; i < shardIdSet.size(); ++i) { assertPrepareSentAndRespondWithNoSuchTransaction(); @@ -347,7 +347,7 @@ TEST_F(TransactionCoordinatorServiceTest, // Progress the transaction up until the point where it has sent commit and is waiting for // commit acks. - auto oldTxnCommitDecisionFuture = coordinatorService.coordinateCommit( + auto oldTxnCommitDecisionFuture = *coordinatorService.coordinateCommit( operationContext(), lsid(), txnNumber(), kTwoShardIdSet); // Simulate all participants acking prepare/voting to commit. @@ -375,7 +375,108 @@ TEST_F(TransactionCoordinatorServiceTest, assertPrepareSentAndRespondWithSuccess(); assertCommitSentAndRespondWithSuccess(); assertCommitSentAndRespondWithSuccess(); - // commitTransaction(coordinatorService, lsid(), txnNumber() + 1, kTwoShardIdSet); +} + +TEST_F(TransactionCoordinatorServiceTest, CoordinateCommitReturnsNoneIfNoCoordinatorEverExisted) { + TransactionCoordinatorService coordinatorService; + auto commitDecisionFuture = coordinatorService.coordinateCommit( + operationContext(), lsid(), txnNumber(), kTwoShardIdSet); + ASSERT(boost::none == commitDecisionFuture); +} + +TEST_F(TransactionCoordinatorServiceTest, CoordinateCommitReturnsNoneIfCoordinatorWasRemoved) { + TransactionCoordinatorService coordinatorService; + + coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline); + commitTransaction(coordinatorService, lsid(), txnNumber(), kTwoShardIdSet); + + auto commitDecisionFuture = coordinatorService.coordinateCommit( + operationContext(), lsid(), txnNumber(), kTwoShardIdSet); + ASSERT(boost::none == commitDecisionFuture); +} + +TEST_F(TransactionCoordinatorServiceTest, + CoordinateCommitWithSameParticipantListJoinsOngoingCoordinationThatLeadsToAbort) { + TransactionCoordinatorService coordinatorService; + + coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline); + + auto commitDecisionFuture1 = *coordinatorService.coordinateCommit( + operationContext(), lsid(), txnNumber(), kTwoShardIdSet); + + assertPrepareSentAndRespondWithNoSuchTransaction(); + + auto commitDecisionFuture2 = *coordinatorService.coordinateCommit( + operationContext(), lsid(), txnNumber(), kTwoShardIdSet); + + assertPrepareSentAndRespondWithSuccess(); + assertAbortSentAndRespondWithSuccess(); + + ASSERT_EQ(static_cast<int>(commitDecisionFuture1.get()), + static_cast<int>(commitDecisionFuture2.get())); +} + +TEST_F(TransactionCoordinatorServiceTest, + CoordinateCommitWithSameParticipantListJoinsOngoingCoordinationThatLeadsToCommit) { + TransactionCoordinatorService coordinatorService; + + coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline); + + auto commitDecisionFuture1 = *coordinatorService.coordinateCommit( + operationContext(), lsid(), txnNumber(), kTwoShardIdSet); + + assertPrepareSentAndRespondWithSuccess(); + + auto commitDecisionFuture2 = *coordinatorService.coordinateCommit( + operationContext(), lsid(), txnNumber(), kTwoShardIdSet); + + assertPrepareSentAndRespondWithSuccess(); + assertCommitSentAndRespondWithSuccess(); + assertCommitSentAndRespondWithSuccess(); + + ASSERT_EQ(static_cast<int>(commitDecisionFuture1.get()), + static_cast<int>(commitDecisionFuture2.get())); +} + +TEST_F(TransactionCoordinatorServiceTest, RecoverCommitJoinsOngoingCoordinationThatLeadsToAbort) { + TransactionCoordinatorService coordinatorService; + + coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline); + + auto commitDecisionFuture1 = *coordinatorService.coordinateCommit( + operationContext(), lsid(), txnNumber(), kTwoShardIdSet); + + assertPrepareSentAndRespondWithNoSuchTransaction(); + + auto commitDecisionFuture2 = + *coordinatorService.recoverCommit(operationContext(), lsid(), txnNumber()); + + assertPrepareSentAndRespondWithSuccess(); + assertAbortSentAndRespondWithSuccess(); + + ASSERT_EQ(static_cast<int>(commitDecisionFuture1.get()), + static_cast<int>(commitDecisionFuture2.get())); +} + +TEST_F(TransactionCoordinatorServiceTest, RecoverCommitJoinsOngoingCoordinationThatLeadsToCommit) { + TransactionCoordinatorService coordinatorService; + + coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline); + + auto commitDecisionFuture1 = *coordinatorService.coordinateCommit( + operationContext(), lsid(), txnNumber(), kTwoShardIdSet); + + assertPrepareSentAndRespondWithSuccess(); + + auto commitDecisionFuture2 = + *coordinatorService.recoverCommit(operationContext(), lsid(), txnNumber()); + + assertPrepareSentAndRespondWithSuccess(); + assertCommitSentAndRespondWithSuccess(); + assertCommitSentAndRespondWithSuccess(); + + ASSERT_EQ(static_cast<int>(commitDecisionFuture1.get()), + static_cast<int>(commitDecisionFuture2.get())); } TEST_F(TransactionCoordinatorServiceTest, CoordinatorRetriesOnWriteConcernErrorToPrepare) { @@ -383,7 +484,7 @@ TEST_F(TransactionCoordinatorServiceTest, CoordinatorRetriesOnWriteConcernErrorT coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline); // Coordinator sends prepare. - auto commitDecisionFuture = coordinatorService.coordinateCommit( + auto commitDecisionFuture = *coordinatorService.coordinateCommit( operationContext(), lsid(), txnNumber(), kTwoShardIdSet); // One participant responds with writeConcern error. @@ -414,7 +515,7 @@ TEST_F(TransactionCoordinatorServiceTest, CoordinatorRetriesOnWriteConcernErrorT coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline); // Coordinator sends prepare. - auto commitDecisionFuture = coordinatorService.coordinateCommit( + auto commitDecisionFuture = *coordinatorService.coordinateCommit( operationContext(), lsid(), txnNumber(), kTwoShardIdSet); // One participant votes to abort. @@ -441,7 +542,7 @@ TEST_F(TransactionCoordinatorServiceTest, CoordinatorRetriesOnWriteConcernErrorT coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline); // Coordinator sends prepare. - auto commitDecisionFuture = coordinatorService.coordinateCommit( + auto commitDecisionFuture = *coordinatorService.coordinateCommit( operationContext(), lsid(), txnNumber(), kTwoShardIdSet); // Both participants vote to commit. @@ -468,7 +569,7 @@ TEST_F(TransactionCoordinatorServiceTest, CoordinatorRetriesOnWriteConcernErrorT TEST_F(TransactionCoordinatorServiceTestSingleTxn, CoordinateCommitReturnsCorrectCommitDecisionOnAbort) { - auto commitDecisionFuture = coordinatorService()->coordinateCommit( + auto commitDecisionFuture = *coordinatorService()->coordinateCommit( operationContext(), lsid(), txnNumber(), kTwoShardIdSet); // Simulate a participant voting to abort. @@ -486,7 +587,7 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn, TEST_F(TransactionCoordinatorServiceTestSingleTxn, CoordinateCommitWithNoVotesReturnsNotReadyFuture) { - auto commitDecisionFuture = coordinatorService()->coordinateCommit( + auto commitDecisionFuture = *coordinatorService()->coordinateCommit( operationContext(), lsid(), txnNumber(), kTwoShardIdSet); ASSERT_FALSE(commitDecisionFuture.isReady()); @@ -498,7 +599,7 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn, TEST_F(TransactionCoordinatorServiceTestSingleTxn, CoordinateCommitReturnsCorrectCommitDecisionOnCommit) { - auto commitDecisionFuture = coordinatorService()->coordinateCommit( + auto commitDecisionFuture = *coordinatorService()->coordinateCommit( operationContext(), lsid(), txnNumber(), kTwoShardIdSet); assertPrepareSentAndRespondWithSuccess(); @@ -511,24 +612,12 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn, static_cast<int>(TransactionCoordinator::CommitDecision::kCommit)); } -TEST_F(TransactionCoordinatorServiceTest, - CoordinateCommitRecoversCorrectCommitDecisionForTransactionThatAlreadyCommitted) { - // TODO (SERVER-37440): Implement test when coordinateCommit is made to work correctly on - // retries. -} - -TEST_F(TransactionCoordinatorServiceTest, - CoordinateCommitRecoversCorrectCommitDecisionForTransactionThatAlreadyAborted) { - // TODO (SERVER-37440): Implement test when coordinateCommit is made to work correctly on - // retries. -} - TEST_F(TransactionCoordinatorServiceTestSingleTxn, ConcurrentCallsToCoordinateCommitReturnSameDecisionOnCommit) { - auto commitDecisionFuture1 = coordinatorService()->coordinateCommit( + auto commitDecisionFuture1 = *coordinatorService()->coordinateCommit( operationContext(), lsid(), txnNumber(), kTwoShardIdSet); - auto commitDecisionFuture2 = coordinatorService()->coordinateCommit( + auto commitDecisionFuture2 = *coordinatorService()->coordinateCommit( operationContext(), lsid(), txnNumber(), kTwoShardIdSet); commitTransaction(*coordinatorService(), lsid(), txnNumber(), kTwoShardIdSet); @@ -540,9 +629,9 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn, TEST_F(TransactionCoordinatorServiceTestSingleTxn, ConcurrentCallsToCoordinateCommitReturnSameDecisionOnAbort) { - auto commitDecisionFuture1 = coordinatorService()->coordinateCommit( + auto commitDecisionFuture1 = *coordinatorService()->coordinateCommit( operationContext(), lsid(), txnNumber(), kTwoShardIdSet); - auto commitDecisionFuture2 = coordinatorService()->coordinateCommit( + auto commitDecisionFuture2 = *coordinatorService()->coordinateCommit( operationContext(), lsid(), txnNumber(), kTwoShardIdSet); abortTransaction( diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index be01a7d2db2..b873ecb43d7 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -41,6 +41,7 @@ #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/s/recover_transaction_decision_from_local_participant.h" #include "mongo/db/server_transactions_metrics.h" #include "mongo/db/service_context.h" #include "mongo/db/session_catalog_mongod.h" @@ -3814,6 +3815,385 @@ TEST_F(TxnParticipantTest, GetOldestNonMajorityCommittedOpTimeReturnsOldestEntry ASSERT_EQ(nonMajorityCommittedOpTime, laterOpTime); } +class RecoverDecisionFromLocalParticipantTest : public TxnParticipantTest { +private: + void _putParticipantInProgressWithoutSessionCheckout() { + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->unstashTransactionResources(opCtx(), "insert"); + ASSERT(txnParticipant->inMultiDocumentTransaction()); + txnParticipant->stashTransactionResources(opCtx()); + } + + Timestamp _putParticipantInPrepareWithoutSessionCheckout() { + _putParticipantInProgressWithoutSessionCheckout(); + + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction"); + const auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); + ASSERT(txnParticipant->transactionIsPrepared()); + txnParticipant->stashTransactionResources(opCtx()); + return prepareTimestamp; + } + +protected: + void putParticipantInProgress() { + auto sessionCheckout = checkOutSession(); + _putParticipantInProgressWithoutSessionCheckout(); + } + + void putParticipantInAbortedWithoutPrepare() { + auto sessionCheckout = checkOutSession(); + + _putParticipantInProgressWithoutSessionCheckout(); + + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction"); + txnParticipant->abortActiveTransaction(opCtx()); + ASSERT(txnParticipant->transactionIsAborted()); + // No need to stash transaction resources after abort. + } + + void putParticipantInCommittedWithoutPrepare() { + auto sessionCheckout = checkOutSession(); + + _putParticipantInProgressWithoutSessionCheckout(); + + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction"); + txnParticipant->commitUnpreparedTransaction(opCtx()); + ASSERT(txnParticipant->transactionIsCommitted()); + // No need to stash transaction resources after commit. + } + + Timestamp putParticipantInPrepare() { + auto sessionCheckout = checkOutSession(); + return _putParticipantInPrepareWithoutSessionCheckout(); + } + + void putParticipantInAbortedAfterPrepare() { + auto sessionCheckout = checkOutSession(); + + _putParticipantInPrepareWithoutSessionCheckout(); + + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction"); + txnParticipant->abortActiveTransaction(opCtx()); + ASSERT(txnParticipant->transactionIsAborted()); + // No need to stash transaction resources after abort. + } + + void putParticipantInCommittedAfterPrepare() { + auto sessionCheckout = checkOutSession(); + + const auto prepareTimestamp = _putParticipantInPrepareWithoutSessionCheckout(); + const auto commitTS = Timestamp(prepareTimestamp.getSecs(), prepareTimestamp.getInc() + 1); + + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction"); + txnParticipant->commitPreparedTransaction(opCtx(), commitTS); + ASSERT_TRUE(txnParticipant->transactionIsCommitted()); + // No need to stash transaction resources after commit. + } + + void putParticipantInRetryableWrite() { + MongoDOperationContextSession checkOutSession(opCtx()); + auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), boost::none, boost::none); + ASSERT(!txnParticipant->inMultiDocumentTransaction() && + !txnParticipant->transactionIsCommitted() && + !txnParticipant->transactionIsAborted() && !txnParticipant->transactionIsPrepared()); + } + + void assertParticipantIsInProgressWithTxnNumber(const TxnNumber expectedTxnNumber) { + MongoDOperationContextSession checkOutSession(opCtx()); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT_EQUALS(expectedTxnNumber, txnParticipant->getActiveTxnNumber()); + ASSERT(txnParticipant->inMultiDocumentTransaction() && + !txnParticipant->transactionIsPrepared()); + } + + void assertParticipantIsPreparedWithTxnNumber(const TxnNumber expectedTxnNumber) { + MongoDOperationContextSession checkOutSession(opCtx()); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT_EQUALS(expectedTxnNumber, txnParticipant->getActiveTxnNumber()); + ASSERT(txnParticipant->transactionIsPrepared()); + } + + void assertParticipantIsAbortedWithTxnNumber(const TxnNumber expectedTxnNumber) { + MongoDOperationContextSession checkOutSession(opCtx()); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT_EQUALS(expectedTxnNumber, txnParticipant->getActiveTxnNumber()); + ASSERT(txnParticipant->transactionIsAborted()); + } + + void assertParticipantIsCommittedWithTxnNumber(const TxnNumber expectedTxnNumber) { + MongoDOperationContextSession checkOutSession(opCtx()); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT_EQUALS(expectedTxnNumber, txnParticipant->getActiveTxnNumber()); + ASSERT(txnParticipant->transactionIsCommitted()); + } + + void assertParticipantIsInRetryableWriteWithTxnNumber(const TxnNumber expectedTxnNumber) { + MongoDOperationContextSession checkOutSession(opCtx()); + auto txnParticipant = TransactionParticipant::get(opCtx()); + ASSERT_EQUALS(expectedTxnNumber, txnParticipant->getActiveTxnNumber()); + ASSERT(!txnParticipant->inMultiDocumentTransaction() && + !txnParticipant->transactionIsCommitted() && + !txnParticipant->transactionIsAborted() && !txnParticipant->transactionIsPrepared()); + } + + void assertRecoverDecisionFromDifferentOpCtxThrowsTransactionTooOld( + const LogicalSessionId& lsid, const TxnNumber txnNumber) { + auto recoverDecisionThrowsTransactionTooOldFunc = [&](OperationContext* newOpCtx) { + newOpCtx->setLogicalSessionId(lsid); + newOpCtx->setTxnNumber(txnNumber); + + ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(newOpCtx), + AssertionException, + ErrorCodes::TransactionTooOld); + }; + runFunctionFromDifferentOpCtx(recoverDecisionThrowsTransactionTooOldFunc); + } + + void assertRecoverDecisionFromDifferentOpCtxThrowsNoSuchTransaction( + const LogicalSessionId& lsid, const TxnNumber txnNumber) { + auto recoverDecisionThrowsNoSuchTransactionFunc = [&](OperationContext* newOpCtx) { + newOpCtx->setLogicalSessionId(lsid); + newOpCtx->setTxnNumber(txnNumber); + + ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(newOpCtx), + AssertionException, + ErrorCodes::NoSuchTransaction); + }; + runFunctionFromDifferentOpCtx(recoverDecisionThrowsNoSuchTransactionFunc); + } +}; + +// +// Local TransactionParticipant has *same* TxnNumber as recoverCommit request. +// + +TEST_F( + RecoverDecisionFromLocalParticipantTest, + AbortsActiveTransactionAndThrowsNoSuchTransactionIfParticipantIsInProgressWithSameTxnNumber) { + putParticipantInProgress(); + ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx()), + AssertionException, + ErrorCodes::NoSuchTransaction); + assertParticipantIsAbortedWithTxnNumber(*opCtx()->getTxnNumber()); +} + +TEST_F(RecoverDecisionFromLocalParticipantTest, + ThrowsNoSuchTransactionIfParticipantIsAbortedWithoutPrepareWithSameTxnNumber) { + putParticipantInAbortedWithoutPrepare(); + ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx()), + AssertionException, + ErrorCodes::NoSuchTransaction); + assertParticipantIsAbortedWithTxnNumber(*opCtx()->getTxnNumber()); +} + +TEST_F(RecoverDecisionFromLocalParticipantTest, + DoesNotThrowIfParticipantIsCommittedWithoutPrepareWithSameTxnNumber) { + putParticipantInCommittedWithoutPrepare(); + recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx()); + assertParticipantIsCommittedWithTxnNumber(*opCtx()->getTxnNumber()); +} + +TEST_F(RecoverDecisionFromLocalParticipantTest, + ThrowsAnonymousErrorIfParticipantIsPreparedWithSameTxnNumber) { + putParticipantInPrepare(); + ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx()), + AssertionException, + 51021); + assertParticipantIsPreparedWithTxnNumber(*opCtx()->getTxnNumber()); +} + +TEST_F(RecoverDecisionFromLocalParticipantTest, + ThrowsNoSuchTransactionIfParticipantIsAbortedAfterPrepareWithSameTxnNumber) { + putParticipantInAbortedAfterPrepare(); + ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx()), + AssertionException, + ErrorCodes::NoSuchTransaction); + assertParticipantIsAbortedWithTxnNumber(*opCtx()->getTxnNumber()); +} + +TEST_F(RecoverDecisionFromLocalParticipantTest, + DoesNotThrowIfParticipantIsCommittedAfterPrepareWithSameTxnNumber) { + putParticipantInCommittedAfterPrepare(); + recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx()); + assertParticipantIsCommittedWithTxnNumber(*opCtx()->getTxnNumber()); +} + +TEST_F(RecoverDecisionFromLocalParticipantTest, + ThrowsNoSuchTransactionIfTxnNumberCorrespondsToRetryableWrite) { + putParticipantInRetryableWrite(); + ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(opCtx()), + AssertionException, + ErrorCodes::NoSuchTransaction); + assertParticipantIsInRetryableWriteWithTxnNumber(*opCtx()->getTxnNumber()); +} + +// +// Local TransactionParticipant has *higher* TxnNumber than recoverCommit request. +// + +TEST_F(RecoverDecisionFromLocalParticipantTest, + ThrowsTransactionTooOldIfTransactionParticipantInProgressHasHigherTxnNumber) { + const auto lsid = *opCtx()->getLogicalSessionId(); + const auto participantTxnNumber = *opCtx()->getTxnNumber(); + const auto oldTxnNumber = participantTxnNumber - 1; + + putParticipantInProgress(); + assertRecoverDecisionFromDifferentOpCtxThrowsTransactionTooOld(lsid, oldTxnNumber); + assertParticipantIsInProgressWithTxnNumber(participantTxnNumber); +} + +TEST_F(RecoverDecisionFromLocalParticipantTest, + ThrowsTransactionTooOldIfTransactionParticipantInPrepareHasHigherTxnNumber) { + const auto lsid = *opCtx()->getLogicalSessionId(); + const auto participantTxnNumber = *opCtx()->getTxnNumber(); + const auto oldTxnNumber = participantTxnNumber - 1; + + putParticipantInPrepare(); + assertRecoverDecisionFromDifferentOpCtxThrowsTransactionTooOld(lsid, oldTxnNumber); + assertParticipantIsPreparedWithTxnNumber(participantTxnNumber); +} + +TEST_F(RecoverDecisionFromLocalParticipantTest, + ThrowsTransactionTooOldIfTransactionParticipantInAbortedHasHigherTxnNumber) { + const auto lsid = *opCtx()->getLogicalSessionId(); + const auto participantTxnNumber = *opCtx()->getTxnNumber(); + const auto oldTxnNumber = participantTxnNumber - 1; + + putParticipantInAbortedAfterPrepare(); + assertRecoverDecisionFromDifferentOpCtxThrowsTransactionTooOld(lsid, oldTxnNumber); + assertParticipantIsAbortedWithTxnNumber(participantTxnNumber); +} + +TEST_F(RecoverDecisionFromLocalParticipantTest, + ThrowsTransactionTooOldIfTransactionParticipantInCommittedHasHigherTxnNumber) { + const auto lsid = *opCtx()->getLogicalSessionId(); + const auto participantTxnNumber = *opCtx()->getTxnNumber(); + const auto oldTxnNumber = participantTxnNumber - 1; + + putParticipantInCommittedAfterPrepare(); + assertRecoverDecisionFromDifferentOpCtxThrowsTransactionTooOld(lsid, oldTxnNumber); + assertParticipantIsCommittedWithTxnNumber(participantTxnNumber); +} + +TEST_F(RecoverDecisionFromLocalParticipantTest, + ThrowsTransactionTooOldIfTransactionParticipantInRetryableWriteHasHigherTxnNumber) { + const auto lsid = *opCtx()->getLogicalSessionId(); + const auto participantTxnNumber = *opCtx()->getTxnNumber(); + const auto oldTxnNumber = participantTxnNumber - 1; + + putParticipantInRetryableWrite(); + assertRecoverDecisionFromDifferentOpCtxThrowsTransactionTooOld(lsid, oldTxnNumber); + assertParticipantIsInRetryableWriteWithTxnNumber(participantTxnNumber); +} + +// +// Local TransactionParticipant has *lower* TxnNumber than recoverCommit request. +// + +TEST_F( + RecoverDecisionFromLocalParticipantTest, + AbortsOlderAndNewerTransactionAndThrowsNoSuchTransactionIfParticipantIsInProgressWithLowerTxnNumber) { + const auto lsid = *opCtx()->getLogicalSessionId(); + const auto participantTxnNumber = *opCtx()->getTxnNumber(); + const auto newTxnNumber = participantTxnNumber + 1; + + putParticipantInProgress(); + assertRecoverDecisionFromDifferentOpCtxThrowsNoSuchTransaction(lsid, newTxnNumber); + assertParticipantIsAbortedWithTxnNumber(newTxnNumber); +} + +TEST_F(RecoverDecisionFromLocalParticipantTest, ThrowsIfParticipantIsInPrepareWithLowerTxnNumber) { + const auto lsid = *opCtx()->getLogicalSessionId(); + const auto participantTxnNumber = *opCtx()->getTxnNumber(); + const auto newTxnNumber = participantTxnNumber + 1; + + putParticipantInPrepare(); + auto recoverDecisionThrowsAnonymousErrorFunc = [&](OperationContext* newOpCtx) { + newOpCtx->setLogicalSessionId(lsid); + newOpCtx->setTxnNumber(newTxnNumber); + + ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(newOpCtx), + AssertionException, + 51021); + }; + runFunctionFromDifferentOpCtx(recoverDecisionThrowsAnonymousErrorFunc); + assertParticipantIsPreparedWithTxnNumber(participantTxnNumber); +} + +TEST_F( + RecoverDecisionFromLocalParticipantTest, + StartsAndAbortsNewerTransactionAndThrowsNoSuchTransactionIfParticipantIsAbortedWithLowerTxnNumber) { + const auto lsid = *opCtx()->getLogicalSessionId(); + const auto participantTxnNumber = *opCtx()->getTxnNumber(); + const auto newTxnNumber = participantTxnNumber + 1; + + putParticipantInAbortedAfterPrepare(); + assertRecoverDecisionFromDifferentOpCtxThrowsNoSuchTransaction(lsid, newTxnNumber); + assertParticipantIsAbortedWithTxnNumber(newTxnNumber); +} + +TEST_F( + RecoverDecisionFromLocalParticipantTest, + StartsAndAbortsNewerTransactionAndThrowsNoSuchTransactionIfParticipantIsCommittedWithLowerTxnNumber) { + const auto lsid = *opCtx()->getLogicalSessionId(); + const auto participantTxnNumber = *opCtx()->getTxnNumber(); + const auto newTxnNumber = participantTxnNumber + 1; + + putParticipantInCommittedAfterPrepare(); + assertRecoverDecisionFromDifferentOpCtxThrowsNoSuchTransaction(lsid, newTxnNumber); + assertParticipantIsAbortedWithTxnNumber(newTxnNumber); +} + +TEST_F( + RecoverDecisionFromLocalParticipantTest, + StartsAndAbortsNewerTransactionAndThrowsNoSuchTransactionIfParticipantIsInRetryableWriteWithLowerTxnNumber) { + const auto lsid = *opCtx()->getLogicalSessionId(); + const auto participantTxnNumber = *opCtx()->getTxnNumber(); + const auto newTxnNumber = participantTxnNumber + 1; + + putParticipantInRetryableWrite(); + assertRecoverDecisionFromDifferentOpCtxThrowsNoSuchTransaction(lsid, newTxnNumber); + assertParticipantIsAbortedWithTxnNumber(newTxnNumber); +} + +// +// Recovering the decision is retryable. +// + +TEST_F(RecoverDecisionFromLocalParticipantTest, + RecoverDecisionCanBeRetriedIfFirstTryThrowsLockTimeout) { + const auto lsid = *opCtx()->getLogicalSessionId(); + const auto participantTxnNumber = *opCtx()->getTxnNumber(); + const auto newTxnNumber = participantTxnNumber + 1; + + putParticipantInCommittedAfterPrepare(); + + // First recoverDecision attempt throws LockTimeout. + { + Lock::GlobalLock lk(opCtx(), MODE_X); + auto recoverDecisionThrowsLockTimeoutFunc = [&](OperationContext* newOpCtx) { + newOpCtx->setLogicalSessionId(lsid); + newOpCtx->setTxnNumber(newTxnNumber); + + ASSERT_THROWS_CODE(recoverDecisionFromLocalParticipantOrAbortLocalParticipant(newOpCtx), + AssertionException, + ErrorCodes::LockTimeout); + }; + runFunctionFromDifferentOpCtx(recoverDecisionThrowsLockTimeoutFunc); + } + assertParticipantIsInProgressWithTxnNumber(newTxnNumber); + + // Retry recoverDecision. + assertRecoverDecisionFromDifferentOpCtxThrowsNoSuchTransaction(lsid, newTxnNumber); + + assertParticipantIsAbortedWithTxnNumber(newTxnNumber); +} } // namespace } // namespace mongo |