diff options
Diffstat (limited to 'src')
20 files changed, 860 insertions, 912 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index 6b2ff3d0762..0d79c3a1618 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -333,3 +333,5 @@ error_class("ConnectionFatalMessageParseError", ["IllegalOpMsgFlag", error_class("ExceededTimeLimitError", ["ExceededTimeLimit", "MaxTimeMSExpired", "NetworkInterfaceExceededTimeLimit"]) error_class("SnapshotError", ["SnapshotTooOld", "SnapshotUnavailable", "StaleChunkHistory", "MigrationConflict"]) + +error_class("VoteAbortError", ["NoSuchTransaction", "TransactionTooOld"]) diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp index 99ce548cd23..c043a38dba8 100644 --- a/src/mongo/db/commands/txn_cmds.cpp +++ b/src/mongo/db/commands/txn_cmds.cpp @@ -46,6 +46,9 @@ namespace mongo { namespace { +MONGO_FAIL_POINT_DEFINE(participantReturnNetworkErrorForAbortAfterExecutingAbortLogic); +MONGO_FAIL_POINT_DEFINE(participantReturnNetworkErrorForCommitAfterExecutingCommitLogic); + class CmdCommitTxn : public BasicCommand { public: CmdCommitTxn() : BasicCommand("commitTransaction") {} @@ -94,6 +97,11 @@ public: // commit oplog entry. auto& replClient = repl::ReplClientInfo::forClient(opCtx->getClient()); replClient.setLastOpToSystemLastOpTime(opCtx); + if (MONGO_FAIL_POINT(participantReturnNetworkErrorForCommitAfterExecutingCommitLogic)) { + uasserted(ErrorCodes::SocketException, + "returning network error because failpoint is on"); + } + return true; } @@ -109,6 +117,10 @@ public: // commitUnpreparedTransaction will throw if the transaction is prepared. txnParticipant->commitUnpreparedTransaction(opCtx); } + if (MONGO_FAIL_POINT(participantReturnNetworkErrorForCommitAfterExecutingCommitLogic)) { + uasserted(ErrorCodes::SocketException, + "returning network error because failpoint is on"); + } return true; } @@ -172,6 +184,11 @@ public: << exceptionToStatus()); throw; } + if (MONGO_FAIL_POINT(participantReturnNetworkErrorForAbortAfterExecutingAbortLogic)) { + uasserted(ErrorCodes::SocketException, + "returning network error because failpoint is on"); + } + return true; } diff --git a/src/mongo/db/commands/txn_two_phase_commit_cmds.idl b/src/mongo/db/commands/txn_two_phase_commit_cmds.idl index d90cc4fa49a..0b77fc9827f 100644 --- a/src/mongo/db/commands/txn_two_phase_commit_cmds.idl +++ b/src/mongo/db/commands/txn_two_phase_commit_cmds.idl @@ -45,10 +45,6 @@ commands: description: "Parser for the 'prepareTransaction' command." strict: true namespace: ignored - fields: - coordinatorId: - description: "The coordinator shard for this transaction." - type: shard_id voteCommitTransaction: description: "Parser for the 'voteCommitTransaction' command." diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp index cb4766efc4e..e535c31fc5b 100644 --- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp +++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp @@ -48,7 +48,7 @@ namespace mongo { namespace { -MONGO_FAIL_POINT_DEFINE(skipShardingPartsOfPrepareTransaction); +MONGO_FAIL_POINT_DEFINE(participantReturnNetworkErrorForPrepareAfterExecutingPrepareLogic); class PrepareTransactionCmd : public TypedCommand<PrepareTransactionCmd> { public: @@ -71,13 +71,9 @@ public: using InvocationBase::InvocationBase; Response typedRun(OperationContext* opCtx) { - // In production, only config servers or initialized shard servers can participate in a - // sharded transaction. However, many test suites test the replication and storage parts - // of prepareTransaction against a standalone replica set, so allow skipping the check. - if (!MONGO_FAIL_POINT(skipShardingPartsOfPrepareTransaction)) { - if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) { - uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands()); - } + if (!getTestCommandsEnabled() && + serverGlobalParams.clusterRole != ClusterRole::ConfigServer) { + uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands()); } auto txnParticipant = TransactionParticipant::get(opCtx); @@ -99,8 +95,6 @@ public: "Transaction isn't in progress", txnParticipant->inMultiDocumentTransaction()); - const auto& cmd = request(); - if (txnParticipant->transactionIsPrepared()) { auto& replClient = repl::ReplClientInfo::forClient(opCtx->getClient()); auto prepareOpTime = txnParticipant->getPrepareOpTime(); @@ -118,96 +112,24 @@ public: << " participant prepareOpTime: " << prepareOpTime.toString()); - // A participant should re-send its vote if it re-received prepare. - _sendVoteCommit(opCtx, prepareOpTime.getTimestamp(), cmd.getCoordinatorId()); - + if (MONGO_FAIL_POINT( + participantReturnNetworkErrorForPrepareAfterExecutingPrepareLogic)) { + uasserted(ErrorCodes::SocketException, + "returning network error because failpoint is on"); + } return PrepareTimestamp(prepareOpTime.getTimestamp()); } - // TODO (SERVER-36839): Pass coordinatorId into prepareTransaction() so that the - // coordinatorId can be included in the write to config.transactions. const auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx, {}); - _sendVoteCommit(opCtx, prepareTimestamp, cmd.getCoordinatorId()); - - return PrepareTimestamp(prepareTimestamp); - } - - private: - void _sendVoteCommit(OperationContext* opCtx, - Timestamp prepareTimestamp, - ShardId coordinatorId) { - // In a production cluster, a participant should always send its vote to the coordinator - // as part of prepareTransaction. However, many test suites test the replication and - // storage parts of prepareTransaction against a standalone replica set, so allow - // skipping sending a vote. - if (MONGO_FAIL_POINT(skipShardingPartsOfPrepareTransaction)) { - return; - } - - VoteCommitTransaction voteCommit; - voteCommit.setDbName("admin"); - voteCommit.setShardId(ShardingState::get(opCtx)->shardId()); - voteCommit.setPrepareTimestamp(prepareTimestamp); - BSONObj voteCommitObj = voteCommit.toBSON( - BSON("lsid" << opCtx->getLogicalSessionId()->toBSON() << "txnNumber" - << *opCtx->getTxnNumber() - << "autocommit" - << false)); - _sendVote(opCtx, voteCommitObj, coordinatorId); - } - - void _sendVoteAbort(OperationContext* opCtx, ShardId coordinatorId) { - // In a production cluster, a participant should always send its vote to the coordinator - // as part of prepareTransaction. However, many test suites test the replication and - // storage parts of prepareTransaction against a standalone replica set, so allow - // skipping sending a vote. - if (MONGO_FAIL_POINT(skipShardingPartsOfPrepareTransaction)) { - return; - } - - VoteAbortTransaction voteAbort; - voteAbort.setDbName("admin"); - voteAbort.setShardId(ShardingState::get(opCtx)->shardId()); - BSONObj voteAbortObj = voteAbort.toBSON( - BSON("lsid" << opCtx->getLogicalSessionId()->toBSON() << "txnNumber" - << *opCtx->getTxnNumber() - << "autocommit" - << false)); - _sendVote(opCtx, voteAbortObj, coordinatorId); - } - - void _sendVote(OperationContext* opCtx, const BSONObj& voteObj, ShardId coordinatorId) { - try { - // TODO (SERVER-37328): Participant should wait for writeConcern before sending its - // vote. - - LOG(3) << "Participant shard sending " << voteObj << " to " << coordinatorId; - - const auto coordinatorPrimaryHost = [&] { - auto coordinatorShard = uassertStatusOK( - Grid::get(opCtx)->shardRegistry()->getShard(opCtx, coordinatorId)); - return uassertStatusOK(coordinatorShard->getTargeter()->findHostNoWait( - ReadPreferenceSetting{ReadPreference::PrimaryOnly})); - }(); - - const executor::RemoteCommandRequest request( - coordinatorPrimaryHost, - NamespaceString::kAdminDb.toString(), - voteObj, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}.toContainingBSON(), - opCtx, - executor::RemoteCommandRequest::kNoTimeout); - - auto noOp = [](const executor::TaskExecutor::RemoteCommandCallbackArgs&) {}; - uassertStatusOK( - Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()->scheduleRemoteCommand( - request, noOp)); - } catch (const DBException& ex) { - LOG(3) << "Participant shard failed to send " << voteObj << " to " << coordinatorId - << causedBy(ex.toStatus()); + if (MONGO_FAIL_POINT( + participantReturnNetworkErrorForPrepareAfterExecutingPrepareLogic)) { + uasserted(ErrorCodes::SocketException, + "returning network error because failpoint is on"); } + return PrepareTimestamp(std::move(prepareTimestamp)); } + private: bool supportsWriteConcern() const override { return true; } @@ -233,122 +155,6 @@ public: } } prepareTransactionCmd; -class VoteCommitTransactionCmd : public TypedCommand<VoteCommitTransactionCmd> { -public: - using Request = VoteCommitTransaction; - class Invocation final : public InvocationBase { - public: - using InvocationBase::InvocationBase; - - void typedRun(OperationContext* opCtx) { - // Only config servers or initialized shard servers can act as transaction coordinators. - if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) { - uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands()); - } - - uassert( - ErrorCodes::CommandNotSupported, - "'voteCommitTransaction' is only supported in feature compatibility version 4.2", - (serverGlobalParams.featureCompatibility.getVersion() == - ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42)); - - const auto& cmd = request(); - - LOG(3) << "Coordinator shard received voteCommit from " << cmd.getShardId() - << " with prepare timestamp " << cmd.getPrepareTimestamp() << " for transaction " - << opCtx->getTxnNumber() << " on session " - << opCtx->getLogicalSessionId()->toBSON(); - - TransactionCoordinatorService::get(opCtx)->voteCommit( - opCtx, - opCtx->getLogicalSessionId().get(), - opCtx->getTxnNumber().get(), - cmd.getShardId(), - cmd.getPrepareTimestamp()); - } - - private: - bool supportsWriteConcern() const override { - return false; - } - - NamespaceString ns() const override { - return NamespaceString(request().getDbName(), ""); - } - - void doCheckAuthorization(OperationContext* opCtx) const override {} - }; - - virtual bool adminOnly() const { - return true; - } - - std::string help() const override { - return "Votes to commit a transaction; sent by a transaction participant to the " - "transaction commit coordinator for a cross-shard transaction"; - } - - AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { - return AllowedOnSecondary::kNever; - } -} voteCommitTransactionCmd; - -class VoteAbortTransactionCmd : public TypedCommand<VoteAbortTransactionCmd> { -public: - using Request = VoteAbortTransaction; - class Invocation final : public InvocationBase { - public: - using InvocationBase::InvocationBase; - - void typedRun(OperationContext* opCtx) { - // Only config servers or initialized shard servers can act as transaction coordinators. - if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) { - uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands()); - } - - uassert(ErrorCodes::CommandNotSupported, - "'voteAbortTransaction' is only supported in feature compatibility version 4.2", - (serverGlobalParams.featureCompatibility.getVersion() == - ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42)); - - const auto& cmd = request(); - - LOG(3) << "Coordinator shard received voteAbort from " << cmd.getShardId() - << " for transaction " << opCtx->getTxnNumber() << " on session " - << opCtx->getLogicalSessionId()->toBSON(); - - TransactionCoordinatorService::get(opCtx)->voteAbort(opCtx, - opCtx->getLogicalSessionId().get(), - opCtx->getTxnNumber().get(), - cmd.getShardId()); - } - - private: - bool supportsWriteConcern() const override { - return false; - } - - NamespaceString ns() const override { - return NamespaceString(request().getDbName(), ""); - } - - void doCheckAuthorization(OperationContext* opCtx) const override {} - }; - - virtual bool adminOnly() const { - return true; - } - - std::string help() const override { - return "Votes to abort a transaction; sent by a transaction participant to the transaction " - "commit coordinator for a cross-shard transaction"; - } - - AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { - return AllowedOnSecondary::kNever; - } -} voteAbortTransactionCmd; - // TODO (SERVER-37440): Make coordinateCommit idempotent. class CoordinateCommitTransactionCmd : public TypedCommand<CoordinateCommitTransactionCmd> { public: @@ -397,61 +203,16 @@ public: opCtx->getTxnNumber().get(), participantList); - // If the commit decision is already available before we prepare locally, it means the - // transaction has completed and we should skip preparing locally. - // - // TODO (SERVER-37440): Reconsider when coordinateCommit is made idempotent. - if (!commitDecisionFuture.isReady()) { - // Execute the 'prepare' logic on the local participant (the router does not send a - // separate 'prepare' message to the coordinator shard). - _callPrepareOnLocalParticipant(opCtx); - } - // Block waiting for the commit decision. auto commitDecision = commitDecisionFuture.get(opCtx); // If the decision was abort, propagate NoSuchTransaction exception back to mongos. uassert(ErrorCodes::NoSuchTransaction, "Transaction was aborted", - commitDecision != TransactionCoordinatorService::CommitDecision::kAbort); + commitDecision != TransactionCoordinator::CommitDecision::kAbort); } private: - void _callPrepareOnLocalParticipant(OperationContext* opCtx) { - auto localParticipantPrepareTimestamp = [&]() -> Timestamp { - OperationSessionInfoFromClient sessionInfo; - sessionInfo.setAutocommit(false); - sessionInfo.setCoordinator(false); - OperationContextSessionMongod checkOutSession(opCtx, true, sessionInfo); - - auto txnParticipant = TransactionParticipant::get(opCtx); - - txnParticipant->unstashTransactionResources(opCtx, "prepareTransaction"); - ScopeGuard guard = MakeGuard([&txnParticipant, opCtx]() { - txnParticipant->abortActiveUnpreparedOrStashPreparedTransaction(opCtx); - }); - - auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx, {}); - - txnParticipant->stashTransactionResources(opCtx); - guard.Dismiss(); - return prepareTimestamp; - }(); - - LOG(3) << "Participant shard delivering voteCommit with prepareTimestamp " - << localParticipantPrepareTimestamp << " to local coordinator for transaction " - << opCtx->getTxnNumber() << " on session " - << opCtx->getLogicalSessionId()->toBSON(); - - // Deliver the local participant's vote to the coordinator. - TransactionCoordinatorService::get(opCtx)->voteCommit( - opCtx, - opCtx->getLogicalSessionId().get(), - opCtx->getTxnNumber().get(), - ShardingState::get(opCtx)->shardId(), - localParticipantPrepareTimestamp); - } - bool supportsWriteConcern() const override { return true; } diff --git a/src/mongo/db/transaction_coordinator.cpp b/src/mongo/db/transaction_coordinator.cpp index 03bf534a104..cb12b61c719 100644 --- a/src/mongo/db/transaction_coordinator.cpp +++ b/src/mongo/db/transaction_coordinator.cpp @@ -28,14 +28,15 @@ * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kTransaction #include "mongo/platform/basic.h" +#include "mongo/db/logical_clock.h" +#include "mongo/db/service_context.h" #include "mongo/db/session_catalog.h" #include "mongo/db/transaction_coordinator.h" - -#include "mongo/db/session.h" +#include "mongo/util/log.h" namespace mongo { @@ -43,46 +44,114 @@ using Action = TransactionCoordinator::StateMachine::Action; using Event = TransactionCoordinator::StateMachine::Event; using State = TransactionCoordinator::StateMachine::State; +// +// Pre-decision +// + Action TransactionCoordinator::recvCoordinateCommit(const std::set<ShardId>& participants) { stdx::unique_lock<stdx::mutex> lk(_mutex); _participantList.recordFullList(participants); return _stateMachine.onEvent(std::move(lk), Event::kRecvParticipantList); } -Action TransactionCoordinator::recvVoteCommit(const ShardId& shardId, Timestamp prepareTimestamp) { +Action TransactionCoordinator::madeParticipantListDurable() { stdx::unique_lock<stdx::mutex> lk(_mutex); - - _participantList.recordVoteCommit(shardId, prepareTimestamp); - - auto event = (_participantList.allParticipantsVotedCommit()) ? Event::kRecvFinalVoteCommit - : Event::kRecvVoteCommit; - return _stateMachine.onEvent(std::move(lk), event); + return _stateMachine.onEvent(std::move(lk), Event::kMadeParticipantListDurable); } +// +// Abort path +// + Action TransactionCoordinator::recvVoteAbort(const ShardId& shardId) { stdx::unique_lock<stdx::mutex> lk(_mutex); _participantList.recordVoteAbort(shardId); return _stateMachine.onEvent(std::move(lk), Event::kRecvVoteAbort); } -Action TransactionCoordinator::recvTryAbort() { +Action TransactionCoordinator::madeAbortDecisionDurable() { stdx::unique_lock<stdx::mutex> lk(_mutex); - return _stateMachine.onEvent(std::move(lk), Event::kRecvTryAbort); + return _stateMachine.onEvent(std::move(lk), Event::kMadeAbortDecisionDurable); } -void TransactionCoordinator::recvCommitAck(const ShardId& shardId) { +Action TransactionCoordinator::recvAbortAck(const ShardId& shardId) { stdx::unique_lock<stdx::mutex> lk(_mutex); - _participantList.recordCommitAck(shardId); - if (_participantList.allParticipantsAckedCommit()) { - _stateMachine.onEvent(std::move(lk), Event::kRecvFinalCommitAck); + _participantList.recordAbortAck(shardId); + auto event = _participantList.allParticipantsAckedAbort() ? Event::kRecvFinalAbortAck + : Event::kRecvAbortAck; + return _stateMachine.onEvent(std::move(lk), event); +} + +// +// Commit path +// + +Action TransactionCoordinator::recvVoteCommit(const ShardId& shardId, Timestamp prepareTimestamp) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _participantList.recordVoteCommit(shardId, prepareTimestamp); + auto event = (_participantList.allParticipantsVotedCommit()) ? Event::kRecvFinalVoteCommit + : Event::kRecvVoteCommit; + if (event == Event::kRecvFinalVoteCommit) { + const auto maxPrepareTs = _participantList.getHighestPrepareTimestamp(); + _commitTimestamp = Timestamp(maxPrepareTs.getSecs(), maxPrepareTs.getInc() + 1); + Status s = LogicalClock::get(getGlobalServiceContext()) + ->advanceClusterTime(LogicalTime(_commitTimestamp.get())); + if (!s.isOK()) { + log() << "Coordinator shard failed to advance cluster time to commitTimestamp " + << causedBy(s); + } } + return _stateMachine.onEvent(std::move(lk), event); +} + +Action TransactionCoordinator::madeCommitDecisionDurable() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + return _stateMachine.onEvent(std::move(lk), Event::kMadeCommitDecisionDurable); } +Action TransactionCoordinator::recvCommitAck(const ShardId& shardId) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _participantList.recordCommitAck(shardId); + auto event = _participantList.allParticipantsAckedCommit() ? Event::kRecvFinalCommitAck + : Event::kRecvCommitAck; + return _stateMachine.onEvent(std::move(lk), event); +} + +// +// Any time +// + Future<TransactionCoordinator::StateMachine::State> TransactionCoordinator::waitForCompletion() { stdx::unique_lock<stdx::mutex> lk(_mutex); return _stateMachine.waitForTransitionTo({State::kCommitted, State::kAborted}); } +Future<TransactionCoordinator::CommitDecision> TransactionCoordinator::waitForDecision() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + return _stateMachine + .waitForTransitionTo({State::kWaitingForAbortAcks, + State::kWaitingForCommitAcks, + State::kCommitted, + State::kAborted}) + .then([](auto state) { + switch (state) { + case TransactionCoordinator::StateMachine::State::kWaitingForAbortAcks: + case TransactionCoordinator::StateMachine::State::kAborted: + return TransactionCoordinator::CommitDecision::kAbort; + case TransactionCoordinator::StateMachine::State::kWaitingForCommitAcks: + case TransactionCoordinator::StateMachine::State::kCommitted: + return TransactionCoordinator::CommitDecision::kCommit; + default: + MONGO_UNREACHABLE; + } + }); +} + +Action TransactionCoordinator::recvTryAbort() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + return _stateMachine.onEvent(std::move(lk), Event::kRecvTryAbort); +} + // // StateMachine // @@ -101,39 +170,70 @@ Future<TransactionCoordinator::StateMachine::State> TransactionCoordinator::wait const std::map<State, std::map<Event, TransactionCoordinator::StateMachine::Transition>> TransactionCoordinator::StateMachine::transitionTable = { // clang-format off - {State::kWaitingForParticipantList, { - {Event::kRecvVoteAbort, {Action::kSendAbort, State::kAborted}}, - {Event::kRecvVoteCommit, {}}, - {Event::kRecvParticipantList, {State::kWaitingForVotes}}, - {Event::kRecvTryAbort, {Action::kSendAbort, State::kAborted}}, + {State::kUninitialized, { + {Event::kRecvParticipantList, {Action::kWriteParticipantList, State::kMakingParticipantListDurable}}, + {Event::kRecvTryAbort, {{}, State::kAborted}}, + }}, + {State::kMakingParticipantListDurable, { + {Event::kRecvParticipantList, {}}, + {Event::kMadeParticipantListDurable, {Action::kSendPrepare, State::kWaitingForVotes}}, + {Event::kRecvTryAbort, {}}, }}, {State::kWaitingForVotes, { - {Event::kRecvVoteAbort, {Action::kSendAbort, State::kAborted}}, + {Event::kRecvParticipantList, {}}, + {Event::kRecvVoteAbort, {Action::kWriteAbortDecision, State::kMakingAbortDecisionDurable}}, {Event::kRecvVoteCommit, {}}, + {Event::kRecvFinalVoteCommit, {Action::kWriteCommitDecision, State::kMakingCommitDecisionDurable}}, + {Event::kRecvTryAbort, {}}, + }}, + + // Abort path + // Note: Can continue to receive votes after abort decision has been made, because an abort + // decision only requires a single voteAbort. + {State::kMakingAbortDecisionDurable, { + {Event::kRecvParticipantList, {}}, + {Event::kRecvVoteAbort, {}}, + {Event::kRecvVoteCommit, {}}, + {Event::kMadeAbortDecisionDurable, {Action::kSendAbort, State::kWaitingForAbortAcks}}, + {Event::kRecvTryAbort, {}}, + }}, + {State::kWaitingForAbortAcks, { {Event::kRecvParticipantList, {}}, - {Event::kRecvFinalVoteCommit, {Action::kSendCommit, State::kWaitingForCommitAcks}}, - {Event::kRecvTryAbort, {Action::kSendAbort, State::kAborted}}, + {Event::kRecvVoteAbort, {}}, + {Event::kRecvVoteCommit, {}}, + {Event::kRecvAbortAck, {}}, + {Event::kRecvFinalAbortAck, {Action::kDone, State::kAborted}}, + {Event::kRecvTryAbort, {}}, + }}, {State::kAborted, { - {Event::kRecvVoteAbort, {}}, - {Event::kRecvVoteCommit, {Action::kSendAbort}}, {Event::kRecvParticipantList, {}}, + {Event::kRecvVoteAbort, {}}, + {Event::kRecvVoteCommit, {}}, {Event::kRecvTryAbort, {}}, }}, + + // Commit path + // Note: Cannot continue to receive votes after commit decision has been made, because a + // commit decision requires all voteCommits. + {State::kMakingCommitDecisionDurable, { + {Event::kRecvParticipantList, {}}, + {Event::kMadeCommitDecisionDurable, {Action::kSendCommit, State::kWaitingForCommitAcks}}, + {Event::kRecvTryAbort, {}}, + + }}, {State::kWaitingForCommitAcks, { - {Event::kRecvVoteCommit, {}}, {Event::kRecvParticipantList, {}}, - {Event::kRecvFinalVoteCommit, {Action::kSendCommit}}, - {Event::kRecvFinalCommitAck, {State::kCommitted}}, + {Event::kRecvCommitAck, {}}, + {Event::kRecvFinalCommitAck, {Action::kDone, State::kCommitted}}, {Event::kRecvTryAbort, {}}, + }}, {State::kCommitted, { - {Event::kRecvVoteCommit, {}}, {Event::kRecvParticipantList, {}}, - {Event::kRecvFinalVoteCommit, {}}, - {Event::kRecvFinalCommitAck, {}}, {Event::kRecvTryAbort, {}}, }}, + {State::kBroken, {}}, // clang-format on }; @@ -175,6 +275,7 @@ void TransactionCoordinator::StateMachine::_signalAllPromisesWaitingForState( Action TransactionCoordinator::StateMachine::onEvent(stdx::unique_lock<stdx::mutex> lk, Event event) { + const auto legalTransitions = transitionTable.find(_state)->second; if (!legalTransitions.count(event)) { std::string errmsg = str::stream() << "Transaction coordinator received illegal event '" @@ -184,9 +285,20 @@ Action TransactionCoordinator::StateMachine::onEvent(stdx::unique_lock<stdx::mut } const auto transition = legalTransitions.find(event)->second; + if (transition.nextState) { + StringBuilder ss; + ss << "TransactionCoordinator received event " << event << " while in state " << _state + << " and returning " << transition.action << " and transitioning to " + << *transition.nextState; + LOG(3) << ss.str(); _state = *transition.nextState; _signalAllPromisesWaitingForState(std::move(lk), _state); + } else { + StringBuilder ss; + ss << "TransactionCoordinator received event " << event << " while in state " << _state + << " and returning " << transition.action << " and not transitioning to new state"; + LOG(3) << ss.str(); } return transition.action; @@ -288,12 +400,13 @@ void TransactionCoordinator::ParticipantList::recordVoteAbort(const ShardId& sha participant.vote != Participant::Vote::kCommit); participant.vote = Participant::Vote::kAbort; + participant.ack = Participant::Ack::kAbort; } void TransactionCoordinator::ParticipantList::recordCommitAck(const ShardId& shardId) { auto it = _participants.find(shardId); uassert( - ErrorCodes::InternalError, + 50989, str::stream() << "Transaction commit coordinator processed 'commit' ack from participant " << shardId.toString() << " not in participant list", @@ -301,6 +414,17 @@ void TransactionCoordinator::ParticipantList::recordCommitAck(const ShardId& sha it->second.ack = Participant::Ack::kCommit; } +void TransactionCoordinator::ParticipantList::recordAbortAck(const ShardId& shardId) { + auto it = _participants.find(shardId); + uassert( + 50990, + str::stream() << "Transaction commit coordinator processed 'abort' ack from participant " + << shardId.toString() + << " not in participant list", + it != _participants.end()); + it->second.ack = Participant::Ack::kAbort; +} + bool TransactionCoordinator::ParticipantList::allParticipantsVotedCommit() const { return _fullListReceived && std::all_of(_participants.begin(), _participants.end(), @@ -309,6 +433,13 @@ bool TransactionCoordinator::ParticipantList::allParticipantsVotedCommit() const }); } +bool TransactionCoordinator::ParticipantList::allParticipantsAckedAbort() const { + return std::all_of( + _participants.begin(), _participants.end(), [](const std::pair<ShardId, Participant>& i) { + return i.second.ack == Participant::Ack::kAbort; + }); +} + bool TransactionCoordinator::ParticipantList::allParticipantsAckedCommit() const { invariant(_fullListReceived); return std::all_of( @@ -329,6 +460,14 @@ Timestamp TransactionCoordinator::ParticipantList::getHighestPrepareTimestamp() return highestPrepareTimestamp; } +std::set<ShardId> TransactionCoordinator::ParticipantList::getParticipants() const { + std::set<ShardId> participants; + for (const auto& kv : _participants) { + participants.insert(kv.first); + } + return participants; +} + std::set<ShardId> TransactionCoordinator::ParticipantList::getNonAckedCommitParticipants() const { std::set<ShardId> nonAckedCommitParticipants; for (const auto& kv : _participants) { diff --git a/src/mongo/db/transaction_coordinator.h b/src/mongo/db/transaction_coordinator.h index f0e6f0612eb..510a3339a9e 100644 --- a/src/mongo/db/transaction_coordinator.h +++ b/src/mongo/db/transaction_coordinator.h @@ -33,10 +33,12 @@ #include <boost/optional.hpp> #include <list> #include <map> +#include <memory> #include <set> #include "mongo/base/disallow_copying.h" #include "mongo/bson/timestamp.h" +#include "mongo/db/logical_session_id.h" #include "mongo/s/shard_id.h" #include "mongo/stdx/mutex.h" #include "mongo/util/assert_util.h" @@ -53,26 +55,37 @@ class Session; * A state machine that coordinates a distributed transaction commit with the transaction * participants. */ -class TransactionCoordinator { +class TransactionCoordinator : public std::enable_shared_from_this<TransactionCoordinator> { MONGO_DISALLOW_COPYING(TransactionCoordinator); public: TransactionCoordinator() = default; ~TransactionCoordinator() = default; + enum class CommitDecision { + kCommit, + kAbort, + }; + /** * The internal state machine, or "brain", used by the TransactionCoordinator to determine what * to do in response to an "event" (receiving a request or hearing back a response). */ class StateMachine { - friend class TransactionCoordinator; - public: ~StateMachine(); enum class State { - kWaitingForParticipantList, + kUninitialized, + kMakingParticipantListDurable, kWaitingForVotes, + + // Abort path + kMakingAbortDecisionDurable, + kWaitingForAbortAcks, kAborted, + + // Commit path + kMakingCommitDecisionDurable, kWaitingForCommitAcks, kCommitted, @@ -85,16 +98,36 @@ public: // State machine inputs enum class Event { + kRecvParticipantList, + kMadeParticipantListDurable, + + // Abort path kRecvVoteAbort, + kMadeAbortDecisionDurable, + kRecvAbortAck, + kRecvFinalAbortAck, + + // Commit path kRecvVoteCommit, - kRecvParticipantList, kRecvFinalVoteCommit, + kMadeCommitDecisionDurable, + kRecvCommitAck, kRecvFinalCommitAck, + kRecvTryAbort, }; // State machine outputs - enum class Action { kNone, kSendCommit, kSendAbort }; + enum class Action { + kNone, + kWriteParticipantList, + kSendPrepare, + kWriteAbortDecision, + kSendAbort, + kWriteCommitDecision, + kSendCommit, + kDone + }; // IMPORTANT: If there is a state transition, this will release the lock in order to signal // any promises that may be waiting on a state change, and will not reacquire it. @@ -137,7 +170,7 @@ public: }; static const std::map<State, std::map<Event, Transition>> transitionTable; - State _state{State::kWaitingForParticipantList}; + State _state{State::kUninitialized}; std::list<StateTransitionPromise> _stateTransitionPromises; }; @@ -145,38 +178,77 @@ public: * The coordinateCommit command contains the full participant list that this node is responsible * for coordinating the commit across. * - * Stores the participant list. + * Stores the participant list and returns the next action to take. * * Throws if any participants that this node has already heard a vote from are not in the list. */ StateMachine::Action recvCoordinateCommit(const std::set<ShardId>& participants); /** - * A participant sends a voteCommit command with its prepareTimestamp if it succeeded in - * preparing the transaction. + * Advances the state machine and returns the next action to take. + */ + StateMachine::Action madeParticipantListDurable(); + + // + // Abort path + // + + /** + * A participant responds to prepare with failure if it failed to prepare the transaction, has + * timed out and already aborted the transaction, or has received a higher transaction number. * - * Stores the participant's vote. + * Stores the participant's vote and returns the next action to take. * * Throws if the full participant list has been received and this shard is not one of the * participants. */ - StateMachine::Action recvVoteCommit(const ShardId& shardId, Timestamp prepareTimestamp); + StateMachine::Action recvVoteAbort(const ShardId& shardId); + + /** + * Advances the state machine and returns the next action to take. + */ + StateMachine::Action madeAbortDecisionDurable(); /** - * A participant sends a voteAbort command if it failed to prepare the transaction. + * If this is the final abort ack, advances the state machine. Returns the next action to take. + */ + StateMachine::Action recvAbortAck(const ShardId& shardId); + + // + // Commit path + // + + /** + * A participant responds to prepare with success and its prepare Timestamp if it succeeded in + * preparing the transaction. * - * Stores the participant's vote and causes the coordinator to decide to abort the transaction. + * Stores the participant's vote and prepare Timestamp and returns the next action to take. * * Throws if the full participant list has been received and this shard is not one of the * participants. */ - StateMachine::Action recvVoteAbort(const ShardId& shardId); + StateMachine::Action recvVoteCommit(const ShardId& shardId, Timestamp prepareTimestamp); /** - * A tryAbort event is received by the coordinator when a transaction is implicitly aborted when - * a new transaction is received for the same session with a higher transaction number. + * Advances the state machine and returns the next action to take. */ - StateMachine::Action recvTryAbort(); + StateMachine::Action madeCommitDecisionDurable(); + + /** + * Marks this participant as having completed committing the transaction. + */ + StateMachine::Action recvCommitAck(const ShardId& shardId); + + // + // Any time + // + + /** + * Returns a Future which will be signaled when the TransactionCoordinator has successfully + * persisted a commit or abort decision. The resulting future will contain coordinator's + * decision. + */ + Future<TransactionCoordinator::CommitDecision> waitForDecision(); /** * Returns a Future which will be signaled when the TransactionCoordinator either commits @@ -185,9 +257,15 @@ public: Future<TransactionCoordinator::StateMachine::State> waitForCompletion(); /** - * Marks this participant as having completed committing the transaction. + * A tryAbort event is received by the coordinator when a transaction is implicitly aborted when + * a new transaction is received for the same session with a higher transaction number. */ - void recvCommitAck(const ShardId& shardId); + StateMachine::Action recvTryAbort(); + + std::set<ShardId> getParticipants() const { + invariant(_stateMachine.state() != StateMachine::State::kUninitialized); + return _participantList.getParticipants(); + } std::set<ShardId> getNonAckedCommitParticipants() const { return _participantList.getNonAckedCommitParticipants(); @@ -197,8 +275,8 @@ public: return _participantList.getNonVotedAbortParticipants(); } - Timestamp getCommitTimestamp() const { - return _participantList.getHighestPrepareTimestamp(); + boost::optional<Timestamp> getCommitTimestamp() const { + return _commitTimestamp; } StateMachine::State state() const { @@ -218,14 +296,23 @@ public: bool allParticipantsAckedCommit() const; Timestamp getHighestPrepareTimestamp() const; - + std::set<ShardId> getParticipants() const; std::set<ShardId> getNonAckedCommitParticipants() const; std::set<ShardId> getNonVotedAbortParticipants() const; class Participant { public: + /** + * This participant's vote, that is, whether the participant responded with success to + * prepareTransaction. + */ enum class Vote { kUnknown, kAbort, kCommit }; - enum class Ack { kNone, kCommit }; + + /** + * Whether this participant has acked the decision. + * TODO (SERVER-37924): Remove this enum and just track the ack as a bool. + */ + enum class Ack { kNone, kAbort, kCommit }; Vote vote{Vote::kUnknown}; Ack ack{Ack::kNone}; @@ -244,23 +331,27 @@ private: stdx::mutex _mutex; ParticipantList _participantList; StateMachine _stateMachine; + boost::optional<Timestamp> _commitTimestamp; }; inline StringBuilder& operator<<(StringBuilder& sb, const TransactionCoordinator::StateMachine::State& state) { using State = TransactionCoordinator::StateMachine::State; + // clang-format off switch (state) { - // clang-format off - case State::kWaitingForParticipantList: return sb << "kWaitingForParticipantlist"; + case State::kUninitialized: return sb << "kUninitialized"; + case State::kMakingParticipantListDurable: return sb << "kMakingParticipantListDurable"; case State::kWaitingForVotes: return sb << "kWaitingForVotes"; + case State::kMakingAbortDecisionDurable: return sb << "kMakingAbortDecisionsDurable"; + case State::kWaitingForAbortAcks: return sb << "kWaitingForAbortAcks"; case State::kAborted: return sb << "kAborted"; + case State::kMakingCommitDecisionDurable: return sb << "kMakingCommiDecisionsDurable"; case State::kWaitingForCommitAcks: return sb << "kWaitingForCommitAcks"; case State::kCommitted: return sb << "kCommitted"; case State::kBroken: return sb << "kBroken"; - // clang-format on - default: - MONGO_UNREACHABLE; }; + // clang-format on + MONGO_UNREACHABLE; } inline std::ostream& operator<<(std::ostream& os, @@ -273,17 +364,23 @@ inline std::ostream& operator<<(std::ostream& os, inline StringBuilder& operator<<(StringBuilder& sb, const TransactionCoordinator::StateMachine::Event& event) { using Event = TransactionCoordinator::StateMachine::Event; + // clang-format off switch (event) { - // clang-format off - case Event::kRecvVoteAbort: return sb << "kRecvVoteAbort"; - case Event::kRecvVoteCommit: return sb << "kRecvVoteCommit"; - case Event::kRecvParticipantList: return sb << "kRecvParticipantList"; - case Event::kRecvFinalVoteCommit: return sb << "kRecvFinalVoteCommit"; - case Event::kRecvFinalCommitAck: return sb << "kRecvFinalCommitAck"; - // clang-format on - default: - MONGO_UNREACHABLE; + case Event::kRecvParticipantList: return sb << "kRecvParticipantList"; + case Event::kMadeParticipantListDurable: return sb << "kMadeParticipantListDurable"; + case Event::kRecvVoteAbort: return sb << "kRecvVoteAbort"; + case Event::kMadeAbortDecisionDurable: return sb << "kMadeAbortDecisionDurable"; + case Event::kRecvAbortAck: return sb << "kRecvAbortAck"; + case Event::kRecvFinalAbortAck: return sb << "kRecvFinalAbortAck"; + case Event::kRecvVoteCommit: return sb << "kRecvVoteCommit"; + case Event::kRecvFinalVoteCommit: return sb << "kRecvFinalVoteCommit"; + case Event::kMadeCommitDecisionDurable: return sb << "kMadeCommitDecisionDurable"; + case Event::kRecvCommitAck: return sb << "kRecvCommitAck"; + case Event::kRecvFinalCommitAck: return sb << "kRecvFinalCommitAck"; + case Event::kRecvTryAbort: return sb << "kRecvTryAbort"; }; + // clang-format on + MONGO_UNREACHABLE; } inline std::ostream& operator<<(std::ostream& os, @@ -298,9 +395,14 @@ inline StringBuilder& operator<<(StringBuilder& sb, using Action = TransactionCoordinator::StateMachine::Action; // clang-format off switch (action) { - case Action::kSendCommit: return sb << "kSendCommit"; - case Action::kSendAbort: return sb << "kSendAbort"; - case Action::kNone: return sb << "kNone"; + case Action::kNone: return sb << "kNone"; + case Action::kWriteParticipantList: return sb << "kWriteParticipantList"; + case Action::kSendPrepare: return sb << "kSendPrepare"; + case Action::kWriteAbortDecision: return sb << "kWriteAbortDecision"; + case Action::kSendAbort: return sb << "kSendAbort"; + case Action::kWriteCommitDecision: return sb << "kWriteCommitDecision"; + case Action::kSendCommit: return sb << "kSendCommit"; + case Action::kDone: return sb << "kDone"; }; // clang-format on MONGO_UNREACHABLE; @@ -312,4 +414,5 @@ inline std::ostream& operator<<(std::ostream& os, sb << action; return os << sb.str(); } + } // namespace mongo diff --git a/src/mongo/db/transaction_coordinator_catalog.cpp b/src/mongo/db/transaction_coordinator_catalog.cpp index 79e7de153b2..74b29971677 100644 --- a/src/mongo/db/transaction_coordinator_catalog.cpp +++ b/src/mongo/db/transaction_coordinator_catalog.cpp @@ -78,22 +78,22 @@ std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::create(Lo return newCoordinator; } -boost::optional<std::shared_ptr<TransactionCoordinator>> TransactionCoordinatorCatalog::get( - LogicalSessionId lsid, TxnNumber txnNumber) { +std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::get(LogicalSessionId lsid, + TxnNumber txnNumber) { stdx::lock_guard<stdx::mutex> lk(_mutex); const auto& coordinatorsForSessionIter = _coordinatorsBySession.find(lsid); if (coordinatorsForSessionIter == _coordinatorsBySession.end()) { - return boost::none; + return nullptr; } const auto& coordinatorsForSession = coordinatorsForSessionIter->second; const auto& coordinatorForTxnIter = coordinatorsForSession.find(txnNumber); if (coordinatorForTxnIter == coordinatorsForSession.end()) { - return boost::none; + return nullptr; } return coordinatorForTxnIter->second; diff --git a/src/mongo/db/transaction_coordinator_catalog.h b/src/mongo/db/transaction_coordinator_catalog.h index bf28bf57e1f..85fb7609ab2 100644 --- a/src/mongo/db/transaction_coordinator_catalog.h +++ b/src/mongo/db/transaction_coordinator_catalog.h @@ -68,10 +68,9 @@ public: /** * Returns the coordinator with the given session id and transaction number, if it exists. If it - * does not exist, return boost::none. + * does not exist, return nullptr. */ - boost::optional<std::shared_ptr<TransactionCoordinator>> get(LogicalSessionId lsid, - TxnNumber txnNumber); + std::shared_ptr<TransactionCoordinator> get(LogicalSessionId lsid, TxnNumber txnNumber); /** * Returns the coordinator with the highest transaction number with the given session id, if it diff --git a/src/mongo/db/transaction_coordinator_catalog_test.cpp b/src/mongo/db/transaction_coordinator_catalog_test.cpp index 69d7e2b7d37..5a16a26a4b9 100644 --- a/src/mongo/db/transaction_coordinator_catalog_test.cpp +++ b/src/mongo/db/transaction_coordinator_catalog_test.cpp @@ -83,7 +83,7 @@ TEST_F(TransactionCoordinatorCatalogTest, GetOnSessionThatDoesNotExistReturnsNon TxnNumber txnNumber = 1; auto coordinator = coordinatorCatalog().get(lsid, txnNumber); - ASSERT_EQ(coordinator, boost::none); + ASSERT(coordinator == nullptr); } TEST_F(TransactionCoordinatorCatalogTest, @@ -92,7 +92,7 @@ TEST_F(TransactionCoordinatorCatalogTest, TxnNumber txnNumber = 1; createCoordinatorInCatalog(lsid, txnNumber); auto coordinatorInCatalog = coordinatorCatalog().get(lsid, txnNumber + 1); - ASSERT_EQ(coordinatorInCatalog, boost::none); + ASSERT(coordinatorInCatalog == nullptr); } @@ -101,7 +101,7 @@ TEST_F(TransactionCoordinatorCatalogTest, CreateFollowedByGetReturnsCoordinator) TxnNumber txnNumber = 1; createCoordinatorInCatalog(lsid, txnNumber); auto coordinatorInCatalog = coordinatorCatalog().get(lsid, txnNumber); - ASSERT_NOT_EQUALS(coordinatorInCatalog, boost::none); + ASSERT(coordinatorInCatalog != nullptr); } TEST_F(TransactionCoordinatorCatalogTest, SecondCreateForSessionDoesNotOverwriteFirstCreate) { @@ -112,7 +112,7 @@ TEST_F(TransactionCoordinatorCatalogTest, SecondCreateForSessionDoesNotOverwrite auto coordinator2 = createCoordinatorInCatalog(lsid, txnNumber2); auto coordinator1InCatalog = coordinatorCatalog().get(lsid, txnNumber1); - ASSERT_NOT_EQUALS(coordinator1InCatalog, boost::none); + ASSERT(coordinator1InCatalog != nullptr); } DEATH_TEST_F(TransactionCoordinatorCatalogTest, @@ -142,37 +142,41 @@ TEST_F(TransactionCoordinatorCatalogTest, ASSERT_EQ(latestTxnNumAndCoordinator->first, txnNumber); } -TEST_F(TransactionCoordinatorCatalogTest, - CoordinatorsRemoveThemselvesFromCatalogWhenTheyReachCommittedState) { - using CoordinatorState = TransactionCoordinator::StateMachine::State; - - LogicalSessionId lsid = makeLogicalSessionIdForTest(); - TxnNumber txnNumber = 1; - auto coordinator = createCoordinatorInCatalog(lsid, txnNumber); - - coordinator->recvCoordinateCommit({ShardId("shard0000")}); - coordinator->recvVoteCommit(ShardId("shard0000"), dummyTimestamp); - coordinator->recvCommitAck(ShardId("shard0000")); - ASSERT_EQ(coordinator->state(), CoordinatorState::kCommitted); - - auto latestTxnNumAndCoordinator = coordinatorCatalog().getLatestOnSession(lsid); - ASSERT_FALSE(latestTxnNumAndCoordinator); -} - -TEST_F(TransactionCoordinatorCatalogTest, - CoordinatorsRemoveThemselvesFromCatalogWhenTheyReachAbortedState) { - using CoordinatorState = TransactionCoordinator::StateMachine::State; - - LogicalSessionId lsid = makeLogicalSessionIdForTest(); - TxnNumber txnNumber = 1; - auto coordinator = createCoordinatorInCatalog(lsid, txnNumber); - - coordinator->recvVoteAbort(ShardId("shard0000")); - ASSERT_EQ(coordinator->state(), CoordinatorState::kAborted); +// TODO (SERVER-XXXX): Re-enable once coordinators are also participants and decision recovery +// works correctly. +// TEST_F(TransactionCoordinatorCatalogTest, +// CoordinatorsRemoveThemselvesFromCatalogWhenTheyReachCommittedState) { +// using CoordinatorState = TransactionCoordinator::StateMachine::State; +// +// LogicalSessionId lsid = makeLogicalSessionIdForTest(); +// TxnNumber txnNumber = 1; +// auto coordinator = createCoordinatorInCatalog(lsid, txnNumber); +// +// coordinator->recvCoordinateCommit({ShardId("shard0000")}); +// coordinator->recvVoteCommit(ShardId("shard0000"), dummyTimestamp); +// coordinator->recvCommitAck(ShardId("shard0000")); +// ASSERT_EQ(coordinator->state(), CoordinatorState::kCommitted); +// +// auto latestTxnNumAndCoordinator = coordinatorCatalog().getLatestOnSession(lsid); +// ASSERT_FALSE(latestTxnNumAndCoordinator); +// } - auto latestTxnNumAndCoordinator = coordinatorCatalog().getLatestOnSession(lsid); - ASSERT_FALSE(latestTxnNumAndCoordinator); -} +// TODO (SERVER-XXXX): Re-enable once coordinators are also participants and decision recovery +// works correctly. +// TEST_F(TransactionCoordinatorCatalogTest, +// CoordinatorsRemoveThemselvesFromCatalogWhenTheyReachAbortedState) { +// using CoordinatorState = TransactionCoordinator::StateMachine::State; +// +// LogicalSessionId lsid = makeLogicalSessionIdForTest(); +// TxnNumber txnNumber = 1; +// auto coordinator = createCoordinatorInCatalog(lsid, txnNumber); +// +// coordinator->recvVoteAbort(ShardId("shard0000")); +// ASSERT_EQ(coordinator->state(), CoordinatorState::kAborted); +// +// auto latestTxnNumAndCoordinator = coordinatorCatalog().getLatestOnSession(lsid); +// ASSERT_FALSE(latestTxnNumAndCoordinator); +// } TEST_F(TransactionCoordinatorCatalogTest, TwoCreatesFollowedByGetLatestOnSessionReturnsCoordinatorWithHighestTxnNumber) { diff --git a/src/mongo/db/transaction_coordinator_commands_impl.cpp b/src/mongo/db/transaction_coordinator_commands_impl.cpp index ce63dd84ef0..12896e1eb83 100644 --- a/src/mongo/db/transaction_coordinator_commands_impl.cpp +++ b/src/mongo/db/transaction_coordinator_commands_impl.cpp @@ -36,70 +36,72 @@ #include "mongo/client/remote_command_targeter.h" #include "mongo/db/commands/txn_cmds_gen.h" -#include "mongo/db/operation_context_session_mongod.h" +#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" +#include "mongo/db/concurrency/write_conflict_exception.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/ops/write_ops.h" +#include "mongo/db/repl/repl_client_info.h" +#include "mongo/executor/task_executor.h" #include "mongo/executor/task_executor_pool.h" #include "mongo/rpc/get_status_from_command_result.h" -#include "mongo/s/async_requests_sender.h" #include "mongo/s/grid.h" +#include "mongo/util/concurrency/notification.h" +#include "mongo/util/concurrency/thread_pool.h" +#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" namespace mongo { namespace { +using Action = TransactionCoordinator::StateMachine::Action; +using State = TransactionCoordinator::StateMachine::State; +using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs; + /** * Finds the host and port for a shard. */ -StatusWith<HostAndPort> targetHost(OperationContext* opCtx, - const ShardId& shardId, - const ReadPreferenceSetting& readPref) { +StatusWith<HostAndPort> targetHost(const ShardId& shardId, const ReadPreferenceSetting& readPref) { auto shard = Grid::get(getGlobalServiceContext())->shardRegistry()->getShardNoReload(shardId); if (!shard) { return Status(ErrorCodes::ShardNotFound, str::stream() << "Could not find shard " << shardId); } - auto targeter = shard->getTargeter(); - return targeter->findHost(opCtx, readPref); + return shard->getTargeter()->findHostNoWait(readPref); } -using CallbackFn = stdx::function<void(Status status, const ShardId& shardID)>; +using CallbackFn = stdx::function<void( + const RemoteCommandCallbackArgs& args, const ShardId& shardId, const BSONObj& commandObj)>; /** * Sends the given command object to the given shard ID. If scheduling and running the command is * successful, calls the callback with the status of the command response and the shard ID. */ -void sendAsyncCommandToShard(OperationContext* opCtx, - executor::TaskExecutor* executor, +void sendAsyncCommandToShard(executor::TaskExecutor* executor, const ShardId& shardId, const BSONObj& commandObj, - CallbackFn callbackOnCommandResponse) { + const CallbackFn& callbackOnCommandResponse) { auto readPref = ReadPreferenceSetting(ReadPreference::PrimaryOnly); - auto swShardHostAndPort = targetHost(opCtx, shardId, readPref); - if (!swShardHostAndPort.isOK()) { + auto swShardHostAndPort = targetHost(shardId, readPref); + while (!swShardHostAndPort.isOK()) { LOG(3) << "Coordinator shard failed to target primary host of participant shard for " << commandObj << causedBy(swShardHostAndPort.getStatus()); - return; + swShardHostAndPort = targetHost(shardId, readPref); } executor::RemoteCommandRequest request( swShardHostAndPort.getValue(), "admin", commandObj, readPref.toContainingBSON(), nullptr); auto swCallbackHandle = executor->scheduleRemoteCommand( - request, - [commandObj, shardId, callbackOnCommandResponse]( - const executor::TaskExecutor::RemoteCommandCallbackArgs& args) { - - auto status = (!args.response.isOK()) ? args.response.status - : getStatusFromCommandResult(args.response.data); + request, [ commandObjOwned = commandObj.getOwned(), + shardId, + callbackOnCommandResponse ](const RemoteCommandCallbackArgs& args) { + LOG(3) << "Coordinator shard got response " << args.response.data << " for " + << commandObjOwned << " to " << shardId; - LOG(3) << "Coordinator shard got response " << status << " for " << commandObj << " to " - << shardId; - - // Only call callback if command successfully executed and got a response. - if (args.response.isOK()) { - callbackOnCommandResponse(status, shardId); - } + callbackOnCommandResponse(args, shardId, commandObjOwned); }); if (!swCallbackHandle.isOK()) { @@ -107,7 +109,8 @@ void sendAsyncCommandToShard(OperationContext* opCtx, << " to shard " << shardId << causedBy(swCallbackHandle.getStatus()); } - // Do not wait for the callback to run. + // Do not wait for the callback to run. The callback will reschedule the remote request on the + // same executor if necessary. } /** @@ -115,67 +118,216 @@ void sendAsyncCommandToShard(OperationContext* opCtx, * scheduling and running the command is successful, calls the callback with the status of the * command response and the shard ID. */ -void sendAsyncCommandToShards(OperationContext* opCtx, - const std::set<ShardId>& shardIds, - const BSONObj& commandObj, - CallbackFn callbackOnCommandResponse) { +void sendCommandToShards(OperationContext* opCtx, + const std::set<ShardId>& shardIds, + const BSONObj& commandObj, + const CallbackFn& callbackOnCommandResponse) { // TODO (SERVER-36638): Change to arbitrary task executor? Unit test only supports fixed // executor. auto exec = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); StringBuilder ss; ss << "["; - // For each non-acked participant, launch an async task to target its shard - // and then asynchronously send the command. for (const auto& shardId : shardIds) { - sendAsyncCommandToShard(opCtx, exec, shardId, commandObj, callbackOnCommandResponse); + sendAsyncCommandToShard(exec, shardId, commandObj, callbackOnCommandResponse); ss << shardId << " "; } - ss << "]"; LOG(3) << "Coordinator shard sending " << commandObj << " to " << ss.str(); } +void driveCoordinatorUntilDone(OperationContext* opCtx, + std::shared_ptr<TransactionCoordinator> coordinator, + const LogicalSessionId& lsid, + const TxnNumber& txnNumber, + Action action) { + while (true) { + switch (action) { + case Action::kWriteParticipantList: + action = coordinator->madeParticipantListDurable(); + break; + case Action::kSendPrepare: + action = txn::sendPrepare( + opCtx, lsid, txnNumber, coordinator, coordinator->getParticipants()); + break; + case Action::kWriteAbortDecision: + action = coordinator->madeAbortDecisionDurable(); + break; + case Action::kSendAbort: + action = txn::sendAbort(opCtx, + lsid, + txnNumber, + coordinator, + coordinator->getNonVotedAbortParticipants()); + break; + case Action::kWriteCommitDecision: + action = coordinator->madeCommitDecisionDurable(); + break; + case Action::kSendCommit: + action = txn::sendCommit(opCtx, + lsid, + txnNumber, + coordinator, + coordinator->getNonAckedCommitParticipants(), + coordinator->getCommitTimestamp().get()); + break; + case Action::kDone: + return; + case Action::kNone: + // This means an event was delivered to the coordinator outside the expected order + // of events. + MONGO_UNREACHABLE; + } + } +} } // namespace namespace txn { -void sendCommit(OperationContext* opCtx, - std::shared_ptr<TransactionCoordinator> coordinator, - const std::set<ShardId>& nonAckedParticipants, - Timestamp commitTimestamp) { - invariant(coordinator); +Action sendPrepare(OperationContext* opCtx, + const LogicalSessionId& lsid, + const TxnNumber& txnNumber, + std::shared_ptr<TransactionCoordinator> coordinator, + const std::set<ShardId>& participants) { + PrepareTransaction prepareCmd; + prepareCmd.setDbName("admin"); + auto prepareObj = prepareCmd.toBSON( + BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false + << WriteConcernOptions::kWriteConcernField + << WriteConcernOptions::Majority)); + + auto actionNotification = std::make_shared<Notification<Action>>(); + CallbackFn prepareCallback; + prepareCallback = [coordinator, actionNotification, &prepareCallback]( + const RemoteCommandCallbackArgs& args, const ShardId& shardId, const BSONObj& commandObj) { + auto status = (!args.response.isOK()) ? args.response.status + : getStatusFromCommandResult(args.response.data); + + boost::optional<Action> action; + + if (status.isOK()) { + if (args.response.data["prepareTimestamp"].eoo() || + args.response.data["prepareTimestamp"].timestamp().isNull()) { + LOG(0) << "Coordinator shard received an OK response to prepareTransaction without " + "a prepareTimestamp from shard " + << shardId + << ", which is not expected behavior. Interpreting the response from " + << shardId << " as a vote to abort"; + action = coordinator->recvVoteAbort(shardId); + } else { + action = coordinator->recvVoteCommit( + shardId, args.response.data["prepareTimestamp"].timestamp()); + } + } else if (ErrorCodes::isVoteAbortError(status.code())) { + action = coordinator->recvVoteAbort(shardId); + } + + if (action) { + if (*action != Action::kNone) { + actionNotification->set(*action); + } + return; + } + + if (coordinator->state() != State::kWaitingForVotes) { + LOG(3) << "Coordinator shard not retrying prepare against " << shardId + << " because coordinator is no longer waiting for votes"; + } else { + LOG(3) << "Coordinator shard retrying " << commandObj << " against " << shardId; + sendAsyncCommandToShard(args.executor, shardId, commandObj, prepareCallback); + } + }; + + sendCommandToShards(opCtx, participants, prepareObj, prepareCallback); + return actionNotification->get(opCtx); +} + +Action sendCommit(OperationContext* opCtx, + const LogicalSessionId& lsid, + const TxnNumber& txnNumber, + std::shared_ptr<TransactionCoordinator> coordinator, + const std::set<ShardId>& nonAckedParticipants, + Timestamp commitTimestamp) { CommitTransaction commitTransaction; commitTransaction.setCommitTimestamp(commitTimestamp); commitTransaction.setDbName("admin"); - BSONObj commitObj = commitTransaction.toBSON(BSON( - "lsid" << opCtx->getLogicalSessionId()->toBSON() << "txnNumber" << *opCtx->getTxnNumber() - << "autocommit" - << false)); - - sendAsyncCommandToShards(opCtx, - nonAckedParticipants, - commitObj, - [coordinator](Status commandResponseStatus, const ShardId& shardId) { - // TODO (SERVER-36642): Also interpret TransactionTooOld as - // acknowledgment. - if (commandResponseStatus.isOK()) { - coordinator->recvCommitAck(shardId); - } - }); + BSONObj commitObj = commitTransaction.toBSON( + BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false)); + + auto actionNotification = std::make_shared<Notification<Action>>(); + CallbackFn commitCallback; + commitCallback = [coordinator, actionNotification, &commitCallback]( + const RemoteCommandCallbackArgs& args, const ShardId& shardId, const BSONObj& commandObj) { + auto status = (!args.response.isOK()) ? args.response.status + : getStatusFromCommandResult(args.response.data); + + if (status.isOK() || ErrorCodes::isVoteAbortError(status.code())) { + auto action = coordinator->recvCommitAck(shardId); + if (action != Action::kNone) { + actionNotification->set(action); + } + return; + } + + LOG(3) << "Coordinator shard retrying " << commandObj << " against " << shardId; + sendAsyncCommandToShard(args.executor, shardId, commandObj, commitCallback); + }; + sendCommandToShards(opCtx, nonAckedParticipants, commitObj, commitCallback); + + return actionNotification->get(opCtx); } -void sendAbort(OperationContext* opCtx, const std::set<ShardId>& nonVotedAbortParticipants) { +Action sendAbort(OperationContext* opCtx, + const LogicalSessionId& lsid, + const TxnNumber& txnNumber, + std::shared_ptr<TransactionCoordinator> coordinator, + const std::set<ShardId>& nonVotedAbortParticipants) { // TODO (SERVER-36584) Use IDL to create command BSON. - BSONObj abortObj = BSON( - "abortTransaction" << 1 << "lsid" << opCtx->getLogicalSessionId()->toBSON() << "txnNumber" - << *opCtx->getTxnNumber() - << "autocommit" - << false); - - sendAsyncCommandToShards( - opCtx, nonVotedAbortParticipants, abortObj, [](Status, const ShardId&) {}); + BSONObj abortObj = + BSON("abortTransaction" << 1 << "lsid" << lsid.toBSON() << "txnNumber" << txnNumber + << "autocommit" + << false); + + auto actionNotification = std::make_shared<Notification<Action>>(); + + CallbackFn abortCallback; + abortCallback = [coordinator, actionNotification, &abortCallback]( + const RemoteCommandCallbackArgs& args, const ShardId& shardId, const BSONObj& commandObj) { + auto status = (!args.response.isOK()) ? args.response.status + : getStatusFromCommandResult(args.response.data); + + if (status.isOK() || ErrorCodes::isVoteAbortError(status.code())) { + auto action = coordinator->recvAbortAck(shardId); + if (action != Action::kNone) { + actionNotification->set(action); + } + return; + } + + LOG(3) << "Coordinator shard retrying " << commandObj << " against " << shardId; + sendAsyncCommandToShard(args.executor, shardId, commandObj, abortCallback); + + }; + sendCommandToShards(opCtx, nonVotedAbortParticipants, abortObj, abortCallback); + return actionNotification->get(opCtx); +} + + +void launchCoordinateCommitTask(ThreadPool& threadPool, + std::shared_ptr<TransactionCoordinator> coordinator, + const LogicalSessionId& lsid, + const TxnNumber& txnNumber, + TransactionCoordinator::StateMachine::Action initialAction) { + auto ch = threadPool.schedule([coordinator, lsid, txnNumber, initialAction]() { + try { + // The opCtx destructor handles unsetting itself from the Client + auto opCtx = Client::getCurrent()->makeOperationContext(); + driveCoordinatorUntilDone(opCtx.get(), coordinator, lsid, txnNumber, initialAction); + } catch (const DBException& e) { + log() << "Exception was thrown while coordinating commit: " << causedBy(e.toStatus()); + } + }); } } // namespace txn diff --git a/src/mongo/db/transaction_coordinator_commands_impl.h b/src/mongo/db/transaction_coordinator_commands_impl.h index 5a211e424fc..da679b30dde 100644 --- a/src/mongo/db/transaction_coordinator_commands_impl.h +++ b/src/mongo/db/transaction_coordinator_commands_impl.h @@ -31,25 +31,54 @@ #pragma once #include "mongo/db/operation_context.h" +#include "mongo/db/transaction_commit_decision_gen.h" #include "mongo/db/transaction_coordinator.h" namespace mongo { +class ThreadPool; + namespace txn { +void launchCoordinateCommitTask(ThreadPool& threadPool, + std::shared_ptr<TransactionCoordinator> coordinator, + const LogicalSessionId& lsid, + const TxnNumber& txnNumber, + TransactionCoordinator::StateMachine::Action initialAction); + +/** + * Schedules prepare to be sent asynchronously to all participants and blocks on being signaled that + * a voteAbort or the final voteCommit has been received. + */ +TransactionCoordinator::StateMachine::Action sendPrepare( + OperationContext* opCtx, + const LogicalSessionId& lsid, + const TxnNumber& txnNumber, + std::shared_ptr<TransactionCoordinator> coordinator, + const std::set<ShardId>& participants); + /** - * Asynchronously sends commit to all participants provided and calls recvCommitAck on the - * coordinator if the commit command succeeds. + * Schedules commit to be sent asynchronously to all participants and blocks on being signaled that + * the final commit ack has been received. */ -void sendCommit(OperationContext* opCtx, - std::shared_ptr<TransactionCoordinator> coordinator, - const std::set<ShardId>& nonAckedParticipants, - Timestamp commitTimestamp); +TransactionCoordinator::StateMachine::Action sendCommit( + OperationContext* opCtx, + const LogicalSessionId& lsid, + const TxnNumber& txnNumber, + std::shared_ptr<TransactionCoordinator> coordinator, + const std::set<ShardId>& nonAckedParticipants, + Timestamp commitTimestamp); /** - * Asynchronously sends abort to all participants provided. + * Schedules abort to be sent asynchronously to all participants and blocks on being signaled that + * the final abort ack has been received. */ -void sendAbort(OperationContext* opCtx, const std::set<ShardId>& nonVotedAbortParticipants); +TransactionCoordinator::StateMachine::Action sendAbort( + OperationContext* opCtx, + const LogicalSessionId& lsid, + const TxnNumber& txnNumber, + std::shared_ptr<TransactionCoordinator> coordinator, + const std::set<ShardId>& nonVotedAbortParticipants); } // namespace txn } // namespace mongo diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp index a1084c1d7c8..7767775296d 100644 --- a/src/mongo/db/transaction_coordinator_service.cpp +++ b/src/mongo/db/transaction_coordinator_service.cpp @@ -38,41 +38,48 @@ #include "mongo/db/service_context.h" #include "mongo/db/transaction_coordinator.h" #include "mongo/db/transaction_coordinator_commands_impl.h" -#include "mongo/executor/task_executor.h" +#include "mongo/rpc/get_status_from_command_result.h" +#include "mongo/s/grid.h" #include "mongo/s/shard_id.h" #include "mongo/util/log.h" namespace mongo { namespace { + const auto transactionCoordinatorServiceDecoration = ServiceContext::declareDecoration<TransactionCoordinatorService>(); -void doCoordinatorAction(OperationContext* opCtx, - std::shared_ptr<TransactionCoordinator> coordinator, - TransactionCoordinator::StateMachine::Action action) { - switch (action) { - case TransactionCoordinator::StateMachine::Action::kSendCommit: { - txn::sendCommit(opCtx, - coordinator, - coordinator->getNonAckedCommitParticipants(), - coordinator->getCommitTimestamp()); - break; - } - case TransactionCoordinator::StateMachine::Action::kSendAbort: { - txn::sendAbort(opCtx, coordinator->getNonVotedAbortParticipants()); - break; - } - case TransactionCoordinator::StateMachine::Action::kNone: - break; - } -} +using Action = TransactionCoordinator::StateMachine::Action; +using State = TransactionCoordinator::StateMachine::State; + +/** + * Constructs the default options for the thread pool used to run commit. + */ +ThreadPool::Options makeDefaultThreadPoolOptions() { + ThreadPool::Options options; + options.poolName = "TransactionCoordinatorService"; + options.minThreads = 0; + options.maxThreads = 20; + + // Ensure all threads have a client + options.onCreateThread = [](const std::string& threadName) { + Client::initThread(threadName.c_str()); + }; + return options; } +} // namespace + TransactionCoordinatorService::TransactionCoordinatorService() - : _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()) {} + : _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()), + _threadPool(makeDefaultThreadPoolOptions()) { + _threadPool.startup(); +} -TransactionCoordinatorService::~TransactionCoordinatorService() = default; +void TransactionCoordinatorService::shutdown() { + _threadPool.shutdown(); +} TransactionCoordinatorService* TransactionCoordinatorService::get(OperationContext* opCtx) { return get(opCtx->getServiceContext()); @@ -100,72 +107,42 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx, "Cannot start a new transaction with the same session ID and transaction " "number as a transaction that has already begun two-phase commit.", latestCoordinator->state() == - TransactionCoordinator::StateMachine::State::kWaitingForParticipantList); + TransactionCoordinator::StateMachine::State::kUninitialized); return; } // Call tryAbort on previous coordinator. - auto actionToTake = latestCoordinator.get()->recvTryAbort(); - doCoordinatorAction(opCtx, latestCoordinator, actionToTake); + latestCoordinator.get()->recvTryAbort(); } _coordinatorCatalog->create(lsid, txnNumber); // TODO (SERVER-37024): Schedule abort task on executor to execute at commitDeadline. - // TODO (SERVER-37025): Schedule poke task on executor. } -Future<TransactionCoordinatorService::CommitDecision> -TransactionCoordinatorService::coordinateCommit(OperationContext* opCtx, - LogicalSessionId lsid, - TxnNumber txnNumber, - const std::set<ShardId>& participantList) { +Future<TransactionCoordinator::CommitDecision> TransactionCoordinatorService::coordinateCommit( + OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber, + const std::set<ShardId>& participantList) { auto coordinator = _coordinatorCatalog->get(lsid, txnNumber); if (!coordinator) { - return TransactionCoordinatorService::CommitDecision::kAbort; + // TODO (SERVER-37440): Return decision "kForgotten", which indicates that a decision was + // already made and forgotten. The caller can recover the decision from the local + // participant if a higher transaction has not been started on the session and the session + // has not been reaped. + // Currently is MONGO_UNREACHABLE because no tests should cause the router to re-send + // coordinateCommitTransaction. + MONGO_UNREACHABLE; } - auto actionToTake = coordinator.get()->recvCoordinateCommit(participantList); - doCoordinatorAction(opCtx, coordinator.get(), actionToTake); - - return coordinator.get()->waitForCompletion().then([](auto finalState) { - switch (finalState) { - case TransactionCoordinator::StateMachine::State::kAborted: - return TransactionCoordinatorService::CommitDecision::kAbort; - case TransactionCoordinator::StateMachine::State::kCommitted: - return TransactionCoordinatorService::CommitDecision::kCommit; - default: - MONGO_UNREACHABLE; - } - }); -} - -void TransactionCoordinatorService::voteCommit(OperationContext* opCtx, - LogicalSessionId lsid, - TxnNumber txnNumber, - const ShardId& shardId, - Timestamp prepareTimestamp) { - auto coordinator = _coordinatorCatalog->get(lsid, txnNumber); - if (!coordinator) { - txn::sendAbort(opCtx, {shardId}); - return; + Action initialAction = coordinator->recvCoordinateCommit(participantList); + if (initialAction != Action::kNone) { + txn::launchCoordinateCommitTask(_threadPool, coordinator, lsid, txnNumber, initialAction); } - auto actionToTake = coordinator.get()->recvVoteCommit(shardId, prepareTimestamp); - doCoordinatorAction(opCtx, coordinator.get(), actionToTake); -} - -void TransactionCoordinatorService::voteAbort(OperationContext* opCtx, - LogicalSessionId lsid, - TxnNumber txnNumber, - const ShardId& shardId) { - auto coordinator = _coordinatorCatalog->get(lsid, txnNumber); - - if (coordinator) { - auto actionToTake = coordinator.get()->recvVoteAbort(shardId); - doCoordinatorAction(opCtx, coordinator.get(), actionToTake); - } + return coordinator.get()->waitForDecision(); } } // namespace mongo diff --git a/src/mongo/db/transaction_coordinator_service.h b/src/mongo/db/transaction_coordinator_service.h index 5eb8e6c7f66..c7c4ab77e46 100644 --- a/src/mongo/db/transaction_coordinator_service.h +++ b/src/mongo/db/transaction_coordinator_service.h @@ -34,7 +34,9 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/logical_session_id.h" +#include "mongo/db/transaction_coordinator.h" #include "mongo/db/transaction_coordinator_catalog.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/future.h" namespace mongo { @@ -47,13 +49,13 @@ class TransactionCoordinatorService final { MONGO_DISALLOW_COPYING(TransactionCoordinatorService); public: - enum class CommitDecision { - kCommit, - kAbort, - }; - TransactionCoordinatorService(); - ~TransactionCoordinatorService(); + ~TransactionCoordinatorService() = default; + + /** + * Shuts down the thread pool used for executing commits. + */ + void shutdown(); /** * Retrieves the TransactionCoordinatorService associated with the service or operation context. @@ -75,37 +77,17 @@ public: * Delivers coordinateCommit to the TransactionCoordinator, asynchronously sends commit or * abort to participants if necessary, and returns a Future that will contain the commit * decision when the transaction finishes committing or aborting. - * - * TODO (SERVER-37364): On the commit path, this Future should instead be signaled as soon as - * the coordinator is finished persisting the commit decision, rather than waiting until the - * commit process has been completed entirely. */ - Future<CommitDecision> coordinateCommit(OperationContext* opCtx, - LogicalSessionId lsid, - TxnNumber txnNumber, - const std::set<ShardId>& participantList); - - /** - * Delivers voteCommit to the TransactionCoordinator and asynchronously sends commit or abort to - * participants if necessary. - */ - void voteCommit(OperationContext* opCtx, - LogicalSessionId lsid, - TxnNumber txnNumber, - const ShardId& shardId, - Timestamp prepareTimestamp); - - /** - * Delivers voteAbort on the TransactionCoordinator and asynchronously sends commit or abort to - * participants if necessary. - */ - void voteAbort(OperationContext* opCtx, - LogicalSessionId lsid, - TxnNumber txnNumber, - const ShardId& shardId); + Future<TransactionCoordinator::CommitDecision> coordinateCommit( + OperationContext* opCtx, + LogicalSessionId lsid, + TxnNumber txnNumber, + const std::set<ShardId>& participantList); private: std::shared_ptr<TransactionCoordinatorCatalog> _coordinatorCatalog; + + ThreadPool _threadPool; }; } // namespace mongo diff --git a/src/mongo/db/transaction_coordinator_service_test.cpp b/src/mongo/db/transaction_coordinator_service_test.cpp index 27d6a034821..3034e117bcc 100644 --- a/src/mongo/db/transaction_coordinator_service_test.cpp +++ b/src/mongo/db/transaction_coordinator_service_test.cpp @@ -1,4 +1,3 @@ - /** * Copyright (C) 2018-present MongoDB, Inc. * @@ -34,8 +33,10 @@ #include "mongo/client/remote_command_targeter_mock.h" #include "mongo/db/commands/txn_cmds_gen.h" +#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" #include "mongo/db/operation_context.h" #include "mongo/db/session_catalog.h" +#include "mongo/db/transaction_coordinator_commands_impl.h" #include "mongo/db/transaction_coordinator_service.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" #include "mongo/s/catalog/type_shard.h" @@ -55,7 +56,9 @@ const std::set<ShardId> kThreeShardIdSet{{"s1"}, {"s2"}, {"s3"}}; const Timestamp kDummyTimestamp = Timestamp::min(); const Date_t kCommitDeadline = Date_t::max(); const StatusWith<BSONObj> kRetryableError = {ErrorCodes::HostUnreachable, ""}; +const StatusWith<BSONObj> kNoSuchTransaction = {ErrorCodes::NoSuchTransaction, ""}; const StatusWith<BSONObj> kOk = BSON("ok" << 1); +const StatusWith<BSONObj> kPrepareOk = BSON("ok" << 1 << "prepareTimestamp" << Timestamp(1, 1)); HostAndPort makeHostAndPort(const ShardId& shardId) { return HostAndPort(str::stream() << shardId << ":123"); @@ -66,9 +69,6 @@ public: void setUp() override { ShardServerTestFixture::setUp(); - operationContext()->setLogicalSessionId(_lsid); - operationContext()->setTxnNumber(_txnNumber); - for (const auto& shardId : kThreeShardIdList) { auto shardTargeter = RemoteCommandTargeterMock::get( uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId)) @@ -89,6 +89,14 @@ public: }); } + void assertPrepareSentAndRespondWithSuccess() { + assertCommandSentAndRespondWith(PrepareTransaction::kCommandName, kPrepareOk); + } + + void assertPrepareSentAndRespondWithNoSuchTransaction() { + assertCommandSentAndRespondWith(PrepareTransaction::kCommandName, kNoSuchTransaction); + } + void assertAbortSentAndRespondWithSuccess() { assertCommandSentAndRespondWith("abortTransaction", kOk); } @@ -125,9 +133,8 @@ public: auto commitDecisionFuture = coordinatorService.coordinateCommit( operationContext(), lsid, txnNumber, transactionParticipantShards); - for (const auto& shardId : transactionParticipantShards) { - coordinatorService.voteCommit( - operationContext(), lsid, txnNumber, shardId, kDummyTimestamp); + for (size_t i = 0; i < transactionParticipantShards.size(); ++i) { + assertPrepareSentAndRespondWithSuccess(); } for (size_t i = 0; i < transactionParticipantShards.size(); ++i) { @@ -150,7 +157,13 @@ public: auto commitDecisionFuture = coordinatorService.coordinateCommit(operationContext(), lsid, txnNumber, shardIdSet); - coordinatorService.voteAbort(operationContext(), lsid, txnNumber, abortingShard); + for (size_t i = 0; i < shardIdSet.size(); ++i) { + assertPrepareSentAndRespondWithNoSuchTransaction(); + } + + // Abort gets sent to the second participant as soon as the first participant + // receives a not-okay response to prepare. + assertAbortSentAndRespondWithSuccess(); // Wait for abort to complete. commitDecisionFuture.get(); @@ -218,7 +231,6 @@ private: std::unique_ptr<TransactionCoordinatorService> _coordinatorService; }; - } // namespace TEST_F(TransactionCoordinatorServiceTest, CreateCoordinatorOnNewSessionSucceeds) { @@ -253,31 +265,6 @@ TEST_F(TransactionCoordinatorServiceTest, } TEST_F(TransactionCoordinatorServiceTest, - CreateCoordinatorWithHigherTxnNumberThanOngoingUncommittedTxnAbortsPreviousTxnAndSucceeds) { - - TransactionCoordinatorService coordinatorService; - coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline); - - // This is currently the only way we have to get the commit decision. - auto oldTxnCommitDecisionFuture = coordinatorService.coordinateCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdSet); - - // Create a coordinator for a higher transaction number in the same session. - coordinatorService.createCoordinator( - operationContext(), lsid(), txnNumber() + 1, kCommitDeadline); - - assertAbortSentAndRespondWithSuccess(); - assertAbortSentAndRespondWithSuccess(); - - // We should have aborted the previous transaction. - ASSERT_EQ(static_cast<int>(oldTxnCommitDecisionFuture.get()), - static_cast<int>(TransactionCoordinatorService::CommitDecision::kAbort)); - - // Make sure the newly created one works fine. - commitTransaction(coordinatorService, lsid(), txnNumber() + 1, kTwoShardIdSet); -} - -TEST_F(TransactionCoordinatorServiceTest, CreateCoordinatorWithHigherTxnNumberThanOngoingCommittingTxnCommitsPreviousTxnAndSucceeds) { TransactionCoordinatorService coordinatorService; @@ -287,16 +274,18 @@ TEST_F(TransactionCoordinatorServiceTest, // commit acks. auto oldTxnCommitDecisionFuture = coordinatorService.coordinateCommit( operationContext(), lsid(), txnNumber(), kTwoShardIdSet); - coordinatorService.voteCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp); - coordinatorService.voteCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdList[1], kDummyTimestamp); + + // Simulate all participants acking prepare/voting to commit. + assertPrepareSentAndRespondWithSuccess(); + assertPrepareSentAndRespondWithSuccess(); // Create a coordinator for a higher transaction number in the same session. This should // "tryAbort" on the old coordinator which should NOT abort it since it's already waiting for // commit acks. coordinatorService.createCoordinator( operationContext(), lsid(), txnNumber() + 1, kCommitDeadline); + auto newTxnCommitDecisionFuture = coordinatorService.coordinateCommit( + operationContext(), lsid(), txnNumber() + 1, kTwoShardIdSet); // Finish committing the old transaction by sending it commit acks from both participants. assertCommitSentAndRespondWithSuccess(); @@ -304,35 +293,44 @@ TEST_F(TransactionCoordinatorServiceTest, // The old transaction should now be committed. ASSERT_EQ(static_cast<int>(oldTxnCommitDecisionFuture.get()), - static_cast<int>(TransactionCoordinatorService::CommitDecision::kCommit)); + static_cast<int>(TransactionCoordinator::CommitDecision::kCommit)); // Make sure the newly created one works fine too. - commitTransaction(coordinatorService, lsid(), txnNumber() + 1, kTwoShardIdSet); + assertPrepareSentAndRespondWithSuccess(); + assertPrepareSentAndRespondWithSuccess(); + assertCommitSentAndRespondWithSuccess(); + assertCommitSentAndRespondWithSuccess(); + // commitTransaction(coordinatorService, lsid(), txnNumber() + 1, kTwoShardIdSet); } TEST_F(TransactionCoordinatorServiceTestSingleTxn, - CoordinateCommitWithNoVotesReturnsNotReadyFuture) { + CoordinateCommitReturnsCorrectCommitDecisionOnAbort) { auto commitDecisionFuture = coordinatorService()->coordinateCommit( operationContext(), lsid(), txnNumber(), kTwoShardIdSet); - ASSERT_FALSE(commitDecisionFuture.isReady()); - // To prevent invariant failure in TransactionCoordinator that all futures have been completed. - abortTransaction( - *coordinatorService(), lsid(), txnNumber(), kTwoShardIdSet, kTwoShardIdList[0]); + // Simulate a participant voting to abort. + assertPrepareSentAndRespondWithNoSuchTransaction(); + assertPrepareSentAndRespondWithSuccess(); + + // Only send abort to the node that voted to commit. + assertAbortSentAndRespondWithSuccess(); + + auto commitDecision = commitDecisionFuture.get(); + ASSERT_EQ(static_cast<int>(commitDecision), + static_cast<int>(TransactionCoordinator::CommitDecision::kAbort)); } TEST_F(TransactionCoordinatorServiceTestSingleTxn, - CoordinateCommitReturnsCorrectCommitDecisionOnAbort) { + CoordinateCommitWithNoVotesReturnsNotReadyFuture) { auto commitDecisionFuture = coordinatorService()->coordinateCommit( operationContext(), lsid(), txnNumber(), kTwoShardIdSet); - coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kTwoShardIdList[0]); - - auto commitDecision = commitDecisionFuture.get(); - ASSERT_EQ(static_cast<int>(commitDecision), - static_cast<int>(TransactionCoordinatorService::CommitDecision::kAbort)); + ASSERT_FALSE(commitDecisionFuture.isReady()); + // To prevent invariant failure in TransactionCoordinator that all futures have been completed. + abortTransaction( + *coordinatorService(), lsid(), txnNumber(), kTwoShardIdSet, kTwoShardIdList[0]); } TEST_F(TransactionCoordinatorServiceTestSingleTxn, @@ -341,31 +339,14 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn, auto commitDecisionFuture = coordinatorService()->coordinateCommit( operationContext(), lsid(), txnNumber(), kTwoShardIdSet); - coordinatorService()->voteCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp); - - coordinatorService()->voteCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdList[1], kDummyTimestamp); - + assertPrepareSentAndRespondWithSuccess(); + assertPrepareSentAndRespondWithSuccess(); assertCommitSentAndRespondWithSuccess(); assertCommitSentAndRespondWithSuccess(); auto commitDecision = commitDecisionFuture.get(); ASSERT_EQ(static_cast<int>(commitDecision), - static_cast<int>(TransactionCoordinatorService::CommitDecision::kCommit)); -} - -TEST_F(TransactionCoordinatorServiceTest, - CoordinateCommitReturnsAbortDecisionWhenCoordinatorDoesNotExist) { - - TransactionCoordinatorService coordinatorService; - auto commitDecisionFuture = coordinatorService.coordinateCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdSet); - ASSERT_TRUE(commitDecisionFuture.isReady()); - - auto commitDecision = commitDecisionFuture.get(); - ASSERT_EQ(static_cast<int>(commitDecision), - static_cast<int>(TransactionCoordinatorService::CommitDecision::kAbort)); + static_cast<int>(TransactionCoordinator::CommitDecision::kCommit)); } TEST_F(TransactionCoordinatorServiceTest, @@ -409,152 +390,4 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn, static_cast<int>(commitDecisionFuture2.get())); } -TEST_F(TransactionCoordinatorServiceTestSingleTxn, - VoteCommitDoesNotSendCommitIfParticipantListNotYetReceived) { - - coordinatorService()->voteCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp); - - assertNoMessageSent(); - // To prevent invariant failure in TransactionCoordinator that all futures have been completed. - abortTransaction( - *coordinatorService(), lsid(), txnNumber(), kTwoShardIdSet, kTwoShardIdList[1]); -} - -TEST_F(TransactionCoordinatorServiceTestSingleTxn, - ResentVoteCommitDoesNotSendCommitIfParticipantListNotYetReceived) { - - coordinatorService()->voteCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp); - coordinatorService()->voteCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp); - - assertNoMessageSent(); - - // To prevent invariant failure in TransactionCoordinator that all futures have been completed. - abortTransaction( - *coordinatorService(), lsid(), txnNumber(), kTwoShardIdSet, kTwoShardIdList[1]); -} - -TEST_F(TransactionCoordinatorServiceTestSingleTxn, - ResentVoteCommitDoesNotSendCommitIfParticipantListHasBeenReceived) { - - auto commitDecisionFuture = coordinatorService()->coordinateCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdSet); - - coordinatorService()->voteCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp); - coordinatorService()->voteCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp); - - assertNoMessageSent(); - - // To prevent invariant failure in TransactionCoordinator that all futures have been completed. - abortTransaction( - *coordinatorService(), lsid(), txnNumber(), kTwoShardIdSet, kTwoShardIdList[1]); - commitDecisionFuture.get(); -} - -TEST_F(TransactionCoordinatorServiceTestSingleTxn, FinalVoteCommitSendsCommit) { - auto commitDecisionFuture = coordinatorService()->coordinateCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdSet); - - coordinatorService()->voteCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp); - - coordinatorService()->voteCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdList[1], kDummyTimestamp); - - assertCommitSentAndRespondWithSuccess(); - assertCommitSentAndRespondWithSuccess(); -} - -// This logic is obviously correct for a transaction which has been aborted prior to receiving -// coordinateCommit, when the coordinator does not yet know all participants and so cannot send -// abortTransaction to all participants. In this case, it can potentially receive voteCommit -// messages from some participants even after the local TransactionCoordinator object has -// transitioned to the aborted state and then removed from the service. We then must tell -// the participant that sent the voteCommit message that it should abort. -// -// More subtly, it also works for voteCommit retries for transactions that have already committed, -// because we'll send abort to the participant, and the abort command will just receive -// NoSuchTransaction or TransactionTooOld (because the participant must have already committed if -// the transaction coordinator finished committing). -TEST_F(TransactionCoordinatorServiceTest, - VoteCommitForCoordinatorThatDoesNotExistSendsVoteAbortToCallingParticipant) { - - TransactionCoordinatorService coordinatorService; - coordinatorService.voteCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp); - - assertAbortSentAndRespondWithSuccess(); -} - -TEST_F(TransactionCoordinatorServiceTestSingleTxn, - ResentFinalVoteCommitOnlySendsCommitToNonAckedParticipants) { - - auto commitDecisionFuture = coordinatorService()->coordinateCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdSet); - - coordinatorService()->voteCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp); - - coordinatorService()->voteCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdList[1], kDummyTimestamp); - - assertCommitSentAndRespondWithSuccess(); - assertCommitSentAndRespondWithRetryableError(); - - coordinatorService()->voteCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdList[1], kDummyTimestamp); - - assertCommitSentAndRespondWithSuccess(); -} - -TEST_F(TransactionCoordinatorServiceTestSingleTxn, - VoteAbortDoesNotSendAbortIfIsOnlyVoteReceivedSoFar) { - - coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kTwoShardIdList[0]); - - assertNoMessageSent(); -} - -TEST_F(TransactionCoordinatorServiceTestSingleTxn, - VoteAbortForCoordinatorThatDoesNotExistDoesNotSendAbort) { - - coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kTwoShardIdList[0]); - // Coordinator no longer exists. - coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kTwoShardIdList[0]); - - assertNoMessageSent(); -} - -TEST_F(TransactionCoordinatorServiceTestSingleTxn, - VoteAbortSendsAbortIfSomeParticipantsHaveVotedCommit) { - - coordinatorService()->voteCommit( - operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp); - - coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kTwoShardIdList[1]); - - // This should be sent to the shard that voted commit (s1). - assertAbortSentAndRespondWithSuccess(); -} - -TEST_F(TransactionCoordinatorServiceTestSingleTxn, - VoteAbortAfterReceivingParticipantListSendsAbortToAllParticipantsWhoHaventVotedAbort) { - - auto commitDecisionFuture = coordinatorService()->coordinateCommit( - operationContext(), lsid(), txnNumber(), kThreeShardIdSet); - - coordinatorService()->voteCommit( - operationContext(), lsid(), txnNumber(), kThreeShardIdList[0], kDummyTimestamp); - - coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kThreeShardIdList[1]); - - // Should send abort to shards s1 and s3 (the ones that did not vote abort). - assertAbortSentAndRespondWithSuccess(); - assertAbortSentAndRespondWithSuccess(); -} - } // namespace mongo diff --git a/src/mongo/db/transaction_coordinator_state_machine_test.cpp b/src/mongo/db/transaction_coordinator_state_machine_test.cpp index 4fe4a0fcac2..da97c29f0da 100644 --- a/src/mongo/db/transaction_coordinator_state_machine_test.cpp +++ b/src/mongo/db/transaction_coordinator_state_machine_test.cpp @@ -70,15 +70,49 @@ void expectScheduleThrows(Schedule schedule) { ASSERT_EQ(State::kBroken, coordinator.state()); } + +void doCommit(StateMachine& coordinator) { + runSchedule(coordinator, + {Event::kRecvParticipantList, + Event::kMadeParticipantListDurable, + Event::kRecvFinalVoteCommit, + Event::kMadeCommitDecisionDurable, + Event::kRecvFinalCommitAck}); +} + +void doAbort(StateMachine& coordinator) { + runSchedule(coordinator, + {Event::kRecvParticipantList, + Event::kMadeParticipantListDurable, + Event::kRecvVoteAbort, + Event::kMadeAbortDecisionDurable, + Event::kRecvFinalAbortAck}); +} + TEST(CoordinatorStateMachine, AbortSucceeds) { - expectScheduleSucceeds({Event::kRecvVoteAbort}, State::kAborted); - expectScheduleSucceeds({Event::kRecvVoteAbort, Event::kRecvVoteAbort}, State::kAborted); + expectScheduleSucceeds({Event::kRecvParticipantList, + Event::kMadeParticipantListDurable, + Event::kRecvVoteAbort, + Event::kMadeAbortDecisionDurable, + Event::kRecvFinalAbortAck}, + State::kAborted); + // Check that it's okay to receive two vote aborts. + expectScheduleSucceeds({Event::kRecvParticipantList, + Event::kMadeParticipantListDurable, + Event::kRecvVoteAbort, + Event::kRecvVoteAbort, + Event::kMadeAbortDecisionDurable, + Event::kRecvFinalAbortAck}, + State::kAborted); } TEST(CoordinatorStateMachine, CommitSucceeds) { - expectScheduleSucceeds( - {Event::kRecvParticipantList, Event::kRecvFinalVoteCommit, Event::kRecvFinalCommitAck}, - State::kCommitted); + expectScheduleSucceeds({Event::kRecvParticipantList, + Event::kMadeParticipantListDurable, + Event::kRecvFinalVoteCommit, + Event::kMadeCommitDecisionDurable, + Event::kRecvFinalCommitAck}, + State::kCommitted); } TEST(CoordinatorStateMachine, RecvFinalVoteCommitAndRecvVoteAbortThrows) { @@ -90,7 +124,7 @@ TEST(CoordinatorStateMachine, RecvFinalVoteCommitAndRecvVoteAbortThrows) { TEST(CoordinatorStateMachine, WaitForTransitionToOnlyTerminalStatesReturnsCorrectStateOnAbort) { StateMachine coordinator; auto future = coordinator.waitForTransitionTo({State::kCommitted, State::kAborted}); - runSchedule(coordinator, {Event::kRecvVoteAbort}); + doAbort(coordinator); ASSERT_EQ(future.get(), State::kAborted); } @@ -100,15 +134,13 @@ TEST(CoordinatorStateMachine, WaitForTransitionToStatesThatHaventBeenReachedRetu ASSERT_FALSE(future.isReady()); // We need to abort here because we require that all promises are triggered prior to coordinator // destruction. - runSchedule(coordinator, {Event::kRecvVoteAbort}); + doAbort(coordinator); } TEST(CoordinatorStateMachine, WaitForTransitionToOnlyTerminalStatesReturnsCorrectStateOnCommit) { StateMachine coordinator; auto future = coordinator.waitForTransitionTo({State::kCommitted, State::kAborted}); - runSchedule( - coordinator, - {Event::kRecvParticipantList, Event::kRecvFinalVoteCommit, Event::kRecvFinalCommitAck}); + doCommit(coordinator); ASSERT_EQ(future.get(), State::kCommitted); } @@ -117,8 +149,9 @@ TEST(CoordinatorStateMachine, StateMachine coordinator; runSchedule(coordinator, {Event::kRecvParticipantList}); auto future = coordinator.waitForTransitionTo( - {State::kWaitingForVotes, State::kCommitted, State::kAborted}); - ASSERT_EQ(future.get(), TransactionCoordinator::StateMachine::State::kWaitingForVotes); + {State::kMakingParticipantListDurable, State::kCommitted, State::kAborted}); + ASSERT_EQ(future.get(), + TransactionCoordinator::StateMachine::State::kMakingParticipantListDurable); } TEST(CoordinatorStateMachine, WaitForTransitionToMultipleStatesReturnsFirstStateToBeHit) { @@ -128,7 +161,7 @@ TEST(CoordinatorStateMachine, WaitForTransitionToMultipleStatesReturnsFirstState State::kCommitted, State::kAborted}); - runSchedule(coordinator, {Event::kRecvParticipantList, Event::kRecvFinalVoteCommit}); + doCommit(coordinator); ASSERT_EQ(future.get(), TransactionCoordinator::StateMachine::State::kWaitingForVotes); } @@ -140,7 +173,7 @@ TEST(CoordinatorStateMachine, {State::kWaitingForVotes, State::kCommitted, State::kAborted}); auto future2 = coordinator.waitForTransitionTo( {State::kWaitingForCommitAcks, State::kCommitted, State::kAborted}); - runSchedule(coordinator, {Event::kRecvParticipantList, Event::kRecvFinalVoteCommit}); + doCommit(coordinator); ASSERT_EQ(future1.get(), TransactionCoordinator::StateMachine::State::kWaitingForVotes); ASSERT_EQ(future2.get(), TransactionCoordinator::StateMachine::State::kWaitingForCommitAcks); @@ -163,7 +196,7 @@ TEST(CoordinatorStateMachine, coordinator.waitForTransitionTo( {State::kWaitingForCommitAcks, State::kCommitted, State::kAborted})}; - runSchedule(coordinator, {Event::kRecvParticipantList, Event::kRecvFinalVoteCommit}); + doCommit(coordinator); for (auto& future1 : futures1) { ASSERT_EQ(future1.get(), TransactionCoordinator::StateMachine::State::kWaitingForVotes); diff --git a/src/mongo/db/transaction_coordinator_test.cpp b/src/mongo/db/transaction_coordinator_test.cpp index 6f154b3268b..458ff30244e 100644 --- a/src/mongo/db/transaction_coordinator_test.cpp +++ b/src/mongo/db/transaction_coordinator_test.cpp @@ -34,135 +34,107 @@ #include <future> +#include "mongo/db/service_context_d_test_fixture.h" #include "mongo/unittest/unittest.h" namespace mongo { +namespace { + using State = TransactionCoordinator::StateMachine::State; +using Coordinator = ServiceContextMongoDTest; const Timestamp dummyTimestamp = Timestamp::min(); -TEST(Coordinator, SomeParticipantVotesAbortLeadsToAbort) { - TransactionCoordinator coordinator; - coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")}); - coordinator.recvVoteAbort(ShardId("shard0000")); - coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp); - ASSERT_EQ(State::kAborted, coordinator.state()); +void doCommit(TransactionCoordinator& coordinator) { + coordinator.recvCoordinateCommit({ShardId("shard0000")}); + coordinator.madeParticipantListDurable(); + coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp); + coordinator.madeCommitDecisionDurable(); + coordinator.recvCommitAck(ShardId("shard0000")); } -TEST(Coordinator, SomeParticipantsVoteAbortBeforeCoordinatorReceivesParticipantListLeadsToAbort) { - TransactionCoordinator coordinator; +void doAbort(TransactionCoordinator& coordinator) { + coordinator.recvCoordinateCommit({ShardId("shard0000")}); + coordinator.madeParticipantListDurable(); coordinator.recvVoteAbort(ShardId("shard0000")); - coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")}); - coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp); - ASSERT_EQ(State::kAborted, coordinator.state()); + coordinator.madeAbortDecisionDurable(); + coordinator.recvAbortAck(ShardId("shard0000")); +} } -TEST(Coordinator, AllParticipantsVoteCommitLeadsToCommit) { +TEST_F(Coordinator, SomeParticipantVotesAbortLeadsToAbort) { TransactionCoordinator coordinator; coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")}); - coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp); + coordinator.madeParticipantListDurable(); + coordinator.recvVoteAbort(ShardId("shard0000")); coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp); - coordinator.recvCommitAck(ShardId("shard0000")); - coordinator.recvCommitAck(ShardId("shard0001")); - ASSERT_EQ(State::kCommitted, coordinator.state()); + coordinator.madeAbortDecisionDurable(); + coordinator.recvAbortAck(ShardId("shard0001")); + ASSERT_EQ(State::kAborted, coordinator.state()); } -TEST( - Coordinator, - AllParticipantsVoteCommitSomeParticipantsVoteBeforeCoordinatorReceivesParticipantListLeadsToCommit) { +TEST_F(Coordinator, AllParticipantsVoteCommitLeadsToCommit) { TransactionCoordinator coordinator; - coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp); coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")}); + coordinator.madeParticipantListDurable(); + coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp); coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp); + coordinator.madeCommitDecisionDurable(); coordinator.recvCommitAck(ShardId("shard0000")); coordinator.recvCommitAck(ShardId("shard0001")); ASSERT_EQ(State::kCommitted, coordinator.state()); } -TEST(Coordinator, NotHearingSomeParticipantsVoteOtherParticipantsVotedCommitLeadsToStillWaiting) { +TEST_F(Coordinator, NotHearingSomeParticipantsVoteOtherParticipantsVotedCommitLeadsToStillWaiting) { TransactionCoordinator coordinator; coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")}); + coordinator.madeParticipantListDurable(); coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp); ASSERT_EQ(State::kWaitingForVotes, coordinator.state()); } -TEST(Coordinator, NotHearingSomeParticipantsVoteAnotherParticipantVotedAbortLeadsToAbort) { +TEST_F(Coordinator, NotHearingSomeParticipantsVoteAnotherParticipantVotedAbortLeadsToAbort) { TransactionCoordinator coordinator; coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")}); + coordinator.madeParticipantListDurable(); coordinator.recvVoteAbort(ShardId("shard0000")); - ASSERT_EQ(State::kAborted, coordinator.state()); + ASSERT_EQ(State::kMakingAbortDecisionDurable, coordinator.state()); } -TEST(Coordinator, NotHearingSomeParticipantsCommitAckLeadsToStillWaiting) { +TEST_F(Coordinator, NotHearingSomeParticipantsCommitAckLeadsToStillWaiting) { TransactionCoordinator coordinator; coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")}); + coordinator.madeParticipantListDurable(); coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp); coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp); + coordinator.madeCommitDecisionDurable(); coordinator.recvCommitAck(ShardId("shard0000")); ASSERT_EQ(State::kWaitingForCommitAcks, coordinator.state()); } -TEST(Coordinator, TryAbortWhileWaitingForParticipantListSuccessfullyAborts) { - TransactionCoordinator coordinator; - coordinator.recvTryAbort(); - ASSERT_EQ(State::kAborted, coordinator.state()); -} - -TEST(Coordinator, TryAbortWhileWaitingForVotesSuccessfullyAborts) { - TransactionCoordinator coordinator; - coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")}); - coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp); - coordinator.recvTryAbort(); - ASSERT_EQ(State::kAborted, coordinator.state()); -} - -TEST(Coordinator, TryAbortWhileWaitingForCommitAcksDoesNotCancelCommit) { - TransactionCoordinator coordinator; - coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")}); - coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp); - coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp); - ASSERT_EQ(State::kWaitingForCommitAcks, coordinator.state()); - coordinator.recvTryAbort(); - ASSERT_EQ(State::kWaitingForCommitAcks, coordinator.state()); - coordinator.recvCommitAck(ShardId("shard0000")); - coordinator.recvCommitAck(ShardId("shard0001")); - ASSERT_EQ(State::kCommitted, coordinator.state()); -} - -TEST(Coordinator, VoteCommitToAbortedCoordinatorRespondsWithAbort) { - TransactionCoordinator coordinator; - coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")}); - coordinator.recvVoteAbort(ShardId("shard0000")); - ASSERT_EQ(State::kAborted, coordinator.state()); - auto action = coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp); - ASSERT_EQ(TransactionCoordinator::StateMachine::Action::kSendAbort, action); -} - -TEST(Coordinator, WaitForCompletionReturnsOnChangeToCommitted) { +TEST_F(Coordinator, WaitForCompletionReturnsOnChangeToCommitted) { TransactionCoordinator coordinator; auto future = coordinator.waitForCompletion(); - coordinator.recvCoordinateCommit({ShardId("shard0000")}); - coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp); - coordinator.recvCommitAck(ShardId("shard0000")); + doCommit(coordinator); auto finalState = future.get(); ASSERT_EQ(finalState, TransactionCoordinator::StateMachine::State::kCommitted); } -TEST(Coordinator, WaitForCompletionReturnsOnChangeToAborted) { +TEST_F(Coordinator, WaitForCompletionReturnsOnChangeToAborted) { TransactionCoordinator coordinator; auto future = coordinator.waitForCompletion(); - coordinator.recvVoteAbort(ShardId("shard0000")); + doAbort(coordinator); auto finalState = future.get(); ASSERT_EQ(finalState, TransactionCoordinator::StateMachine::State::kAborted); } -TEST(Coordinator, RepeatedCallsToWaitForCompletionAllReturn) { +TEST_F(Coordinator, RepeatedCallsToWaitForCompletionAllReturn) { TransactionCoordinator coordinator; auto futures = {coordinator.waitForCompletion(), coordinator.waitForCompletion(), coordinator.waitForCompletion()}; - coordinator.recvVoteAbort(ShardId("shard0000")); + doAbort(coordinator); for (auto& future : futures) { auto finalState = future.get(); @@ -170,9 +142,9 @@ TEST(Coordinator, RepeatedCallsToWaitForCompletionAllReturn) { } } -TEST(Coordinator, CallingWaitForCompletionAfterAlreadyCompleteReturns) { +TEST_F(Coordinator, CallingWaitForCompletionAfterAlreadyCompleteReturns) { TransactionCoordinator coordinator; - coordinator.recvVoteAbort(ShardId("shard0000")); + doAbort(coordinator); auto future = coordinator.waitForCompletion(); auto finalState = future.get(); diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 07217ec089f..5b6ed2d3c02 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -28,7 +28,7 @@ * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kTransaction #include "mongo/platform/basic.h" @@ -614,47 +614,23 @@ Shard::CommandResponse TransactionRouter::_commitSingleShardTransaction(Operatio Shard::CommandResponse TransactionRouter::_commitMultiShardTransaction(OperationContext* opCtx) { invariant(_coordinatorId); - - auto shardRegistry = Grid::get(opCtx)->shardRegistry(); - - PrepareTransaction prepareCmd; - prepareCmd.setDbName("admin"); - prepareCmd.setCoordinatorId(*_coordinatorId); - - auto prepareCmdObj = prepareCmd.toBSON( - BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority)); + auto coordinatorIter = _participants.find(*_coordinatorId); + invariant(coordinatorIter != _participants.end()); std::vector<CommitParticipant> participantList; for (const auto& participantEntry : _participants) { - ShardId shardId(participantEntry.first); - CommitParticipant commitParticipant; - commitParticipant.setShardId(shardId); + commitParticipant.setShardId(participantEntry.first); participantList.push_back(std::move(commitParticipant)); - - if (participantEntry.second.isCoordinator()) { - // coordinateCommit is sent to participant that is also a coordinator. - invariant(shardId == *_coordinatorId); - continue; - } - - const auto& participant = participantEntry.second; - auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId)); - shard->runFireAndForgetCommand(opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - "admin", - participant.attachTxnFieldsIfNeeded(prepareCmdObj, false)); } - auto coordinatorShard = uassertStatusOK(shardRegistry->getShard(opCtx, *_coordinatorId)); + auto coordinatorShard = + uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, *_coordinatorId)); CoordinateCommitTransaction coordinateCommitCmd; coordinateCommitCmd.setDbName("admin"); coordinateCommitCmd.setParticipants(participantList); - auto coordinatorIter = _participants.find(*_coordinatorId); - invariant(coordinatorIter != _participants.end()); - return uassertStatusOK(coordinatorShard->runCommandWithFixedRetryAttempts( opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index 889c0c5ceed..bc14fb5b326 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -657,7 +657,7 @@ TEST_F(TransactionRouterTest, SendCommitDirectlyForSingleParticipants) { future.timed_get(kFutureTimeout); } -TEST_F(TransactionRouterTest, SendPrepareAndCoordinateCommitForMultipleParticipants) { +TEST_F(TransactionRouterTest, SendCoordinateCommitForMultipleParticipants) { LogicalSessionId lsid(makeLogicalSessionIdForTest()); TxnNumber txnNum{3}; @@ -676,21 +676,6 @@ TEST_F(TransactionRouterTest, SendPrepareAndCoordinateCommitForMultipleParticipa auto future = launchAsync([&] { txnRouter->commitTransaction(operationContext()); }); onCommand([&](const RemoteCommandRequest& request) { - ASSERT_EQ(hostAndPort2, request.target); - ASSERT_EQ("admin", request.dbname); - - auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); - ASSERT_EQ(cmdName, "prepareTransaction"); - - auto coordinator = request.cmdObj["coordinatorId"].str(); - ASSERT_EQ(shard1.toString(), coordinator); - - checkSessionDetails(request.cmdObj, lsid, txnNum, boost::none); - - return BSON("ok" << 1); - }); - - onCommand([&](const RemoteCommandRequest& request) { ASSERT_EQ(hostAndPort1, request.target); ASSERT_EQ("admin", request.dbname); diff --git a/src/mongo/shell/servers.js b/src/mongo/shell/servers.js index 2c30b10e632..2119e5cef2f 100644 --- a/src/mongo/shell/servers.js +++ b/src/mongo/shell/servers.js @@ -1121,16 +1121,6 @@ var MongoRunner, _startMongod, startMongoProgram, runMongoProgram, startMongoPro } } - // New mongod-specific options in 4.1.x. - if (!programMajorMinorVersion || programMajorMinorVersion >= 410) { - if (jsTest.options().setSkipShardingPartsOfPrepareTransactionFailpoint && - jsTest.options().enableTestCommands) { - argArray.push( - ...["--setParameter", - "failpoint.skipShardingPartsOfPrepareTransaction={mode:'alwaysOn'}"]); - } - } - // New mongod-specific options in 4.0.x if (!programMajorMinorVersion || programMajorMinorVersion >= 400) { if (jsTest.options().transactionLifetimeLimitSeconds !== undefined) { diff --git a/src/mongo/shell/utils.js b/src/mongo/shell/utils.js index e6477b0fa10..a98de90cdc1 100644 --- a/src/mongo/shell/utils.js +++ b/src/mongo/shell/utils.js @@ -318,8 +318,6 @@ jsTestOptions = function() { mqlTestFile: TestData.mqlTestFile, mqlRootPath: TestData.mqlRootPath, disableImplicitSessions: TestData.disableImplicitSessions || false, - setSkipShardingPartsOfPrepareTransactionFailpoint: - TestData.setSkipShardingPartsOfPrepareTransactionFailpoint || false, }); } return _jsTestOptions; |