summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/base/error_codes.err2
-rw-r--r--src/mongo/db/commands/txn_cmds.cpp17
-rw-r--r--src/mongo/db/commands/txn_two_phase_commit_cmds.idl4
-rw-r--r--src/mongo/db/s/txn_two_phase_commit_cmds.cpp271
-rw-r--r--src/mongo/db/transaction_coordinator.cpp205
-rw-r--r--src/mongo/db/transaction_coordinator.h185
-rw-r--r--src/mongo/db/transaction_coordinator_catalog.cpp8
-rw-r--r--src/mongo/db/transaction_coordinator_catalog.h5
-rw-r--r--src/mongo/db/transaction_coordinator_catalog_test.cpp72
-rw-r--r--src/mongo/db/transaction_coordinator_commands_impl.cpp282
-rw-r--r--src/mongo/db/transaction_coordinator_commands_impl.h45
-rw-r--r--src/mongo/db/transaction_coordinator_service.cpp117
-rw-r--r--src/mongo/db/transaction_coordinator_service.h48
-rw-r--r--src/mongo/db/transaction_coordinator_service_test.cpp271
-rw-r--r--src/mongo/db/transaction_coordinator_state_machine_test.cpp63
-rw-r--r--src/mongo/db/transaction_coordinator_test.cpp112
-rw-r--r--src/mongo/s/transaction_router.cpp36
-rw-r--r--src/mongo/s/transaction_router_test.cpp17
-rw-r--r--src/mongo/shell/servers.js10
-rw-r--r--src/mongo/shell/utils.js2
20 files changed, 860 insertions, 912 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 6b2ff3d0762..0d79c3a1618 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -333,3 +333,5 @@ error_class("ConnectionFatalMessageParseError", ["IllegalOpMsgFlag",
error_class("ExceededTimeLimitError", ["ExceededTimeLimit", "MaxTimeMSExpired", "NetworkInterfaceExceededTimeLimit"])
error_class("SnapshotError", ["SnapshotTooOld", "SnapshotUnavailable", "StaleChunkHistory", "MigrationConflict"])
+
+error_class("VoteAbortError", ["NoSuchTransaction", "TransactionTooOld"])
diff --git a/src/mongo/db/commands/txn_cmds.cpp b/src/mongo/db/commands/txn_cmds.cpp
index 99ce548cd23..c043a38dba8 100644
--- a/src/mongo/db/commands/txn_cmds.cpp
+++ b/src/mongo/db/commands/txn_cmds.cpp
@@ -46,6 +46,9 @@
namespace mongo {
namespace {
+MONGO_FAIL_POINT_DEFINE(participantReturnNetworkErrorForAbortAfterExecutingAbortLogic);
+MONGO_FAIL_POINT_DEFINE(participantReturnNetworkErrorForCommitAfterExecutingCommitLogic);
+
class CmdCommitTxn : public BasicCommand {
public:
CmdCommitTxn() : BasicCommand("commitTransaction") {}
@@ -94,6 +97,11 @@ public:
// commit oplog entry.
auto& replClient = repl::ReplClientInfo::forClient(opCtx->getClient());
replClient.setLastOpToSystemLastOpTime(opCtx);
+ if (MONGO_FAIL_POINT(participantReturnNetworkErrorForCommitAfterExecutingCommitLogic)) {
+ uasserted(ErrorCodes::SocketException,
+ "returning network error because failpoint is on");
+ }
+
return true;
}
@@ -109,6 +117,10 @@ public:
// commitUnpreparedTransaction will throw if the transaction is prepared.
txnParticipant->commitUnpreparedTransaction(opCtx);
}
+ if (MONGO_FAIL_POINT(participantReturnNetworkErrorForCommitAfterExecutingCommitLogic)) {
+ uasserted(ErrorCodes::SocketException,
+ "returning network error because failpoint is on");
+ }
return true;
}
@@ -172,6 +184,11 @@ public:
<< exceptionToStatus());
throw;
}
+ if (MONGO_FAIL_POINT(participantReturnNetworkErrorForAbortAfterExecutingAbortLogic)) {
+ uasserted(ErrorCodes::SocketException,
+ "returning network error because failpoint is on");
+ }
+
return true;
}
diff --git a/src/mongo/db/commands/txn_two_phase_commit_cmds.idl b/src/mongo/db/commands/txn_two_phase_commit_cmds.idl
index d90cc4fa49a..0b77fc9827f 100644
--- a/src/mongo/db/commands/txn_two_phase_commit_cmds.idl
+++ b/src/mongo/db/commands/txn_two_phase_commit_cmds.idl
@@ -45,10 +45,6 @@ commands:
description: "Parser for the 'prepareTransaction' command."
strict: true
namespace: ignored
- fields:
- coordinatorId:
- description: "The coordinator shard for this transaction."
- type: shard_id
voteCommitTransaction:
description: "Parser for the 'voteCommitTransaction' command."
diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
index cb4766efc4e..e535c31fc5b 100644
--- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
+++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
@@ -48,7 +48,7 @@
namespace mongo {
namespace {
-MONGO_FAIL_POINT_DEFINE(skipShardingPartsOfPrepareTransaction);
+MONGO_FAIL_POINT_DEFINE(participantReturnNetworkErrorForPrepareAfterExecutingPrepareLogic);
class PrepareTransactionCmd : public TypedCommand<PrepareTransactionCmd> {
public:
@@ -71,13 +71,9 @@ public:
using InvocationBase::InvocationBase;
Response typedRun(OperationContext* opCtx) {
- // In production, only config servers or initialized shard servers can participate in a
- // sharded transaction. However, many test suites test the replication and storage parts
- // of prepareTransaction against a standalone replica set, so allow skipping the check.
- if (!MONGO_FAIL_POINT(skipShardingPartsOfPrepareTransaction)) {
- if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) {
- uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands());
- }
+ if (!getTestCommandsEnabled() &&
+ serverGlobalParams.clusterRole != ClusterRole::ConfigServer) {
+ uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands());
}
auto txnParticipant = TransactionParticipant::get(opCtx);
@@ -99,8 +95,6 @@ public:
"Transaction isn't in progress",
txnParticipant->inMultiDocumentTransaction());
- const auto& cmd = request();
-
if (txnParticipant->transactionIsPrepared()) {
auto& replClient = repl::ReplClientInfo::forClient(opCtx->getClient());
auto prepareOpTime = txnParticipant->getPrepareOpTime();
@@ -118,96 +112,24 @@ public:
<< " participant prepareOpTime: "
<< prepareOpTime.toString());
- // A participant should re-send its vote if it re-received prepare.
- _sendVoteCommit(opCtx, prepareOpTime.getTimestamp(), cmd.getCoordinatorId());
-
+ if (MONGO_FAIL_POINT(
+ participantReturnNetworkErrorForPrepareAfterExecutingPrepareLogic)) {
+ uasserted(ErrorCodes::SocketException,
+ "returning network error because failpoint is on");
+ }
return PrepareTimestamp(prepareOpTime.getTimestamp());
}
- // TODO (SERVER-36839): Pass coordinatorId into prepareTransaction() so that the
- // coordinatorId can be included in the write to config.transactions.
const auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx, {});
- _sendVoteCommit(opCtx, prepareTimestamp, cmd.getCoordinatorId());
-
- return PrepareTimestamp(prepareTimestamp);
- }
-
- private:
- void _sendVoteCommit(OperationContext* opCtx,
- Timestamp prepareTimestamp,
- ShardId coordinatorId) {
- // In a production cluster, a participant should always send its vote to the coordinator
- // as part of prepareTransaction. However, many test suites test the replication and
- // storage parts of prepareTransaction against a standalone replica set, so allow
- // skipping sending a vote.
- if (MONGO_FAIL_POINT(skipShardingPartsOfPrepareTransaction)) {
- return;
- }
-
- VoteCommitTransaction voteCommit;
- voteCommit.setDbName("admin");
- voteCommit.setShardId(ShardingState::get(opCtx)->shardId());
- voteCommit.setPrepareTimestamp(prepareTimestamp);
- BSONObj voteCommitObj = voteCommit.toBSON(
- BSON("lsid" << opCtx->getLogicalSessionId()->toBSON() << "txnNumber"
- << *opCtx->getTxnNumber()
- << "autocommit"
- << false));
- _sendVote(opCtx, voteCommitObj, coordinatorId);
- }
-
- void _sendVoteAbort(OperationContext* opCtx, ShardId coordinatorId) {
- // In a production cluster, a participant should always send its vote to the coordinator
- // as part of prepareTransaction. However, many test suites test the replication and
- // storage parts of prepareTransaction against a standalone replica set, so allow
- // skipping sending a vote.
- if (MONGO_FAIL_POINT(skipShardingPartsOfPrepareTransaction)) {
- return;
- }
-
- VoteAbortTransaction voteAbort;
- voteAbort.setDbName("admin");
- voteAbort.setShardId(ShardingState::get(opCtx)->shardId());
- BSONObj voteAbortObj = voteAbort.toBSON(
- BSON("lsid" << opCtx->getLogicalSessionId()->toBSON() << "txnNumber"
- << *opCtx->getTxnNumber()
- << "autocommit"
- << false));
- _sendVote(opCtx, voteAbortObj, coordinatorId);
- }
-
- void _sendVote(OperationContext* opCtx, const BSONObj& voteObj, ShardId coordinatorId) {
- try {
- // TODO (SERVER-37328): Participant should wait for writeConcern before sending its
- // vote.
-
- LOG(3) << "Participant shard sending " << voteObj << " to " << coordinatorId;
-
- const auto coordinatorPrimaryHost = [&] {
- auto coordinatorShard = uassertStatusOK(
- Grid::get(opCtx)->shardRegistry()->getShard(opCtx, coordinatorId));
- return uassertStatusOK(coordinatorShard->getTargeter()->findHostNoWait(
- ReadPreferenceSetting{ReadPreference::PrimaryOnly}));
- }();
-
- const executor::RemoteCommandRequest request(
- coordinatorPrimaryHost,
- NamespaceString::kAdminDb.toString(),
- voteObj,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly}.toContainingBSON(),
- opCtx,
- executor::RemoteCommandRequest::kNoTimeout);
-
- auto noOp = [](const executor::TaskExecutor::RemoteCommandCallbackArgs&) {};
- uassertStatusOK(
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()->scheduleRemoteCommand(
- request, noOp));
- } catch (const DBException& ex) {
- LOG(3) << "Participant shard failed to send " << voteObj << " to " << coordinatorId
- << causedBy(ex.toStatus());
+ if (MONGO_FAIL_POINT(
+ participantReturnNetworkErrorForPrepareAfterExecutingPrepareLogic)) {
+ uasserted(ErrorCodes::SocketException,
+ "returning network error because failpoint is on");
}
+ return PrepareTimestamp(std::move(prepareTimestamp));
}
+ private:
bool supportsWriteConcern() const override {
return true;
}
@@ -233,122 +155,6 @@ public:
}
} prepareTransactionCmd;
-class VoteCommitTransactionCmd : public TypedCommand<VoteCommitTransactionCmd> {
-public:
- using Request = VoteCommitTransaction;
- class Invocation final : public InvocationBase {
- public:
- using InvocationBase::InvocationBase;
-
- void typedRun(OperationContext* opCtx) {
- // Only config servers or initialized shard servers can act as transaction coordinators.
- if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) {
- uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands());
- }
-
- uassert(
- ErrorCodes::CommandNotSupported,
- "'voteCommitTransaction' is only supported in feature compatibility version 4.2",
- (serverGlobalParams.featureCompatibility.getVersion() ==
- ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42));
-
- const auto& cmd = request();
-
- LOG(3) << "Coordinator shard received voteCommit from " << cmd.getShardId()
- << " with prepare timestamp " << cmd.getPrepareTimestamp() << " for transaction "
- << opCtx->getTxnNumber() << " on session "
- << opCtx->getLogicalSessionId()->toBSON();
-
- TransactionCoordinatorService::get(opCtx)->voteCommit(
- opCtx,
- opCtx->getLogicalSessionId().get(),
- opCtx->getTxnNumber().get(),
- cmd.getShardId(),
- cmd.getPrepareTimestamp());
- }
-
- private:
- bool supportsWriteConcern() const override {
- return false;
- }
-
- NamespaceString ns() const override {
- return NamespaceString(request().getDbName(), "");
- }
-
- void doCheckAuthorization(OperationContext* opCtx) const override {}
- };
-
- virtual bool adminOnly() const {
- return true;
- }
-
- std::string help() const override {
- return "Votes to commit a transaction; sent by a transaction participant to the "
- "transaction commit coordinator for a cross-shard transaction";
- }
-
- AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
- return AllowedOnSecondary::kNever;
- }
-} voteCommitTransactionCmd;
-
-class VoteAbortTransactionCmd : public TypedCommand<VoteAbortTransactionCmd> {
-public:
- using Request = VoteAbortTransaction;
- class Invocation final : public InvocationBase {
- public:
- using InvocationBase::InvocationBase;
-
- void typedRun(OperationContext* opCtx) {
- // Only config servers or initialized shard servers can act as transaction coordinators.
- if (serverGlobalParams.clusterRole != ClusterRole::ConfigServer) {
- uassertStatusOK(ShardingState::get(opCtx)->canAcceptShardedCommands());
- }
-
- uassert(ErrorCodes::CommandNotSupported,
- "'voteAbortTransaction' is only supported in feature compatibility version 4.2",
- (serverGlobalParams.featureCompatibility.getVersion() ==
- ServerGlobalParams::FeatureCompatibility::Version::kFullyUpgradedTo42));
-
- const auto& cmd = request();
-
- LOG(3) << "Coordinator shard received voteAbort from " << cmd.getShardId()
- << " for transaction " << opCtx->getTxnNumber() << " on session "
- << opCtx->getLogicalSessionId()->toBSON();
-
- TransactionCoordinatorService::get(opCtx)->voteAbort(opCtx,
- opCtx->getLogicalSessionId().get(),
- opCtx->getTxnNumber().get(),
- cmd.getShardId());
- }
-
- private:
- bool supportsWriteConcern() const override {
- return false;
- }
-
- NamespaceString ns() const override {
- return NamespaceString(request().getDbName(), "");
- }
-
- void doCheckAuthorization(OperationContext* opCtx) const override {}
- };
-
- virtual bool adminOnly() const {
- return true;
- }
-
- std::string help() const override {
- return "Votes to abort a transaction; sent by a transaction participant to the transaction "
- "commit coordinator for a cross-shard transaction";
- }
-
- AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
- return AllowedOnSecondary::kNever;
- }
-} voteAbortTransactionCmd;
-
// TODO (SERVER-37440): Make coordinateCommit idempotent.
class CoordinateCommitTransactionCmd : public TypedCommand<CoordinateCommitTransactionCmd> {
public:
@@ -397,61 +203,16 @@ public:
opCtx->getTxnNumber().get(),
participantList);
- // If the commit decision is already available before we prepare locally, it means the
- // transaction has completed and we should skip preparing locally.
- //
- // TODO (SERVER-37440): Reconsider when coordinateCommit is made idempotent.
- if (!commitDecisionFuture.isReady()) {
- // Execute the 'prepare' logic on the local participant (the router does not send a
- // separate 'prepare' message to the coordinator shard).
- _callPrepareOnLocalParticipant(opCtx);
- }
-
// Block waiting for the commit decision.
auto commitDecision = commitDecisionFuture.get(opCtx);
// If the decision was abort, propagate NoSuchTransaction exception back to mongos.
uassert(ErrorCodes::NoSuchTransaction,
"Transaction was aborted",
- commitDecision != TransactionCoordinatorService::CommitDecision::kAbort);
+ commitDecision != TransactionCoordinator::CommitDecision::kAbort);
}
private:
- void _callPrepareOnLocalParticipant(OperationContext* opCtx) {
- auto localParticipantPrepareTimestamp = [&]() -> Timestamp {
- OperationSessionInfoFromClient sessionInfo;
- sessionInfo.setAutocommit(false);
- sessionInfo.setCoordinator(false);
- OperationContextSessionMongod checkOutSession(opCtx, true, sessionInfo);
-
- auto txnParticipant = TransactionParticipant::get(opCtx);
-
- txnParticipant->unstashTransactionResources(opCtx, "prepareTransaction");
- ScopeGuard guard = MakeGuard([&txnParticipant, opCtx]() {
- txnParticipant->abortActiveUnpreparedOrStashPreparedTransaction(opCtx);
- });
-
- auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx, {});
-
- txnParticipant->stashTransactionResources(opCtx);
- guard.Dismiss();
- return prepareTimestamp;
- }();
-
- LOG(3) << "Participant shard delivering voteCommit with prepareTimestamp "
- << localParticipantPrepareTimestamp << " to local coordinator for transaction "
- << opCtx->getTxnNumber() << " on session "
- << opCtx->getLogicalSessionId()->toBSON();
-
- // Deliver the local participant's vote to the coordinator.
- TransactionCoordinatorService::get(opCtx)->voteCommit(
- opCtx,
- opCtx->getLogicalSessionId().get(),
- opCtx->getTxnNumber().get(),
- ShardingState::get(opCtx)->shardId(),
- localParticipantPrepareTimestamp);
- }
-
bool supportsWriteConcern() const override {
return true;
}
diff --git a/src/mongo/db/transaction_coordinator.cpp b/src/mongo/db/transaction_coordinator.cpp
index 03bf534a104..cb12b61c719 100644
--- a/src/mongo/db/transaction_coordinator.cpp
+++ b/src/mongo/db/transaction_coordinator.cpp
@@ -28,14 +28,15 @@
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kTransaction
#include "mongo/platform/basic.h"
+#include "mongo/db/logical_clock.h"
+#include "mongo/db/service_context.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/transaction_coordinator.h"
-
-#include "mongo/db/session.h"
+#include "mongo/util/log.h"
namespace mongo {
@@ -43,46 +44,114 @@ using Action = TransactionCoordinator::StateMachine::Action;
using Event = TransactionCoordinator::StateMachine::Event;
using State = TransactionCoordinator::StateMachine::State;
+//
+// Pre-decision
+//
+
Action TransactionCoordinator::recvCoordinateCommit(const std::set<ShardId>& participants) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
_participantList.recordFullList(participants);
return _stateMachine.onEvent(std::move(lk), Event::kRecvParticipantList);
}
-Action TransactionCoordinator::recvVoteCommit(const ShardId& shardId, Timestamp prepareTimestamp) {
+Action TransactionCoordinator::madeParticipantListDurable() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
-
- _participantList.recordVoteCommit(shardId, prepareTimestamp);
-
- auto event = (_participantList.allParticipantsVotedCommit()) ? Event::kRecvFinalVoteCommit
- : Event::kRecvVoteCommit;
- return _stateMachine.onEvent(std::move(lk), event);
+ return _stateMachine.onEvent(std::move(lk), Event::kMadeParticipantListDurable);
}
+//
+// Abort path
+//
+
Action TransactionCoordinator::recvVoteAbort(const ShardId& shardId) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
_participantList.recordVoteAbort(shardId);
return _stateMachine.onEvent(std::move(lk), Event::kRecvVoteAbort);
}
-Action TransactionCoordinator::recvTryAbort() {
+Action TransactionCoordinator::madeAbortDecisionDurable() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- return _stateMachine.onEvent(std::move(lk), Event::kRecvTryAbort);
+ return _stateMachine.onEvent(std::move(lk), Event::kMadeAbortDecisionDurable);
}
-void TransactionCoordinator::recvCommitAck(const ShardId& shardId) {
+Action TransactionCoordinator::recvAbortAck(const ShardId& shardId) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _participantList.recordCommitAck(shardId);
- if (_participantList.allParticipantsAckedCommit()) {
- _stateMachine.onEvent(std::move(lk), Event::kRecvFinalCommitAck);
+ _participantList.recordAbortAck(shardId);
+ auto event = _participantList.allParticipantsAckedAbort() ? Event::kRecvFinalAbortAck
+ : Event::kRecvAbortAck;
+ return _stateMachine.onEvent(std::move(lk), event);
+}
+
+//
+// Commit path
+//
+
+Action TransactionCoordinator::recvVoteCommit(const ShardId& shardId, Timestamp prepareTimestamp) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _participantList.recordVoteCommit(shardId, prepareTimestamp);
+ auto event = (_participantList.allParticipantsVotedCommit()) ? Event::kRecvFinalVoteCommit
+ : Event::kRecvVoteCommit;
+ if (event == Event::kRecvFinalVoteCommit) {
+ const auto maxPrepareTs = _participantList.getHighestPrepareTimestamp();
+ _commitTimestamp = Timestamp(maxPrepareTs.getSecs(), maxPrepareTs.getInc() + 1);
+ Status s = LogicalClock::get(getGlobalServiceContext())
+ ->advanceClusterTime(LogicalTime(_commitTimestamp.get()));
+ if (!s.isOK()) {
+ log() << "Coordinator shard failed to advance cluster time to commitTimestamp "
+ << causedBy(s);
+ }
}
+ return _stateMachine.onEvent(std::move(lk), event);
+}
+
+Action TransactionCoordinator::madeCommitDecisionDurable() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ return _stateMachine.onEvent(std::move(lk), Event::kMadeCommitDecisionDurable);
}
+Action TransactionCoordinator::recvCommitAck(const ShardId& shardId) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _participantList.recordCommitAck(shardId);
+ auto event = _participantList.allParticipantsAckedCommit() ? Event::kRecvFinalCommitAck
+ : Event::kRecvCommitAck;
+ return _stateMachine.onEvent(std::move(lk), event);
+}
+
+//
+// Any time
+//
+
Future<TransactionCoordinator::StateMachine::State> TransactionCoordinator::waitForCompletion() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
return _stateMachine.waitForTransitionTo({State::kCommitted, State::kAborted});
}
+Future<TransactionCoordinator::CommitDecision> TransactionCoordinator::waitForDecision() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ return _stateMachine
+ .waitForTransitionTo({State::kWaitingForAbortAcks,
+ State::kWaitingForCommitAcks,
+ State::kCommitted,
+ State::kAborted})
+ .then([](auto state) {
+ switch (state) {
+ case TransactionCoordinator::StateMachine::State::kWaitingForAbortAcks:
+ case TransactionCoordinator::StateMachine::State::kAborted:
+ return TransactionCoordinator::CommitDecision::kAbort;
+ case TransactionCoordinator::StateMachine::State::kWaitingForCommitAcks:
+ case TransactionCoordinator::StateMachine::State::kCommitted:
+ return TransactionCoordinator::CommitDecision::kCommit;
+ default:
+ MONGO_UNREACHABLE;
+ }
+ });
+}
+
+Action TransactionCoordinator::recvTryAbort() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ return _stateMachine.onEvent(std::move(lk), Event::kRecvTryAbort);
+}
+
//
// StateMachine
//
@@ -101,39 +170,70 @@ Future<TransactionCoordinator::StateMachine::State> TransactionCoordinator::wait
const std::map<State, std::map<Event, TransactionCoordinator::StateMachine::Transition>>
TransactionCoordinator::StateMachine::transitionTable = {
// clang-format off
- {State::kWaitingForParticipantList, {
- {Event::kRecvVoteAbort, {Action::kSendAbort, State::kAborted}},
- {Event::kRecvVoteCommit, {}},
- {Event::kRecvParticipantList, {State::kWaitingForVotes}},
- {Event::kRecvTryAbort, {Action::kSendAbort, State::kAborted}},
+ {State::kUninitialized, {
+ {Event::kRecvParticipantList, {Action::kWriteParticipantList, State::kMakingParticipantListDurable}},
+ {Event::kRecvTryAbort, {{}, State::kAborted}},
+ }},
+ {State::kMakingParticipantListDurable, {
+ {Event::kRecvParticipantList, {}},
+ {Event::kMadeParticipantListDurable, {Action::kSendPrepare, State::kWaitingForVotes}},
+ {Event::kRecvTryAbort, {}},
}},
{State::kWaitingForVotes, {
- {Event::kRecvVoteAbort, {Action::kSendAbort, State::kAborted}},
+ {Event::kRecvParticipantList, {}},
+ {Event::kRecvVoteAbort, {Action::kWriteAbortDecision, State::kMakingAbortDecisionDurable}},
{Event::kRecvVoteCommit, {}},
+ {Event::kRecvFinalVoteCommit, {Action::kWriteCommitDecision, State::kMakingCommitDecisionDurable}},
+ {Event::kRecvTryAbort, {}},
+ }},
+
+ // Abort path
+ // Note: Can continue to receive votes after abort decision has been made, because an abort
+ // decision only requires a single voteAbort.
+ {State::kMakingAbortDecisionDurable, {
+ {Event::kRecvParticipantList, {}},
+ {Event::kRecvVoteAbort, {}},
+ {Event::kRecvVoteCommit, {}},
+ {Event::kMadeAbortDecisionDurable, {Action::kSendAbort, State::kWaitingForAbortAcks}},
+ {Event::kRecvTryAbort, {}},
+ }},
+ {State::kWaitingForAbortAcks, {
{Event::kRecvParticipantList, {}},
- {Event::kRecvFinalVoteCommit, {Action::kSendCommit, State::kWaitingForCommitAcks}},
- {Event::kRecvTryAbort, {Action::kSendAbort, State::kAborted}},
+ {Event::kRecvVoteAbort, {}},
+ {Event::kRecvVoteCommit, {}},
+ {Event::kRecvAbortAck, {}},
+ {Event::kRecvFinalAbortAck, {Action::kDone, State::kAborted}},
+ {Event::kRecvTryAbort, {}},
+
}},
{State::kAborted, {
- {Event::kRecvVoteAbort, {}},
- {Event::kRecvVoteCommit, {Action::kSendAbort}},
{Event::kRecvParticipantList, {}},
+ {Event::kRecvVoteAbort, {}},
+ {Event::kRecvVoteCommit, {}},
{Event::kRecvTryAbort, {}},
}},
+
+ // Commit path
+ // Note: Cannot continue to receive votes after commit decision has been made, because a
+ // commit decision requires all voteCommits.
+ {State::kMakingCommitDecisionDurable, {
+ {Event::kRecvParticipantList, {}},
+ {Event::kMadeCommitDecisionDurable, {Action::kSendCommit, State::kWaitingForCommitAcks}},
+ {Event::kRecvTryAbort, {}},
+
+ }},
{State::kWaitingForCommitAcks, {
- {Event::kRecvVoteCommit, {}},
{Event::kRecvParticipantList, {}},
- {Event::kRecvFinalVoteCommit, {Action::kSendCommit}},
- {Event::kRecvFinalCommitAck, {State::kCommitted}},
+ {Event::kRecvCommitAck, {}},
+ {Event::kRecvFinalCommitAck, {Action::kDone, State::kCommitted}},
{Event::kRecvTryAbort, {}},
+
}},
{State::kCommitted, {
- {Event::kRecvVoteCommit, {}},
{Event::kRecvParticipantList, {}},
- {Event::kRecvFinalVoteCommit, {}},
- {Event::kRecvFinalCommitAck, {}},
{Event::kRecvTryAbort, {}},
}},
+
{State::kBroken, {}},
// clang-format on
};
@@ -175,6 +275,7 @@ void TransactionCoordinator::StateMachine::_signalAllPromisesWaitingForState(
Action TransactionCoordinator::StateMachine::onEvent(stdx::unique_lock<stdx::mutex> lk,
Event event) {
+
const auto legalTransitions = transitionTable.find(_state)->second;
if (!legalTransitions.count(event)) {
std::string errmsg = str::stream() << "Transaction coordinator received illegal event '"
@@ -184,9 +285,20 @@ Action TransactionCoordinator::StateMachine::onEvent(stdx::unique_lock<stdx::mut
}
const auto transition = legalTransitions.find(event)->second;
+
if (transition.nextState) {
+ StringBuilder ss;
+ ss << "TransactionCoordinator received event " << event << " while in state " << _state
+ << " and returning " << transition.action << " and transitioning to "
+ << *transition.nextState;
+ LOG(3) << ss.str();
_state = *transition.nextState;
_signalAllPromisesWaitingForState(std::move(lk), _state);
+ } else {
+ StringBuilder ss;
+ ss << "TransactionCoordinator received event " << event << " while in state " << _state
+ << " and returning " << transition.action << " and not transitioning to new state";
+ LOG(3) << ss.str();
}
return transition.action;
@@ -288,12 +400,13 @@ void TransactionCoordinator::ParticipantList::recordVoteAbort(const ShardId& sha
participant.vote != Participant::Vote::kCommit);
participant.vote = Participant::Vote::kAbort;
+ participant.ack = Participant::Ack::kAbort;
}
void TransactionCoordinator::ParticipantList::recordCommitAck(const ShardId& shardId) {
auto it = _participants.find(shardId);
uassert(
- ErrorCodes::InternalError,
+ 50989,
str::stream() << "Transaction commit coordinator processed 'commit' ack from participant "
<< shardId.toString()
<< " not in participant list",
@@ -301,6 +414,17 @@ void TransactionCoordinator::ParticipantList::recordCommitAck(const ShardId& sha
it->second.ack = Participant::Ack::kCommit;
}
+void TransactionCoordinator::ParticipantList::recordAbortAck(const ShardId& shardId) {
+ auto it = _participants.find(shardId);
+ uassert(
+ 50990,
+ str::stream() << "Transaction commit coordinator processed 'abort' ack from participant "
+ << shardId.toString()
+ << " not in participant list",
+ it != _participants.end());
+ it->second.ack = Participant::Ack::kAbort;
+}
+
bool TransactionCoordinator::ParticipantList::allParticipantsVotedCommit() const {
return _fullListReceived && std::all_of(_participants.begin(),
_participants.end(),
@@ -309,6 +433,13 @@ bool TransactionCoordinator::ParticipantList::allParticipantsVotedCommit() const
});
}
+bool TransactionCoordinator::ParticipantList::allParticipantsAckedAbort() const {
+ return std::all_of(
+ _participants.begin(), _participants.end(), [](const std::pair<ShardId, Participant>& i) {
+ return i.second.ack == Participant::Ack::kAbort;
+ });
+}
+
bool TransactionCoordinator::ParticipantList::allParticipantsAckedCommit() const {
invariant(_fullListReceived);
return std::all_of(
@@ -329,6 +460,14 @@ Timestamp TransactionCoordinator::ParticipantList::getHighestPrepareTimestamp()
return highestPrepareTimestamp;
}
+std::set<ShardId> TransactionCoordinator::ParticipantList::getParticipants() const {
+ std::set<ShardId> participants;
+ for (const auto& kv : _participants) {
+ participants.insert(kv.first);
+ }
+ return participants;
+}
+
std::set<ShardId> TransactionCoordinator::ParticipantList::getNonAckedCommitParticipants() const {
std::set<ShardId> nonAckedCommitParticipants;
for (const auto& kv : _participants) {
diff --git a/src/mongo/db/transaction_coordinator.h b/src/mongo/db/transaction_coordinator.h
index f0e6f0612eb..510a3339a9e 100644
--- a/src/mongo/db/transaction_coordinator.h
+++ b/src/mongo/db/transaction_coordinator.h
@@ -33,10 +33,12 @@
#include <boost/optional.hpp>
#include <list>
#include <map>
+#include <memory>
#include <set>
#include "mongo/base/disallow_copying.h"
#include "mongo/bson/timestamp.h"
+#include "mongo/db/logical_session_id.h"
#include "mongo/s/shard_id.h"
#include "mongo/stdx/mutex.h"
#include "mongo/util/assert_util.h"
@@ -53,26 +55,37 @@ class Session;
* A state machine that coordinates a distributed transaction commit with the transaction
* participants.
*/
-class TransactionCoordinator {
+class TransactionCoordinator : public std::enable_shared_from_this<TransactionCoordinator> {
MONGO_DISALLOW_COPYING(TransactionCoordinator);
public:
TransactionCoordinator() = default;
~TransactionCoordinator() = default;
+ enum class CommitDecision {
+ kCommit,
+ kAbort,
+ };
+
/**
* The internal state machine, or "brain", used by the TransactionCoordinator to determine what
* to do in response to an "event" (receiving a request or hearing back a response).
*/
class StateMachine {
- friend class TransactionCoordinator;
-
public:
~StateMachine();
enum class State {
- kWaitingForParticipantList,
+ kUninitialized,
+ kMakingParticipantListDurable,
kWaitingForVotes,
+
+ // Abort path
+ kMakingAbortDecisionDurable,
+ kWaitingForAbortAcks,
kAborted,
+
+ // Commit path
+ kMakingCommitDecisionDurable,
kWaitingForCommitAcks,
kCommitted,
@@ -85,16 +98,36 @@ public:
// State machine inputs
enum class Event {
+ kRecvParticipantList,
+ kMadeParticipantListDurable,
+
+ // Abort path
kRecvVoteAbort,
+ kMadeAbortDecisionDurable,
+ kRecvAbortAck,
+ kRecvFinalAbortAck,
+
+ // Commit path
kRecvVoteCommit,
- kRecvParticipantList,
kRecvFinalVoteCommit,
+ kMadeCommitDecisionDurable,
+ kRecvCommitAck,
kRecvFinalCommitAck,
+
kRecvTryAbort,
};
// State machine outputs
- enum class Action { kNone, kSendCommit, kSendAbort };
+ enum class Action {
+ kNone,
+ kWriteParticipantList,
+ kSendPrepare,
+ kWriteAbortDecision,
+ kSendAbort,
+ kWriteCommitDecision,
+ kSendCommit,
+ kDone
+ };
// IMPORTANT: If there is a state transition, this will release the lock in order to signal
// any promises that may be waiting on a state change, and will not reacquire it.
@@ -137,7 +170,7 @@ public:
};
static const std::map<State, std::map<Event, Transition>> transitionTable;
- State _state{State::kWaitingForParticipantList};
+ State _state{State::kUninitialized};
std::list<StateTransitionPromise> _stateTransitionPromises;
};
@@ -145,38 +178,77 @@ public:
* The coordinateCommit command contains the full participant list that this node is responsible
* for coordinating the commit across.
*
- * Stores the participant list.
+ * Stores the participant list and returns the next action to take.
*
* Throws if any participants that this node has already heard a vote from are not in the list.
*/
StateMachine::Action recvCoordinateCommit(const std::set<ShardId>& participants);
/**
- * A participant sends a voteCommit command with its prepareTimestamp if it succeeded in
- * preparing the transaction.
+ * Advances the state machine and returns the next action to take.
+ */
+ StateMachine::Action madeParticipantListDurable();
+
+ //
+ // Abort path
+ //
+
+ /**
+ * A participant responds to prepare with failure if it failed to prepare the transaction, has
+ * timed out and already aborted the transaction, or has received a higher transaction number.
*
- * Stores the participant's vote.
+ * Stores the participant's vote and returns the next action to take.
*
* Throws if the full participant list has been received and this shard is not one of the
* participants.
*/
- StateMachine::Action recvVoteCommit(const ShardId& shardId, Timestamp prepareTimestamp);
+ StateMachine::Action recvVoteAbort(const ShardId& shardId);
+
+ /**
+ * Advances the state machine and returns the next action to take.
+ */
+ StateMachine::Action madeAbortDecisionDurable();
/**
- * A participant sends a voteAbort command if it failed to prepare the transaction.
+ * If this is the final abort ack, advances the state machine. Returns the next action to take.
+ */
+ StateMachine::Action recvAbortAck(const ShardId& shardId);
+
+ //
+ // Commit path
+ //
+
+ /**
+ * A participant responds to prepare with success and its prepare Timestamp if it succeeded in
+ * preparing the transaction.
*
- * Stores the participant's vote and causes the coordinator to decide to abort the transaction.
+ * Stores the participant's vote and prepare Timestamp and returns the next action to take.
*
* Throws if the full participant list has been received and this shard is not one of the
* participants.
*/
- StateMachine::Action recvVoteAbort(const ShardId& shardId);
+ StateMachine::Action recvVoteCommit(const ShardId& shardId, Timestamp prepareTimestamp);
/**
- * A tryAbort event is received by the coordinator when a transaction is implicitly aborted when
- * a new transaction is received for the same session with a higher transaction number.
+ * Advances the state machine and returns the next action to take.
*/
- StateMachine::Action recvTryAbort();
+ StateMachine::Action madeCommitDecisionDurable();
+
+ /**
+ * Marks this participant as having completed committing the transaction.
+ */
+ StateMachine::Action recvCommitAck(const ShardId& shardId);
+
+ //
+ // Any time
+ //
+
+ /**
+ * Returns a Future which will be signaled when the TransactionCoordinator has successfully
+ * persisted a commit or abort decision. The resulting future will contain coordinator's
+ * decision.
+ */
+ Future<TransactionCoordinator::CommitDecision> waitForDecision();
/**
* Returns a Future which will be signaled when the TransactionCoordinator either commits
@@ -185,9 +257,15 @@ public:
Future<TransactionCoordinator::StateMachine::State> waitForCompletion();
/**
- * Marks this participant as having completed committing the transaction.
+ * A tryAbort event is received by the coordinator when a transaction is implicitly aborted when
+ * a new transaction is received for the same session with a higher transaction number.
*/
- void recvCommitAck(const ShardId& shardId);
+ StateMachine::Action recvTryAbort();
+
+ std::set<ShardId> getParticipants() const {
+ invariant(_stateMachine.state() != StateMachine::State::kUninitialized);
+ return _participantList.getParticipants();
+ }
std::set<ShardId> getNonAckedCommitParticipants() const {
return _participantList.getNonAckedCommitParticipants();
@@ -197,8 +275,8 @@ public:
return _participantList.getNonVotedAbortParticipants();
}
- Timestamp getCommitTimestamp() const {
- return _participantList.getHighestPrepareTimestamp();
+ boost::optional<Timestamp> getCommitTimestamp() const {
+ return _commitTimestamp;
}
StateMachine::State state() const {
@@ -218,14 +296,23 @@ public:
bool allParticipantsAckedCommit() const;
Timestamp getHighestPrepareTimestamp() const;
-
+ std::set<ShardId> getParticipants() const;
std::set<ShardId> getNonAckedCommitParticipants() const;
std::set<ShardId> getNonVotedAbortParticipants() const;
class Participant {
public:
+ /**
+ * This participant's vote, that is, whether the participant responded with success to
+ * prepareTransaction.
+ */
enum class Vote { kUnknown, kAbort, kCommit };
- enum class Ack { kNone, kCommit };
+
+ /**
+ * Whether this participant has acked the decision.
+ * TODO (SERVER-37924): Remove this enum and just track the ack as a bool.
+ */
+ enum class Ack { kNone, kAbort, kCommit };
Vote vote{Vote::kUnknown};
Ack ack{Ack::kNone};
@@ -244,23 +331,27 @@ private:
stdx::mutex _mutex;
ParticipantList _participantList;
StateMachine _stateMachine;
+ boost::optional<Timestamp> _commitTimestamp;
};
inline StringBuilder& operator<<(StringBuilder& sb,
const TransactionCoordinator::StateMachine::State& state) {
using State = TransactionCoordinator::StateMachine::State;
+ // clang-format off
switch (state) {
- // clang-format off
- case State::kWaitingForParticipantList: return sb << "kWaitingForParticipantlist";
+ case State::kUninitialized: return sb << "kUninitialized";
+ case State::kMakingParticipantListDurable: return sb << "kMakingParticipantListDurable";
case State::kWaitingForVotes: return sb << "kWaitingForVotes";
+ case State::kMakingAbortDecisionDurable: return sb << "kMakingAbortDecisionsDurable";
+ case State::kWaitingForAbortAcks: return sb << "kWaitingForAbortAcks";
case State::kAborted: return sb << "kAborted";
+ case State::kMakingCommitDecisionDurable: return sb << "kMakingCommiDecisionsDurable";
case State::kWaitingForCommitAcks: return sb << "kWaitingForCommitAcks";
case State::kCommitted: return sb << "kCommitted";
case State::kBroken: return sb << "kBroken";
- // clang-format on
- default:
- MONGO_UNREACHABLE;
};
+ // clang-format on
+ MONGO_UNREACHABLE;
}
inline std::ostream& operator<<(std::ostream& os,
@@ -273,17 +364,23 @@ inline std::ostream& operator<<(std::ostream& os,
inline StringBuilder& operator<<(StringBuilder& sb,
const TransactionCoordinator::StateMachine::Event& event) {
using Event = TransactionCoordinator::StateMachine::Event;
+ // clang-format off
switch (event) {
- // clang-format off
- case Event::kRecvVoteAbort: return sb << "kRecvVoteAbort";
- case Event::kRecvVoteCommit: return sb << "kRecvVoteCommit";
- case Event::kRecvParticipantList: return sb << "kRecvParticipantList";
- case Event::kRecvFinalVoteCommit: return sb << "kRecvFinalVoteCommit";
- case Event::kRecvFinalCommitAck: return sb << "kRecvFinalCommitAck";
- // clang-format on
- default:
- MONGO_UNREACHABLE;
+ case Event::kRecvParticipantList: return sb << "kRecvParticipantList";
+ case Event::kMadeParticipantListDurable: return sb << "kMadeParticipantListDurable";
+ case Event::kRecvVoteAbort: return sb << "kRecvVoteAbort";
+ case Event::kMadeAbortDecisionDurable: return sb << "kMadeAbortDecisionDurable";
+ case Event::kRecvAbortAck: return sb << "kRecvAbortAck";
+ case Event::kRecvFinalAbortAck: return sb << "kRecvFinalAbortAck";
+ case Event::kRecvVoteCommit: return sb << "kRecvVoteCommit";
+ case Event::kRecvFinalVoteCommit: return sb << "kRecvFinalVoteCommit";
+ case Event::kMadeCommitDecisionDurable: return sb << "kMadeCommitDecisionDurable";
+ case Event::kRecvCommitAck: return sb << "kRecvCommitAck";
+ case Event::kRecvFinalCommitAck: return sb << "kRecvFinalCommitAck";
+ case Event::kRecvTryAbort: return sb << "kRecvTryAbort";
};
+ // clang-format on
+ MONGO_UNREACHABLE;
}
inline std::ostream& operator<<(std::ostream& os,
@@ -298,9 +395,14 @@ inline StringBuilder& operator<<(StringBuilder& sb,
using Action = TransactionCoordinator::StateMachine::Action;
// clang-format off
switch (action) {
- case Action::kSendCommit: return sb << "kSendCommit";
- case Action::kSendAbort: return sb << "kSendAbort";
- case Action::kNone: return sb << "kNone";
+ case Action::kNone: return sb << "kNone";
+ case Action::kWriteParticipantList: return sb << "kWriteParticipantList";
+ case Action::kSendPrepare: return sb << "kSendPrepare";
+ case Action::kWriteAbortDecision: return sb << "kWriteAbortDecision";
+ case Action::kSendAbort: return sb << "kSendAbort";
+ case Action::kWriteCommitDecision: return sb << "kWriteCommitDecision";
+ case Action::kSendCommit: return sb << "kSendCommit";
+ case Action::kDone: return sb << "kDone";
};
// clang-format on
MONGO_UNREACHABLE;
@@ -312,4 +414,5 @@ inline std::ostream& operator<<(std::ostream& os,
sb << action;
return os << sb.str();
}
+
} // namespace mongo
diff --git a/src/mongo/db/transaction_coordinator_catalog.cpp b/src/mongo/db/transaction_coordinator_catalog.cpp
index 79e7de153b2..74b29971677 100644
--- a/src/mongo/db/transaction_coordinator_catalog.cpp
+++ b/src/mongo/db/transaction_coordinator_catalog.cpp
@@ -78,22 +78,22 @@ std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::create(Lo
return newCoordinator;
}
-boost::optional<std::shared_ptr<TransactionCoordinator>> TransactionCoordinatorCatalog::get(
- LogicalSessionId lsid, TxnNumber txnNumber) {
+std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::get(LogicalSessionId lsid,
+ TxnNumber txnNumber) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
const auto& coordinatorsForSessionIter = _coordinatorsBySession.find(lsid);
if (coordinatorsForSessionIter == _coordinatorsBySession.end()) {
- return boost::none;
+ return nullptr;
}
const auto& coordinatorsForSession = coordinatorsForSessionIter->second;
const auto& coordinatorForTxnIter = coordinatorsForSession.find(txnNumber);
if (coordinatorForTxnIter == coordinatorsForSession.end()) {
- return boost::none;
+ return nullptr;
}
return coordinatorForTxnIter->second;
diff --git a/src/mongo/db/transaction_coordinator_catalog.h b/src/mongo/db/transaction_coordinator_catalog.h
index bf28bf57e1f..85fb7609ab2 100644
--- a/src/mongo/db/transaction_coordinator_catalog.h
+++ b/src/mongo/db/transaction_coordinator_catalog.h
@@ -68,10 +68,9 @@ public:
/**
* Returns the coordinator with the given session id and transaction number, if it exists. If it
- * does not exist, return boost::none.
+ * does not exist, return nullptr.
*/
- boost::optional<std::shared_ptr<TransactionCoordinator>> get(LogicalSessionId lsid,
- TxnNumber txnNumber);
+ std::shared_ptr<TransactionCoordinator> get(LogicalSessionId lsid, TxnNumber txnNumber);
/**
* Returns the coordinator with the highest transaction number with the given session id, if it
diff --git a/src/mongo/db/transaction_coordinator_catalog_test.cpp b/src/mongo/db/transaction_coordinator_catalog_test.cpp
index 69d7e2b7d37..5a16a26a4b9 100644
--- a/src/mongo/db/transaction_coordinator_catalog_test.cpp
+++ b/src/mongo/db/transaction_coordinator_catalog_test.cpp
@@ -83,7 +83,7 @@ TEST_F(TransactionCoordinatorCatalogTest, GetOnSessionThatDoesNotExistReturnsNon
TxnNumber txnNumber = 1;
auto coordinator = coordinatorCatalog().get(lsid, txnNumber);
- ASSERT_EQ(coordinator, boost::none);
+ ASSERT(coordinator == nullptr);
}
TEST_F(TransactionCoordinatorCatalogTest,
@@ -92,7 +92,7 @@ TEST_F(TransactionCoordinatorCatalogTest,
TxnNumber txnNumber = 1;
createCoordinatorInCatalog(lsid, txnNumber);
auto coordinatorInCatalog = coordinatorCatalog().get(lsid, txnNumber + 1);
- ASSERT_EQ(coordinatorInCatalog, boost::none);
+ ASSERT(coordinatorInCatalog == nullptr);
}
@@ -101,7 +101,7 @@ TEST_F(TransactionCoordinatorCatalogTest, CreateFollowedByGetReturnsCoordinator)
TxnNumber txnNumber = 1;
createCoordinatorInCatalog(lsid, txnNumber);
auto coordinatorInCatalog = coordinatorCatalog().get(lsid, txnNumber);
- ASSERT_NOT_EQUALS(coordinatorInCatalog, boost::none);
+ ASSERT(coordinatorInCatalog != nullptr);
}
TEST_F(TransactionCoordinatorCatalogTest, SecondCreateForSessionDoesNotOverwriteFirstCreate) {
@@ -112,7 +112,7 @@ TEST_F(TransactionCoordinatorCatalogTest, SecondCreateForSessionDoesNotOverwrite
auto coordinator2 = createCoordinatorInCatalog(lsid, txnNumber2);
auto coordinator1InCatalog = coordinatorCatalog().get(lsid, txnNumber1);
- ASSERT_NOT_EQUALS(coordinator1InCatalog, boost::none);
+ ASSERT(coordinator1InCatalog != nullptr);
}
DEATH_TEST_F(TransactionCoordinatorCatalogTest,
@@ -142,37 +142,41 @@ TEST_F(TransactionCoordinatorCatalogTest,
ASSERT_EQ(latestTxnNumAndCoordinator->first, txnNumber);
}
-TEST_F(TransactionCoordinatorCatalogTest,
- CoordinatorsRemoveThemselvesFromCatalogWhenTheyReachCommittedState) {
- using CoordinatorState = TransactionCoordinator::StateMachine::State;
-
- LogicalSessionId lsid = makeLogicalSessionIdForTest();
- TxnNumber txnNumber = 1;
- auto coordinator = createCoordinatorInCatalog(lsid, txnNumber);
-
- coordinator->recvCoordinateCommit({ShardId("shard0000")});
- coordinator->recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
- coordinator->recvCommitAck(ShardId("shard0000"));
- ASSERT_EQ(coordinator->state(), CoordinatorState::kCommitted);
-
- auto latestTxnNumAndCoordinator = coordinatorCatalog().getLatestOnSession(lsid);
- ASSERT_FALSE(latestTxnNumAndCoordinator);
-}
-
-TEST_F(TransactionCoordinatorCatalogTest,
- CoordinatorsRemoveThemselvesFromCatalogWhenTheyReachAbortedState) {
- using CoordinatorState = TransactionCoordinator::StateMachine::State;
-
- LogicalSessionId lsid = makeLogicalSessionIdForTest();
- TxnNumber txnNumber = 1;
- auto coordinator = createCoordinatorInCatalog(lsid, txnNumber);
-
- coordinator->recvVoteAbort(ShardId("shard0000"));
- ASSERT_EQ(coordinator->state(), CoordinatorState::kAborted);
+// TODO (SERVER-XXXX): Re-enable once coordinators are also participants and decision recovery
+// works correctly.
+// TEST_F(TransactionCoordinatorCatalogTest,
+// CoordinatorsRemoveThemselvesFromCatalogWhenTheyReachCommittedState) {
+// using CoordinatorState = TransactionCoordinator::StateMachine::State;
+//
+// LogicalSessionId lsid = makeLogicalSessionIdForTest();
+// TxnNumber txnNumber = 1;
+// auto coordinator = createCoordinatorInCatalog(lsid, txnNumber);
+//
+// coordinator->recvCoordinateCommit({ShardId("shard0000")});
+// coordinator->recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
+// coordinator->recvCommitAck(ShardId("shard0000"));
+// ASSERT_EQ(coordinator->state(), CoordinatorState::kCommitted);
+//
+// auto latestTxnNumAndCoordinator = coordinatorCatalog().getLatestOnSession(lsid);
+// ASSERT_FALSE(latestTxnNumAndCoordinator);
+// }
- auto latestTxnNumAndCoordinator = coordinatorCatalog().getLatestOnSession(lsid);
- ASSERT_FALSE(latestTxnNumAndCoordinator);
-}
+// TODO (SERVER-XXXX): Re-enable once coordinators are also participants and decision recovery
+// works correctly.
+// TEST_F(TransactionCoordinatorCatalogTest,
+// CoordinatorsRemoveThemselvesFromCatalogWhenTheyReachAbortedState) {
+// using CoordinatorState = TransactionCoordinator::StateMachine::State;
+//
+// LogicalSessionId lsid = makeLogicalSessionIdForTest();
+// TxnNumber txnNumber = 1;
+// auto coordinator = createCoordinatorInCatalog(lsid, txnNumber);
+//
+// coordinator->recvVoteAbort(ShardId("shard0000"));
+// ASSERT_EQ(coordinator->state(), CoordinatorState::kAborted);
+//
+// auto latestTxnNumAndCoordinator = coordinatorCatalog().getLatestOnSession(lsid);
+// ASSERT_FALSE(latestTxnNumAndCoordinator);
+// }
TEST_F(TransactionCoordinatorCatalogTest,
TwoCreatesFollowedByGetLatestOnSessionReturnsCoordinatorWithHighestTxnNumber) {
diff --git a/src/mongo/db/transaction_coordinator_commands_impl.cpp b/src/mongo/db/transaction_coordinator_commands_impl.cpp
index ce63dd84ef0..12896e1eb83 100644
--- a/src/mongo/db/transaction_coordinator_commands_impl.cpp
+++ b/src/mongo/db/transaction_coordinator_commands_impl.cpp
@@ -36,70 +36,72 @@
#include "mongo/client/remote_command_targeter.h"
#include "mongo/db/commands/txn_cmds_gen.h"
-#include "mongo/db/operation_context_session_mongod.h"
+#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/executor/task_executor_pool.h"
#include "mongo/rpc/get_status_from_command_result.h"
-#include "mongo/s/async_requests_sender.h"
#include "mongo/s/grid.h"
+#include "mongo/util/concurrency/notification.h"
+#include "mongo/util/concurrency/thread_pool.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
namespace mongo {
namespace {
+using Action = TransactionCoordinator::StateMachine::Action;
+using State = TransactionCoordinator::StateMachine::State;
+using RemoteCommandCallbackArgs = executor::TaskExecutor::RemoteCommandCallbackArgs;
+
/**
* Finds the host and port for a shard.
*/
-StatusWith<HostAndPort> targetHost(OperationContext* opCtx,
- const ShardId& shardId,
- const ReadPreferenceSetting& readPref) {
+StatusWith<HostAndPort> targetHost(const ShardId& shardId, const ReadPreferenceSetting& readPref) {
auto shard = Grid::get(getGlobalServiceContext())->shardRegistry()->getShardNoReload(shardId);
if (!shard) {
return Status(ErrorCodes::ShardNotFound,
str::stream() << "Could not find shard " << shardId);
}
- auto targeter = shard->getTargeter();
- return targeter->findHost(opCtx, readPref);
+ return shard->getTargeter()->findHostNoWait(readPref);
}
-using CallbackFn = stdx::function<void(Status status, const ShardId& shardID)>;
+using CallbackFn = stdx::function<void(
+ const RemoteCommandCallbackArgs& args, const ShardId& shardId, const BSONObj& commandObj)>;
/**
* Sends the given command object to the given shard ID. If scheduling and running the command is
* successful, calls the callback with the status of the command response and the shard ID.
*/
-void sendAsyncCommandToShard(OperationContext* opCtx,
- executor::TaskExecutor* executor,
+void sendAsyncCommandToShard(executor::TaskExecutor* executor,
const ShardId& shardId,
const BSONObj& commandObj,
- CallbackFn callbackOnCommandResponse) {
+ const CallbackFn& callbackOnCommandResponse) {
auto readPref = ReadPreferenceSetting(ReadPreference::PrimaryOnly);
- auto swShardHostAndPort = targetHost(opCtx, shardId, readPref);
- if (!swShardHostAndPort.isOK()) {
+ auto swShardHostAndPort = targetHost(shardId, readPref);
+ while (!swShardHostAndPort.isOK()) {
LOG(3) << "Coordinator shard failed to target primary host of participant shard for "
<< commandObj << causedBy(swShardHostAndPort.getStatus());
- return;
+ swShardHostAndPort = targetHost(shardId, readPref);
}
executor::RemoteCommandRequest request(
swShardHostAndPort.getValue(), "admin", commandObj, readPref.toContainingBSON(), nullptr);
auto swCallbackHandle = executor->scheduleRemoteCommand(
- request,
- [commandObj, shardId, callbackOnCommandResponse](
- const executor::TaskExecutor::RemoteCommandCallbackArgs& args) {
-
- auto status = (!args.response.isOK()) ? args.response.status
- : getStatusFromCommandResult(args.response.data);
+ request, [ commandObjOwned = commandObj.getOwned(),
+ shardId,
+ callbackOnCommandResponse ](const RemoteCommandCallbackArgs& args) {
+ LOG(3) << "Coordinator shard got response " << args.response.data << " for "
+ << commandObjOwned << " to " << shardId;
- LOG(3) << "Coordinator shard got response " << status << " for " << commandObj << " to "
- << shardId;
-
- // Only call callback if command successfully executed and got a response.
- if (args.response.isOK()) {
- callbackOnCommandResponse(status, shardId);
- }
+ callbackOnCommandResponse(args, shardId, commandObjOwned);
});
if (!swCallbackHandle.isOK()) {
@@ -107,7 +109,8 @@ void sendAsyncCommandToShard(OperationContext* opCtx,
<< " to shard " << shardId << causedBy(swCallbackHandle.getStatus());
}
- // Do not wait for the callback to run.
+ // Do not wait for the callback to run. The callback will reschedule the remote request on the
+ // same executor if necessary.
}
/**
@@ -115,67 +118,216 @@ void sendAsyncCommandToShard(OperationContext* opCtx,
* scheduling and running the command is successful, calls the callback with the status of the
* command response and the shard ID.
*/
-void sendAsyncCommandToShards(OperationContext* opCtx,
- const std::set<ShardId>& shardIds,
- const BSONObj& commandObj,
- CallbackFn callbackOnCommandResponse) {
+void sendCommandToShards(OperationContext* opCtx,
+ const std::set<ShardId>& shardIds,
+ const BSONObj& commandObj,
+ const CallbackFn& callbackOnCommandResponse) {
// TODO (SERVER-36638): Change to arbitrary task executor? Unit test only supports fixed
// executor.
auto exec = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
StringBuilder ss;
ss << "[";
- // For each non-acked participant, launch an async task to target its shard
- // and then asynchronously send the command.
for (const auto& shardId : shardIds) {
- sendAsyncCommandToShard(opCtx, exec, shardId, commandObj, callbackOnCommandResponse);
+ sendAsyncCommandToShard(exec, shardId, commandObj, callbackOnCommandResponse);
ss << shardId << " ";
}
-
ss << "]";
LOG(3) << "Coordinator shard sending " << commandObj << " to " << ss.str();
}
+void driveCoordinatorUntilDone(OperationContext* opCtx,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ Action action) {
+ while (true) {
+ switch (action) {
+ case Action::kWriteParticipantList:
+ action = coordinator->madeParticipantListDurable();
+ break;
+ case Action::kSendPrepare:
+ action = txn::sendPrepare(
+ opCtx, lsid, txnNumber, coordinator, coordinator->getParticipants());
+ break;
+ case Action::kWriteAbortDecision:
+ action = coordinator->madeAbortDecisionDurable();
+ break;
+ case Action::kSendAbort:
+ action = txn::sendAbort(opCtx,
+ lsid,
+ txnNumber,
+ coordinator,
+ coordinator->getNonVotedAbortParticipants());
+ break;
+ case Action::kWriteCommitDecision:
+ action = coordinator->madeCommitDecisionDurable();
+ break;
+ case Action::kSendCommit:
+ action = txn::sendCommit(opCtx,
+ lsid,
+ txnNumber,
+ coordinator,
+ coordinator->getNonAckedCommitParticipants(),
+ coordinator->getCommitTimestamp().get());
+ break;
+ case Action::kDone:
+ return;
+ case Action::kNone:
+ // This means an event was delivered to the coordinator outside the expected order
+ // of events.
+ MONGO_UNREACHABLE;
+ }
+ }
+}
} // namespace
namespace txn {
-void sendCommit(OperationContext* opCtx,
- std::shared_ptr<TransactionCoordinator> coordinator,
- const std::set<ShardId>& nonAckedParticipants,
- Timestamp commitTimestamp) {
- invariant(coordinator);
+Action sendPrepare(OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const std::set<ShardId>& participants) {
+ PrepareTransaction prepareCmd;
+ prepareCmd.setDbName("admin");
+ auto prepareObj = prepareCmd.toBSON(
+ BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false
+ << WriteConcernOptions::kWriteConcernField
+ << WriteConcernOptions::Majority));
+
+ auto actionNotification = std::make_shared<Notification<Action>>();
+ CallbackFn prepareCallback;
+ prepareCallback = [coordinator, actionNotification, &prepareCallback](
+ const RemoteCommandCallbackArgs& args, const ShardId& shardId, const BSONObj& commandObj) {
+ auto status = (!args.response.isOK()) ? args.response.status
+ : getStatusFromCommandResult(args.response.data);
+
+ boost::optional<Action> action;
+
+ if (status.isOK()) {
+ if (args.response.data["prepareTimestamp"].eoo() ||
+ args.response.data["prepareTimestamp"].timestamp().isNull()) {
+ LOG(0) << "Coordinator shard received an OK response to prepareTransaction without "
+ "a prepareTimestamp from shard "
+ << shardId
+ << ", which is not expected behavior. Interpreting the response from "
+ << shardId << " as a vote to abort";
+ action = coordinator->recvVoteAbort(shardId);
+ } else {
+ action = coordinator->recvVoteCommit(
+ shardId, args.response.data["prepareTimestamp"].timestamp());
+ }
+ } else if (ErrorCodes::isVoteAbortError(status.code())) {
+ action = coordinator->recvVoteAbort(shardId);
+ }
+
+ if (action) {
+ if (*action != Action::kNone) {
+ actionNotification->set(*action);
+ }
+ return;
+ }
+
+ if (coordinator->state() != State::kWaitingForVotes) {
+ LOG(3) << "Coordinator shard not retrying prepare against " << shardId
+ << " because coordinator is no longer waiting for votes";
+ } else {
+ LOG(3) << "Coordinator shard retrying " << commandObj << " against " << shardId;
+ sendAsyncCommandToShard(args.executor, shardId, commandObj, prepareCallback);
+ }
+ };
+
+ sendCommandToShards(opCtx, participants, prepareObj, prepareCallback);
+ return actionNotification->get(opCtx);
+}
+
+Action sendCommit(OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const std::set<ShardId>& nonAckedParticipants,
+ Timestamp commitTimestamp) {
CommitTransaction commitTransaction;
commitTransaction.setCommitTimestamp(commitTimestamp);
commitTransaction.setDbName("admin");
- BSONObj commitObj = commitTransaction.toBSON(BSON(
- "lsid" << opCtx->getLogicalSessionId()->toBSON() << "txnNumber" << *opCtx->getTxnNumber()
- << "autocommit"
- << false));
-
- sendAsyncCommandToShards(opCtx,
- nonAckedParticipants,
- commitObj,
- [coordinator](Status commandResponseStatus, const ShardId& shardId) {
- // TODO (SERVER-36642): Also interpret TransactionTooOld as
- // acknowledgment.
- if (commandResponseStatus.isOK()) {
- coordinator->recvCommitAck(shardId);
- }
- });
+ BSONObj commitObj = commitTransaction.toBSON(
+ BSON("lsid" << lsid.toBSON() << "txnNumber" << txnNumber << "autocommit" << false));
+
+ auto actionNotification = std::make_shared<Notification<Action>>();
+ CallbackFn commitCallback;
+ commitCallback = [coordinator, actionNotification, &commitCallback](
+ const RemoteCommandCallbackArgs& args, const ShardId& shardId, const BSONObj& commandObj) {
+ auto status = (!args.response.isOK()) ? args.response.status
+ : getStatusFromCommandResult(args.response.data);
+
+ if (status.isOK() || ErrorCodes::isVoteAbortError(status.code())) {
+ auto action = coordinator->recvCommitAck(shardId);
+ if (action != Action::kNone) {
+ actionNotification->set(action);
+ }
+ return;
+ }
+
+ LOG(3) << "Coordinator shard retrying " << commandObj << " against " << shardId;
+ sendAsyncCommandToShard(args.executor, shardId, commandObj, commitCallback);
+ };
+ sendCommandToShards(opCtx, nonAckedParticipants, commitObj, commitCallback);
+
+ return actionNotification->get(opCtx);
}
-void sendAbort(OperationContext* opCtx, const std::set<ShardId>& nonVotedAbortParticipants) {
+Action sendAbort(OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const std::set<ShardId>& nonVotedAbortParticipants) {
// TODO (SERVER-36584) Use IDL to create command BSON.
- BSONObj abortObj = BSON(
- "abortTransaction" << 1 << "lsid" << opCtx->getLogicalSessionId()->toBSON() << "txnNumber"
- << *opCtx->getTxnNumber()
- << "autocommit"
- << false);
-
- sendAsyncCommandToShards(
- opCtx, nonVotedAbortParticipants, abortObj, [](Status, const ShardId&) {});
+ BSONObj abortObj =
+ BSON("abortTransaction" << 1 << "lsid" << lsid.toBSON() << "txnNumber" << txnNumber
+ << "autocommit"
+ << false);
+
+ auto actionNotification = std::make_shared<Notification<Action>>();
+
+ CallbackFn abortCallback;
+ abortCallback = [coordinator, actionNotification, &abortCallback](
+ const RemoteCommandCallbackArgs& args, const ShardId& shardId, const BSONObj& commandObj) {
+ auto status = (!args.response.isOK()) ? args.response.status
+ : getStatusFromCommandResult(args.response.data);
+
+ if (status.isOK() || ErrorCodes::isVoteAbortError(status.code())) {
+ auto action = coordinator->recvAbortAck(shardId);
+ if (action != Action::kNone) {
+ actionNotification->set(action);
+ }
+ return;
+ }
+
+ LOG(3) << "Coordinator shard retrying " << commandObj << " against " << shardId;
+ sendAsyncCommandToShard(args.executor, shardId, commandObj, abortCallback);
+
+ };
+ sendCommandToShards(opCtx, nonVotedAbortParticipants, abortObj, abortCallback);
+ return actionNotification->get(opCtx);
+}
+
+
+void launchCoordinateCommitTask(ThreadPool& threadPool,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ TransactionCoordinator::StateMachine::Action initialAction) {
+ auto ch = threadPool.schedule([coordinator, lsid, txnNumber, initialAction]() {
+ try {
+ // The opCtx destructor handles unsetting itself from the Client
+ auto opCtx = Client::getCurrent()->makeOperationContext();
+ driveCoordinatorUntilDone(opCtx.get(), coordinator, lsid, txnNumber, initialAction);
+ } catch (const DBException& e) {
+ log() << "Exception was thrown while coordinating commit: " << causedBy(e.toStatus());
+ }
+ });
}
} // namespace txn
diff --git a/src/mongo/db/transaction_coordinator_commands_impl.h b/src/mongo/db/transaction_coordinator_commands_impl.h
index 5a211e424fc..da679b30dde 100644
--- a/src/mongo/db/transaction_coordinator_commands_impl.h
+++ b/src/mongo/db/transaction_coordinator_commands_impl.h
@@ -31,25 +31,54 @@
#pragma once
#include "mongo/db/operation_context.h"
+#include "mongo/db/transaction_commit_decision_gen.h"
#include "mongo/db/transaction_coordinator.h"
namespace mongo {
+class ThreadPool;
+
namespace txn {
+void launchCoordinateCommitTask(ThreadPool& threadPool,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ TransactionCoordinator::StateMachine::Action initialAction);
+
+/**
+ * Schedules prepare to be sent asynchronously to all participants and blocks on being signaled that
+ * a voteAbort or the final voteCommit has been received.
+ */
+TransactionCoordinator::StateMachine::Action sendPrepare(
+ OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const std::set<ShardId>& participants);
+
/**
- * Asynchronously sends commit to all participants provided and calls recvCommitAck on the
- * coordinator if the commit command succeeds.
+ * Schedules commit to be sent asynchronously to all participants and blocks on being signaled that
+ * the final commit ack has been received.
*/
-void sendCommit(OperationContext* opCtx,
- std::shared_ptr<TransactionCoordinator> coordinator,
- const std::set<ShardId>& nonAckedParticipants,
- Timestamp commitTimestamp);
+TransactionCoordinator::StateMachine::Action sendCommit(
+ OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const std::set<ShardId>& nonAckedParticipants,
+ Timestamp commitTimestamp);
/**
- * Asynchronously sends abort to all participants provided.
+ * Schedules abort to be sent asynchronously to all participants and blocks on being signaled that
+ * the final abort ack has been received.
*/
-void sendAbort(OperationContext* opCtx, const std::set<ShardId>& nonVotedAbortParticipants);
+TransactionCoordinator::StateMachine::Action sendAbort(
+ OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ const TxnNumber& txnNumber,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const std::set<ShardId>& nonVotedAbortParticipants);
} // namespace txn
} // namespace mongo
diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp
index a1084c1d7c8..7767775296d 100644
--- a/src/mongo/db/transaction_coordinator_service.cpp
+++ b/src/mongo/db/transaction_coordinator_service.cpp
@@ -38,41 +38,48 @@
#include "mongo/db/service_context.h"
#include "mongo/db/transaction_coordinator.h"
#include "mongo/db/transaction_coordinator_commands_impl.h"
-#include "mongo/executor/task_executor.h"
+#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/grid.h"
#include "mongo/s/shard_id.h"
#include "mongo/util/log.h"
namespace mongo {
namespace {
+
const auto transactionCoordinatorServiceDecoration =
ServiceContext::declareDecoration<TransactionCoordinatorService>();
-void doCoordinatorAction(OperationContext* opCtx,
- std::shared_ptr<TransactionCoordinator> coordinator,
- TransactionCoordinator::StateMachine::Action action) {
- switch (action) {
- case TransactionCoordinator::StateMachine::Action::kSendCommit: {
- txn::sendCommit(opCtx,
- coordinator,
- coordinator->getNonAckedCommitParticipants(),
- coordinator->getCommitTimestamp());
- break;
- }
- case TransactionCoordinator::StateMachine::Action::kSendAbort: {
- txn::sendAbort(opCtx, coordinator->getNonVotedAbortParticipants());
- break;
- }
- case TransactionCoordinator::StateMachine::Action::kNone:
- break;
- }
-}
+using Action = TransactionCoordinator::StateMachine::Action;
+using State = TransactionCoordinator::StateMachine::State;
+
+/**
+ * Constructs the default options for the thread pool used to run commit.
+ */
+ThreadPool::Options makeDefaultThreadPoolOptions() {
+ ThreadPool::Options options;
+ options.poolName = "TransactionCoordinatorService";
+ options.minThreads = 0;
+ options.maxThreads = 20;
+
+ // Ensure all threads have a client
+ options.onCreateThread = [](const std::string& threadName) {
+ Client::initThread(threadName.c_str());
+ };
+ return options;
}
+} // namespace
+
TransactionCoordinatorService::TransactionCoordinatorService()
- : _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()) {}
+ : _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()),
+ _threadPool(makeDefaultThreadPoolOptions()) {
+ _threadPool.startup();
+}
-TransactionCoordinatorService::~TransactionCoordinatorService() = default;
+void TransactionCoordinatorService::shutdown() {
+ _threadPool.shutdown();
+}
TransactionCoordinatorService* TransactionCoordinatorService::get(OperationContext* opCtx) {
return get(opCtx->getServiceContext());
@@ -100,72 +107,42 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx,
"Cannot start a new transaction with the same session ID and transaction "
"number as a transaction that has already begun two-phase commit.",
latestCoordinator->state() ==
- TransactionCoordinator::StateMachine::State::kWaitingForParticipantList);
+ TransactionCoordinator::StateMachine::State::kUninitialized);
return;
}
// Call tryAbort on previous coordinator.
- auto actionToTake = latestCoordinator.get()->recvTryAbort();
- doCoordinatorAction(opCtx, latestCoordinator, actionToTake);
+ latestCoordinator.get()->recvTryAbort();
}
_coordinatorCatalog->create(lsid, txnNumber);
// TODO (SERVER-37024): Schedule abort task on executor to execute at commitDeadline.
- // TODO (SERVER-37025): Schedule poke task on executor.
}
-Future<TransactionCoordinatorService::CommitDecision>
-TransactionCoordinatorService::coordinateCommit(OperationContext* opCtx,
- LogicalSessionId lsid,
- TxnNumber txnNumber,
- const std::set<ShardId>& participantList) {
+Future<TransactionCoordinator::CommitDecision> TransactionCoordinatorService::coordinateCommit(
+ OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ const std::set<ShardId>& participantList) {
auto coordinator = _coordinatorCatalog->get(lsid, txnNumber);
if (!coordinator) {
- return TransactionCoordinatorService::CommitDecision::kAbort;
+ // TODO (SERVER-37440): Return decision "kForgotten", which indicates that a decision was
+ // already made and forgotten. The caller can recover the decision from the local
+ // participant if a higher transaction has not been started on the session and the session
+ // has not been reaped.
+ // Currently is MONGO_UNREACHABLE because no tests should cause the router to re-send
+ // coordinateCommitTransaction.
+ MONGO_UNREACHABLE;
}
- auto actionToTake = coordinator.get()->recvCoordinateCommit(participantList);
- doCoordinatorAction(opCtx, coordinator.get(), actionToTake);
-
- return coordinator.get()->waitForCompletion().then([](auto finalState) {
- switch (finalState) {
- case TransactionCoordinator::StateMachine::State::kAborted:
- return TransactionCoordinatorService::CommitDecision::kAbort;
- case TransactionCoordinator::StateMachine::State::kCommitted:
- return TransactionCoordinatorService::CommitDecision::kCommit;
- default:
- MONGO_UNREACHABLE;
- }
- });
-}
-
-void TransactionCoordinatorService::voteCommit(OperationContext* opCtx,
- LogicalSessionId lsid,
- TxnNumber txnNumber,
- const ShardId& shardId,
- Timestamp prepareTimestamp) {
- auto coordinator = _coordinatorCatalog->get(lsid, txnNumber);
- if (!coordinator) {
- txn::sendAbort(opCtx, {shardId});
- return;
+ Action initialAction = coordinator->recvCoordinateCommit(participantList);
+ if (initialAction != Action::kNone) {
+ txn::launchCoordinateCommitTask(_threadPool, coordinator, lsid, txnNumber, initialAction);
}
- auto actionToTake = coordinator.get()->recvVoteCommit(shardId, prepareTimestamp);
- doCoordinatorAction(opCtx, coordinator.get(), actionToTake);
-}
-
-void TransactionCoordinatorService::voteAbort(OperationContext* opCtx,
- LogicalSessionId lsid,
- TxnNumber txnNumber,
- const ShardId& shardId) {
- auto coordinator = _coordinatorCatalog->get(lsid, txnNumber);
-
- if (coordinator) {
- auto actionToTake = coordinator.get()->recvVoteAbort(shardId);
- doCoordinatorAction(opCtx, coordinator.get(), actionToTake);
- }
+ return coordinator.get()->waitForDecision();
}
} // namespace mongo
diff --git a/src/mongo/db/transaction_coordinator_service.h b/src/mongo/db/transaction_coordinator_service.h
index 5eb8e6c7f66..c7c4ab77e46 100644
--- a/src/mongo/db/transaction_coordinator_service.h
+++ b/src/mongo/db/transaction_coordinator_service.h
@@ -34,7 +34,9 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/logical_session_id.h"
+#include "mongo/db/transaction_coordinator.h"
#include "mongo/db/transaction_coordinator_catalog.h"
+#include "mongo/util/concurrency/thread_pool.h"
#include "mongo/util/future.h"
namespace mongo {
@@ -47,13 +49,13 @@ class TransactionCoordinatorService final {
MONGO_DISALLOW_COPYING(TransactionCoordinatorService);
public:
- enum class CommitDecision {
- kCommit,
- kAbort,
- };
-
TransactionCoordinatorService();
- ~TransactionCoordinatorService();
+ ~TransactionCoordinatorService() = default;
+
+ /**
+ * Shuts down the thread pool used for executing commits.
+ */
+ void shutdown();
/**
* Retrieves the TransactionCoordinatorService associated with the service or operation context.
@@ -75,37 +77,17 @@ public:
* Delivers coordinateCommit to the TransactionCoordinator, asynchronously sends commit or
* abort to participants if necessary, and returns a Future that will contain the commit
* decision when the transaction finishes committing or aborting.
- *
- * TODO (SERVER-37364): On the commit path, this Future should instead be signaled as soon as
- * the coordinator is finished persisting the commit decision, rather than waiting until the
- * commit process has been completed entirely.
*/
- Future<CommitDecision> coordinateCommit(OperationContext* opCtx,
- LogicalSessionId lsid,
- TxnNumber txnNumber,
- const std::set<ShardId>& participantList);
-
- /**
- * Delivers voteCommit to the TransactionCoordinator and asynchronously sends commit or abort to
- * participants if necessary.
- */
- void voteCommit(OperationContext* opCtx,
- LogicalSessionId lsid,
- TxnNumber txnNumber,
- const ShardId& shardId,
- Timestamp prepareTimestamp);
-
- /**
- * Delivers voteAbort on the TransactionCoordinator and asynchronously sends commit or abort to
- * participants if necessary.
- */
- void voteAbort(OperationContext* opCtx,
- LogicalSessionId lsid,
- TxnNumber txnNumber,
- const ShardId& shardId);
+ Future<TransactionCoordinator::CommitDecision> coordinateCommit(
+ OperationContext* opCtx,
+ LogicalSessionId lsid,
+ TxnNumber txnNumber,
+ const std::set<ShardId>& participantList);
private:
std::shared_ptr<TransactionCoordinatorCatalog> _coordinatorCatalog;
+
+ ThreadPool _threadPool;
};
} // namespace mongo
diff --git a/src/mongo/db/transaction_coordinator_service_test.cpp b/src/mongo/db/transaction_coordinator_service_test.cpp
index 27d6a034821..3034e117bcc 100644
--- a/src/mongo/db/transaction_coordinator_service_test.cpp
+++ b/src/mongo/db/transaction_coordinator_service_test.cpp
@@ -1,4 +1,3 @@
-
/**
* Copyright (C) 2018-present MongoDB, Inc.
*
@@ -34,8 +33,10 @@
#include "mongo/client/remote_command_targeter_mock.h"
#include "mongo/db/commands/txn_cmds_gen.h"
+#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/session_catalog.h"
+#include "mongo/db/transaction_coordinator_commands_impl.h"
#include "mongo/db/transaction_coordinator_service.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
#include "mongo/s/catalog/type_shard.h"
@@ -55,7 +56,9 @@ const std::set<ShardId> kThreeShardIdSet{{"s1"}, {"s2"}, {"s3"}};
const Timestamp kDummyTimestamp = Timestamp::min();
const Date_t kCommitDeadline = Date_t::max();
const StatusWith<BSONObj> kRetryableError = {ErrorCodes::HostUnreachable, ""};
+const StatusWith<BSONObj> kNoSuchTransaction = {ErrorCodes::NoSuchTransaction, ""};
const StatusWith<BSONObj> kOk = BSON("ok" << 1);
+const StatusWith<BSONObj> kPrepareOk = BSON("ok" << 1 << "prepareTimestamp" << Timestamp(1, 1));
HostAndPort makeHostAndPort(const ShardId& shardId) {
return HostAndPort(str::stream() << shardId << ":123");
@@ -66,9 +69,6 @@ public:
void setUp() override {
ShardServerTestFixture::setUp();
- operationContext()->setLogicalSessionId(_lsid);
- operationContext()->setTxnNumber(_txnNumber);
-
for (const auto& shardId : kThreeShardIdList) {
auto shardTargeter = RemoteCommandTargeterMock::get(
uassertStatusOK(shardRegistry()->getShard(operationContext(), shardId))
@@ -89,6 +89,14 @@ public:
});
}
+ void assertPrepareSentAndRespondWithSuccess() {
+ assertCommandSentAndRespondWith(PrepareTransaction::kCommandName, kPrepareOk);
+ }
+
+ void assertPrepareSentAndRespondWithNoSuchTransaction() {
+ assertCommandSentAndRespondWith(PrepareTransaction::kCommandName, kNoSuchTransaction);
+ }
+
void assertAbortSentAndRespondWithSuccess() {
assertCommandSentAndRespondWith("abortTransaction", kOk);
}
@@ -125,9 +133,8 @@ public:
auto commitDecisionFuture = coordinatorService.coordinateCommit(
operationContext(), lsid, txnNumber, transactionParticipantShards);
- for (const auto& shardId : transactionParticipantShards) {
- coordinatorService.voteCommit(
- operationContext(), lsid, txnNumber, shardId, kDummyTimestamp);
+ for (size_t i = 0; i < transactionParticipantShards.size(); ++i) {
+ assertPrepareSentAndRespondWithSuccess();
}
for (size_t i = 0; i < transactionParticipantShards.size(); ++i) {
@@ -150,7 +157,13 @@ public:
auto commitDecisionFuture =
coordinatorService.coordinateCommit(operationContext(), lsid, txnNumber, shardIdSet);
- coordinatorService.voteAbort(operationContext(), lsid, txnNumber, abortingShard);
+ for (size_t i = 0; i < shardIdSet.size(); ++i) {
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+ }
+
+ // Abort gets sent to the second participant as soon as the first participant
+ // receives a not-okay response to prepare.
+ assertAbortSentAndRespondWithSuccess();
// Wait for abort to complete.
commitDecisionFuture.get();
@@ -218,7 +231,6 @@ private:
std::unique_ptr<TransactionCoordinatorService> _coordinatorService;
};
-
} // namespace
TEST_F(TransactionCoordinatorServiceTest, CreateCoordinatorOnNewSessionSucceeds) {
@@ -253,31 +265,6 @@ TEST_F(TransactionCoordinatorServiceTest,
}
TEST_F(TransactionCoordinatorServiceTest,
- CreateCoordinatorWithHigherTxnNumberThanOngoingUncommittedTxnAbortsPreviousTxnAndSucceeds) {
-
- TransactionCoordinatorService coordinatorService;
- coordinatorService.createCoordinator(operationContext(), lsid(), txnNumber(), kCommitDeadline);
-
- // This is currently the only way we have to get the commit decision.
- auto oldTxnCommitDecisionFuture = coordinatorService.coordinateCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
-
- // Create a coordinator for a higher transaction number in the same session.
- coordinatorService.createCoordinator(
- operationContext(), lsid(), txnNumber() + 1, kCommitDeadline);
-
- assertAbortSentAndRespondWithSuccess();
- assertAbortSentAndRespondWithSuccess();
-
- // We should have aborted the previous transaction.
- ASSERT_EQ(static_cast<int>(oldTxnCommitDecisionFuture.get()),
- static_cast<int>(TransactionCoordinatorService::CommitDecision::kAbort));
-
- // Make sure the newly created one works fine.
- commitTransaction(coordinatorService, lsid(), txnNumber() + 1, kTwoShardIdSet);
-}
-
-TEST_F(TransactionCoordinatorServiceTest,
CreateCoordinatorWithHigherTxnNumberThanOngoingCommittingTxnCommitsPreviousTxnAndSucceeds) {
TransactionCoordinatorService coordinatorService;
@@ -287,16 +274,18 @@ TEST_F(TransactionCoordinatorServiceTest,
// commit acks.
auto oldTxnCommitDecisionFuture = coordinatorService.coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
- coordinatorService.voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
- coordinatorService.voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[1], kDummyTimestamp);
+
+ // Simulate all participants acking prepare/voting to commit.
+ assertPrepareSentAndRespondWithSuccess();
+ assertPrepareSentAndRespondWithSuccess();
// Create a coordinator for a higher transaction number in the same session. This should
// "tryAbort" on the old coordinator which should NOT abort it since it's already waiting for
// commit acks.
coordinatorService.createCoordinator(
operationContext(), lsid(), txnNumber() + 1, kCommitDeadline);
+ auto newTxnCommitDecisionFuture = coordinatorService.coordinateCommit(
+ operationContext(), lsid(), txnNumber() + 1, kTwoShardIdSet);
// Finish committing the old transaction by sending it commit acks from both participants.
assertCommitSentAndRespondWithSuccess();
@@ -304,35 +293,44 @@ TEST_F(TransactionCoordinatorServiceTest,
// The old transaction should now be committed.
ASSERT_EQ(static_cast<int>(oldTxnCommitDecisionFuture.get()),
- static_cast<int>(TransactionCoordinatorService::CommitDecision::kCommit));
+ static_cast<int>(TransactionCoordinator::CommitDecision::kCommit));
// Make sure the newly created one works fine too.
- commitTransaction(coordinatorService, lsid(), txnNumber() + 1, kTwoShardIdSet);
+ assertPrepareSentAndRespondWithSuccess();
+ assertPrepareSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+ assertCommitSentAndRespondWithSuccess();
+ // commitTransaction(coordinatorService, lsid(), txnNumber() + 1, kTwoShardIdSet);
}
TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- CoordinateCommitWithNoVotesReturnsNotReadyFuture) {
+ CoordinateCommitReturnsCorrectCommitDecisionOnAbort) {
auto commitDecisionFuture = coordinatorService()->coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
- ASSERT_FALSE(commitDecisionFuture.isReady());
- // To prevent invariant failure in TransactionCoordinator that all futures have been completed.
- abortTransaction(
- *coordinatorService(), lsid(), txnNumber(), kTwoShardIdSet, kTwoShardIdList[0]);
+ // Simulate a participant voting to abort.
+ assertPrepareSentAndRespondWithNoSuchTransaction();
+ assertPrepareSentAndRespondWithSuccess();
+
+ // Only send abort to the node that voted to commit.
+ assertAbortSentAndRespondWithSuccess();
+
+ auto commitDecision = commitDecisionFuture.get();
+ ASSERT_EQ(static_cast<int>(commitDecision),
+ static_cast<int>(TransactionCoordinator::CommitDecision::kAbort));
}
TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- CoordinateCommitReturnsCorrectCommitDecisionOnAbort) {
+ CoordinateCommitWithNoVotesReturnsNotReadyFuture) {
auto commitDecisionFuture = coordinatorService()->coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
- coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kTwoShardIdList[0]);
-
- auto commitDecision = commitDecisionFuture.get();
- ASSERT_EQ(static_cast<int>(commitDecision),
- static_cast<int>(TransactionCoordinatorService::CommitDecision::kAbort));
+ ASSERT_FALSE(commitDecisionFuture.isReady());
+ // To prevent invariant failure in TransactionCoordinator that all futures have been completed.
+ abortTransaction(
+ *coordinatorService(), lsid(), txnNumber(), kTwoShardIdSet, kTwoShardIdList[0]);
}
TEST_F(TransactionCoordinatorServiceTestSingleTxn,
@@ -341,31 +339,14 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn,
auto commitDecisionFuture = coordinatorService()->coordinateCommit(
operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[1], kDummyTimestamp);
-
+ assertPrepareSentAndRespondWithSuccess();
+ assertPrepareSentAndRespondWithSuccess();
assertCommitSentAndRespondWithSuccess();
assertCommitSentAndRespondWithSuccess();
auto commitDecision = commitDecisionFuture.get();
ASSERT_EQ(static_cast<int>(commitDecision),
- static_cast<int>(TransactionCoordinatorService::CommitDecision::kCommit));
-}
-
-TEST_F(TransactionCoordinatorServiceTest,
- CoordinateCommitReturnsAbortDecisionWhenCoordinatorDoesNotExist) {
-
- TransactionCoordinatorService coordinatorService;
- auto commitDecisionFuture = coordinatorService.coordinateCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
- ASSERT_TRUE(commitDecisionFuture.isReady());
-
- auto commitDecision = commitDecisionFuture.get();
- ASSERT_EQ(static_cast<int>(commitDecision),
- static_cast<int>(TransactionCoordinatorService::CommitDecision::kAbort));
+ static_cast<int>(TransactionCoordinator::CommitDecision::kCommit));
}
TEST_F(TransactionCoordinatorServiceTest,
@@ -409,152 +390,4 @@ TEST_F(TransactionCoordinatorServiceTestSingleTxn,
static_cast<int>(commitDecisionFuture2.get()));
}
-TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- VoteCommitDoesNotSendCommitIfParticipantListNotYetReceived) {
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
-
- assertNoMessageSent();
- // To prevent invariant failure in TransactionCoordinator that all futures have been completed.
- abortTransaction(
- *coordinatorService(), lsid(), txnNumber(), kTwoShardIdSet, kTwoShardIdList[1]);
-}
-
-TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- ResentVoteCommitDoesNotSendCommitIfParticipantListNotYetReceived) {
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
-
- assertNoMessageSent();
-
- // To prevent invariant failure in TransactionCoordinator that all futures have been completed.
- abortTransaction(
- *coordinatorService(), lsid(), txnNumber(), kTwoShardIdSet, kTwoShardIdList[1]);
-}
-
-TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- ResentVoteCommitDoesNotSendCommitIfParticipantListHasBeenReceived) {
-
- auto commitDecisionFuture = coordinatorService()->coordinateCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
-
- assertNoMessageSent();
-
- // To prevent invariant failure in TransactionCoordinator that all futures have been completed.
- abortTransaction(
- *coordinatorService(), lsid(), txnNumber(), kTwoShardIdSet, kTwoShardIdList[1]);
- commitDecisionFuture.get();
-}
-
-TEST_F(TransactionCoordinatorServiceTestSingleTxn, FinalVoteCommitSendsCommit) {
- auto commitDecisionFuture = coordinatorService()->coordinateCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[1], kDummyTimestamp);
-
- assertCommitSentAndRespondWithSuccess();
- assertCommitSentAndRespondWithSuccess();
-}
-
-// This logic is obviously correct for a transaction which has been aborted prior to receiving
-// coordinateCommit, when the coordinator does not yet know all participants and so cannot send
-// abortTransaction to all participants. In this case, it can potentially receive voteCommit
-// messages from some participants even after the local TransactionCoordinator object has
-// transitioned to the aborted state and then removed from the service. We then must tell
-// the participant that sent the voteCommit message that it should abort.
-//
-// More subtly, it also works for voteCommit retries for transactions that have already committed,
-// because we'll send abort to the participant, and the abort command will just receive
-// NoSuchTransaction or TransactionTooOld (because the participant must have already committed if
-// the transaction coordinator finished committing).
-TEST_F(TransactionCoordinatorServiceTest,
- VoteCommitForCoordinatorThatDoesNotExistSendsVoteAbortToCallingParticipant) {
-
- TransactionCoordinatorService coordinatorService;
- coordinatorService.voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
-
- assertAbortSentAndRespondWithSuccess();
-}
-
-TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- ResentFinalVoteCommitOnlySendsCommitToNonAckedParticipants) {
-
- auto commitDecisionFuture = coordinatorService()->coordinateCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdSet);
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[1], kDummyTimestamp);
-
- assertCommitSentAndRespondWithSuccess();
- assertCommitSentAndRespondWithRetryableError();
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[1], kDummyTimestamp);
-
- assertCommitSentAndRespondWithSuccess();
-}
-
-TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- VoteAbortDoesNotSendAbortIfIsOnlyVoteReceivedSoFar) {
-
- coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kTwoShardIdList[0]);
-
- assertNoMessageSent();
-}
-
-TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- VoteAbortForCoordinatorThatDoesNotExistDoesNotSendAbort) {
-
- coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kTwoShardIdList[0]);
- // Coordinator no longer exists.
- coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kTwoShardIdList[0]);
-
- assertNoMessageSent();
-}
-
-TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- VoteAbortSendsAbortIfSomeParticipantsHaveVotedCommit) {
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kTwoShardIdList[0], kDummyTimestamp);
-
- coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kTwoShardIdList[1]);
-
- // This should be sent to the shard that voted commit (s1).
- assertAbortSentAndRespondWithSuccess();
-}
-
-TEST_F(TransactionCoordinatorServiceTestSingleTxn,
- VoteAbortAfterReceivingParticipantListSendsAbortToAllParticipantsWhoHaventVotedAbort) {
-
- auto commitDecisionFuture = coordinatorService()->coordinateCommit(
- operationContext(), lsid(), txnNumber(), kThreeShardIdSet);
-
- coordinatorService()->voteCommit(
- operationContext(), lsid(), txnNumber(), kThreeShardIdList[0], kDummyTimestamp);
-
- coordinatorService()->voteAbort(operationContext(), lsid(), txnNumber(), kThreeShardIdList[1]);
-
- // Should send abort to shards s1 and s3 (the ones that did not vote abort).
- assertAbortSentAndRespondWithSuccess();
- assertAbortSentAndRespondWithSuccess();
-}
-
} // namespace mongo
diff --git a/src/mongo/db/transaction_coordinator_state_machine_test.cpp b/src/mongo/db/transaction_coordinator_state_machine_test.cpp
index 4fe4a0fcac2..da97c29f0da 100644
--- a/src/mongo/db/transaction_coordinator_state_machine_test.cpp
+++ b/src/mongo/db/transaction_coordinator_state_machine_test.cpp
@@ -70,15 +70,49 @@ void expectScheduleThrows(Schedule schedule) {
ASSERT_EQ(State::kBroken, coordinator.state());
}
+
+void doCommit(StateMachine& coordinator) {
+ runSchedule(coordinator,
+ {Event::kRecvParticipantList,
+ Event::kMadeParticipantListDurable,
+ Event::kRecvFinalVoteCommit,
+ Event::kMadeCommitDecisionDurable,
+ Event::kRecvFinalCommitAck});
+}
+
+void doAbort(StateMachine& coordinator) {
+ runSchedule(coordinator,
+ {Event::kRecvParticipantList,
+ Event::kMadeParticipantListDurable,
+ Event::kRecvVoteAbort,
+ Event::kMadeAbortDecisionDurable,
+ Event::kRecvFinalAbortAck});
+}
+
TEST(CoordinatorStateMachine, AbortSucceeds) {
- expectScheduleSucceeds({Event::kRecvVoteAbort}, State::kAborted);
- expectScheduleSucceeds({Event::kRecvVoteAbort, Event::kRecvVoteAbort}, State::kAborted);
+ expectScheduleSucceeds({Event::kRecvParticipantList,
+ Event::kMadeParticipantListDurable,
+ Event::kRecvVoteAbort,
+ Event::kMadeAbortDecisionDurable,
+ Event::kRecvFinalAbortAck},
+ State::kAborted);
+ // Check that it's okay to receive two vote aborts.
+ expectScheduleSucceeds({Event::kRecvParticipantList,
+ Event::kMadeParticipantListDurable,
+ Event::kRecvVoteAbort,
+ Event::kRecvVoteAbort,
+ Event::kMadeAbortDecisionDurable,
+ Event::kRecvFinalAbortAck},
+ State::kAborted);
}
TEST(CoordinatorStateMachine, CommitSucceeds) {
- expectScheduleSucceeds(
- {Event::kRecvParticipantList, Event::kRecvFinalVoteCommit, Event::kRecvFinalCommitAck},
- State::kCommitted);
+ expectScheduleSucceeds({Event::kRecvParticipantList,
+ Event::kMadeParticipantListDurable,
+ Event::kRecvFinalVoteCommit,
+ Event::kMadeCommitDecisionDurable,
+ Event::kRecvFinalCommitAck},
+ State::kCommitted);
}
TEST(CoordinatorStateMachine, RecvFinalVoteCommitAndRecvVoteAbortThrows) {
@@ -90,7 +124,7 @@ TEST(CoordinatorStateMachine, RecvFinalVoteCommitAndRecvVoteAbortThrows) {
TEST(CoordinatorStateMachine, WaitForTransitionToOnlyTerminalStatesReturnsCorrectStateOnAbort) {
StateMachine coordinator;
auto future = coordinator.waitForTransitionTo({State::kCommitted, State::kAborted});
- runSchedule(coordinator, {Event::kRecvVoteAbort});
+ doAbort(coordinator);
ASSERT_EQ(future.get(), State::kAborted);
}
@@ -100,15 +134,13 @@ TEST(CoordinatorStateMachine, WaitForTransitionToStatesThatHaventBeenReachedRetu
ASSERT_FALSE(future.isReady());
// We need to abort here because we require that all promises are triggered prior to coordinator
// destruction.
- runSchedule(coordinator, {Event::kRecvVoteAbort});
+ doAbort(coordinator);
}
TEST(CoordinatorStateMachine, WaitForTransitionToOnlyTerminalStatesReturnsCorrectStateOnCommit) {
StateMachine coordinator;
auto future = coordinator.waitForTransitionTo({State::kCommitted, State::kAborted});
- runSchedule(
- coordinator,
- {Event::kRecvParticipantList, Event::kRecvFinalVoteCommit, Event::kRecvFinalCommitAck});
+ doCommit(coordinator);
ASSERT_EQ(future.get(), State::kCommitted);
}
@@ -117,8 +149,9 @@ TEST(CoordinatorStateMachine,
StateMachine coordinator;
runSchedule(coordinator, {Event::kRecvParticipantList});
auto future = coordinator.waitForTransitionTo(
- {State::kWaitingForVotes, State::kCommitted, State::kAborted});
- ASSERT_EQ(future.get(), TransactionCoordinator::StateMachine::State::kWaitingForVotes);
+ {State::kMakingParticipantListDurable, State::kCommitted, State::kAborted});
+ ASSERT_EQ(future.get(),
+ TransactionCoordinator::StateMachine::State::kMakingParticipantListDurable);
}
TEST(CoordinatorStateMachine, WaitForTransitionToMultipleStatesReturnsFirstStateToBeHit) {
@@ -128,7 +161,7 @@ TEST(CoordinatorStateMachine, WaitForTransitionToMultipleStatesReturnsFirstState
State::kCommitted,
State::kAborted});
- runSchedule(coordinator, {Event::kRecvParticipantList, Event::kRecvFinalVoteCommit});
+ doCommit(coordinator);
ASSERT_EQ(future.get(), TransactionCoordinator::StateMachine::State::kWaitingForVotes);
}
@@ -140,7 +173,7 @@ TEST(CoordinatorStateMachine,
{State::kWaitingForVotes, State::kCommitted, State::kAborted});
auto future2 = coordinator.waitForTransitionTo(
{State::kWaitingForCommitAcks, State::kCommitted, State::kAborted});
- runSchedule(coordinator, {Event::kRecvParticipantList, Event::kRecvFinalVoteCommit});
+ doCommit(coordinator);
ASSERT_EQ(future1.get(), TransactionCoordinator::StateMachine::State::kWaitingForVotes);
ASSERT_EQ(future2.get(), TransactionCoordinator::StateMachine::State::kWaitingForCommitAcks);
@@ -163,7 +196,7 @@ TEST(CoordinatorStateMachine,
coordinator.waitForTransitionTo(
{State::kWaitingForCommitAcks, State::kCommitted, State::kAborted})};
- runSchedule(coordinator, {Event::kRecvParticipantList, Event::kRecvFinalVoteCommit});
+ doCommit(coordinator);
for (auto& future1 : futures1) {
ASSERT_EQ(future1.get(), TransactionCoordinator::StateMachine::State::kWaitingForVotes);
diff --git a/src/mongo/db/transaction_coordinator_test.cpp b/src/mongo/db/transaction_coordinator_test.cpp
index 6f154b3268b..458ff30244e 100644
--- a/src/mongo/db/transaction_coordinator_test.cpp
+++ b/src/mongo/db/transaction_coordinator_test.cpp
@@ -34,135 +34,107 @@
#include <future>
+#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
+namespace {
+
using State = TransactionCoordinator::StateMachine::State;
+using Coordinator = ServiceContextMongoDTest;
const Timestamp dummyTimestamp = Timestamp::min();
-TEST(Coordinator, SomeParticipantVotesAbortLeadsToAbort) {
- TransactionCoordinator coordinator;
- coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
- coordinator.recvVoteAbort(ShardId("shard0000"));
- coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp);
- ASSERT_EQ(State::kAborted, coordinator.state());
+void doCommit(TransactionCoordinator& coordinator) {
+ coordinator.recvCoordinateCommit({ShardId("shard0000")});
+ coordinator.madeParticipantListDurable();
+ coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
+ coordinator.madeCommitDecisionDurable();
+ coordinator.recvCommitAck(ShardId("shard0000"));
}
-TEST(Coordinator, SomeParticipantsVoteAbortBeforeCoordinatorReceivesParticipantListLeadsToAbort) {
- TransactionCoordinator coordinator;
+void doAbort(TransactionCoordinator& coordinator) {
+ coordinator.recvCoordinateCommit({ShardId("shard0000")});
+ coordinator.madeParticipantListDurable();
coordinator.recvVoteAbort(ShardId("shard0000"));
- coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
- coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp);
- ASSERT_EQ(State::kAborted, coordinator.state());
+ coordinator.madeAbortDecisionDurable();
+ coordinator.recvAbortAck(ShardId("shard0000"));
+}
}
-TEST(Coordinator, AllParticipantsVoteCommitLeadsToCommit) {
+TEST_F(Coordinator, SomeParticipantVotesAbortLeadsToAbort) {
TransactionCoordinator coordinator;
coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
- coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
+ coordinator.madeParticipantListDurable();
+ coordinator.recvVoteAbort(ShardId("shard0000"));
coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp);
- coordinator.recvCommitAck(ShardId("shard0000"));
- coordinator.recvCommitAck(ShardId("shard0001"));
- ASSERT_EQ(State::kCommitted, coordinator.state());
+ coordinator.madeAbortDecisionDurable();
+ coordinator.recvAbortAck(ShardId("shard0001"));
+ ASSERT_EQ(State::kAborted, coordinator.state());
}
-TEST(
- Coordinator,
- AllParticipantsVoteCommitSomeParticipantsVoteBeforeCoordinatorReceivesParticipantListLeadsToCommit) {
+TEST_F(Coordinator, AllParticipantsVoteCommitLeadsToCommit) {
TransactionCoordinator coordinator;
- coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
+ coordinator.madeParticipantListDurable();
+ coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp);
+ coordinator.madeCommitDecisionDurable();
coordinator.recvCommitAck(ShardId("shard0000"));
coordinator.recvCommitAck(ShardId("shard0001"));
ASSERT_EQ(State::kCommitted, coordinator.state());
}
-TEST(Coordinator, NotHearingSomeParticipantsVoteOtherParticipantsVotedCommitLeadsToStillWaiting) {
+TEST_F(Coordinator, NotHearingSomeParticipantsVoteOtherParticipantsVotedCommitLeadsToStillWaiting) {
TransactionCoordinator coordinator;
coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
+ coordinator.madeParticipantListDurable();
coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
ASSERT_EQ(State::kWaitingForVotes, coordinator.state());
}
-TEST(Coordinator, NotHearingSomeParticipantsVoteAnotherParticipantVotedAbortLeadsToAbort) {
+TEST_F(Coordinator, NotHearingSomeParticipantsVoteAnotherParticipantVotedAbortLeadsToAbort) {
TransactionCoordinator coordinator;
coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
+ coordinator.madeParticipantListDurable();
coordinator.recvVoteAbort(ShardId("shard0000"));
- ASSERT_EQ(State::kAborted, coordinator.state());
+ ASSERT_EQ(State::kMakingAbortDecisionDurable, coordinator.state());
}
-TEST(Coordinator, NotHearingSomeParticipantsCommitAckLeadsToStillWaiting) {
+TEST_F(Coordinator, NotHearingSomeParticipantsCommitAckLeadsToStillWaiting) {
TransactionCoordinator coordinator;
coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
+ coordinator.madeParticipantListDurable();
coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp);
+ coordinator.madeCommitDecisionDurable();
coordinator.recvCommitAck(ShardId("shard0000"));
ASSERT_EQ(State::kWaitingForCommitAcks, coordinator.state());
}
-TEST(Coordinator, TryAbortWhileWaitingForParticipantListSuccessfullyAborts) {
- TransactionCoordinator coordinator;
- coordinator.recvTryAbort();
- ASSERT_EQ(State::kAborted, coordinator.state());
-}
-
-TEST(Coordinator, TryAbortWhileWaitingForVotesSuccessfullyAborts) {
- TransactionCoordinator coordinator;
- coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
- coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
- coordinator.recvTryAbort();
- ASSERT_EQ(State::kAborted, coordinator.state());
-}
-
-TEST(Coordinator, TryAbortWhileWaitingForCommitAcksDoesNotCancelCommit) {
- TransactionCoordinator coordinator;
- coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
- coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
- coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp);
- ASSERT_EQ(State::kWaitingForCommitAcks, coordinator.state());
- coordinator.recvTryAbort();
- ASSERT_EQ(State::kWaitingForCommitAcks, coordinator.state());
- coordinator.recvCommitAck(ShardId("shard0000"));
- coordinator.recvCommitAck(ShardId("shard0001"));
- ASSERT_EQ(State::kCommitted, coordinator.state());
-}
-
-TEST(Coordinator, VoteCommitToAbortedCoordinatorRespondsWithAbort) {
- TransactionCoordinator coordinator;
- coordinator.recvCoordinateCommit({ShardId("shard0000"), ShardId("shard0001")});
- coordinator.recvVoteAbort(ShardId("shard0000"));
- ASSERT_EQ(State::kAborted, coordinator.state());
- auto action = coordinator.recvVoteCommit(ShardId("shard0001"), dummyTimestamp);
- ASSERT_EQ(TransactionCoordinator::StateMachine::Action::kSendAbort, action);
-}
-
-TEST(Coordinator, WaitForCompletionReturnsOnChangeToCommitted) {
+TEST_F(Coordinator, WaitForCompletionReturnsOnChangeToCommitted) {
TransactionCoordinator coordinator;
auto future = coordinator.waitForCompletion();
- coordinator.recvCoordinateCommit({ShardId("shard0000")});
- coordinator.recvVoteCommit(ShardId("shard0000"), dummyTimestamp);
- coordinator.recvCommitAck(ShardId("shard0000"));
+ doCommit(coordinator);
auto finalState = future.get();
ASSERT_EQ(finalState, TransactionCoordinator::StateMachine::State::kCommitted);
}
-TEST(Coordinator, WaitForCompletionReturnsOnChangeToAborted) {
+TEST_F(Coordinator, WaitForCompletionReturnsOnChangeToAborted) {
TransactionCoordinator coordinator;
auto future = coordinator.waitForCompletion();
- coordinator.recvVoteAbort(ShardId("shard0000"));
+ doAbort(coordinator);
auto finalState = future.get();
ASSERT_EQ(finalState, TransactionCoordinator::StateMachine::State::kAborted);
}
-TEST(Coordinator, RepeatedCallsToWaitForCompletionAllReturn) {
+TEST_F(Coordinator, RepeatedCallsToWaitForCompletionAllReturn) {
TransactionCoordinator coordinator;
auto futures = {coordinator.waitForCompletion(),
coordinator.waitForCompletion(),
coordinator.waitForCompletion()};
- coordinator.recvVoteAbort(ShardId("shard0000"));
+ doAbort(coordinator);
for (auto& future : futures) {
auto finalState = future.get();
@@ -170,9 +142,9 @@ TEST(Coordinator, RepeatedCallsToWaitForCompletionAllReturn) {
}
}
-TEST(Coordinator, CallingWaitForCompletionAfterAlreadyCompleteReturns) {
+TEST_F(Coordinator, CallingWaitForCompletionAfterAlreadyCompleteReturns) {
TransactionCoordinator coordinator;
- coordinator.recvVoteAbort(ShardId("shard0000"));
+ doAbort(coordinator);
auto future = coordinator.waitForCompletion();
auto finalState = future.get();
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 07217ec089f..5b6ed2d3c02 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -28,7 +28,7 @@
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kSharding
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kTransaction
#include "mongo/platform/basic.h"
@@ -614,47 +614,23 @@ Shard::CommandResponse TransactionRouter::_commitSingleShardTransaction(Operatio
Shard::CommandResponse TransactionRouter::_commitMultiShardTransaction(OperationContext* opCtx) {
invariant(_coordinatorId);
-
- auto shardRegistry = Grid::get(opCtx)->shardRegistry();
-
- PrepareTransaction prepareCmd;
- prepareCmd.setDbName("admin");
- prepareCmd.setCoordinatorId(*_coordinatorId);
-
- auto prepareCmdObj = prepareCmd.toBSON(
- BSON(WriteConcernOptions::kWriteConcernField << WriteConcernOptions::Majority));
+ auto coordinatorIter = _participants.find(*_coordinatorId);
+ invariant(coordinatorIter != _participants.end());
std::vector<CommitParticipant> participantList;
for (const auto& participantEntry : _participants) {
- ShardId shardId(participantEntry.first);
-
CommitParticipant commitParticipant;
- commitParticipant.setShardId(shardId);
+ commitParticipant.setShardId(participantEntry.first);
participantList.push_back(std::move(commitParticipant));
-
- if (participantEntry.second.isCoordinator()) {
- // coordinateCommit is sent to participant that is also a coordinator.
- invariant(shardId == *_coordinatorId);
- continue;
- }
-
- const auto& participant = participantEntry.second;
- auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId));
- shard->runFireAndForgetCommand(opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- "admin",
- participant.attachTxnFieldsIfNeeded(prepareCmdObj, false));
}
- auto coordinatorShard = uassertStatusOK(shardRegistry->getShard(opCtx, *_coordinatorId));
+ auto coordinatorShard =
+ uassertStatusOK(Grid::get(opCtx)->shardRegistry()->getShard(opCtx, *_coordinatorId));
CoordinateCommitTransaction coordinateCommitCmd;
coordinateCommitCmd.setDbName("admin");
coordinateCommitCmd.setParticipants(participantList);
- auto coordinatorIter = _participants.find(*_coordinatorId);
- invariant(coordinatorIter != _participants.end());
-
return uassertStatusOK(coordinatorShard->runCommandWithFixedRetryAttempts(
opCtx,
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp
index 889c0c5ceed..bc14fb5b326 100644
--- a/src/mongo/s/transaction_router_test.cpp
+++ b/src/mongo/s/transaction_router_test.cpp
@@ -657,7 +657,7 @@ TEST_F(TransactionRouterTest, SendCommitDirectlyForSingleParticipants) {
future.timed_get(kFutureTimeout);
}
-TEST_F(TransactionRouterTest, SendPrepareAndCoordinateCommitForMultipleParticipants) {
+TEST_F(TransactionRouterTest, SendCoordinateCommitForMultipleParticipants) {
LogicalSessionId lsid(makeLogicalSessionIdForTest());
TxnNumber txnNum{3};
@@ -676,21 +676,6 @@ TEST_F(TransactionRouterTest, SendPrepareAndCoordinateCommitForMultipleParticipa
auto future = launchAsync([&] { txnRouter->commitTransaction(operationContext()); });
onCommand([&](const RemoteCommandRequest& request) {
- ASSERT_EQ(hostAndPort2, request.target);
- ASSERT_EQ("admin", request.dbname);
-
- auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
- ASSERT_EQ(cmdName, "prepareTransaction");
-
- auto coordinator = request.cmdObj["coordinatorId"].str();
- ASSERT_EQ(shard1.toString(), coordinator);
-
- checkSessionDetails(request.cmdObj, lsid, txnNum, boost::none);
-
- return BSON("ok" << 1);
- });
-
- onCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQ(hostAndPort1, request.target);
ASSERT_EQ("admin", request.dbname);
diff --git a/src/mongo/shell/servers.js b/src/mongo/shell/servers.js
index 2c30b10e632..2119e5cef2f 100644
--- a/src/mongo/shell/servers.js
+++ b/src/mongo/shell/servers.js
@@ -1121,16 +1121,6 @@ var MongoRunner, _startMongod, startMongoProgram, runMongoProgram, startMongoPro
}
}
- // New mongod-specific options in 4.1.x.
- if (!programMajorMinorVersion || programMajorMinorVersion >= 410) {
- if (jsTest.options().setSkipShardingPartsOfPrepareTransactionFailpoint &&
- jsTest.options().enableTestCommands) {
- argArray.push(
- ...["--setParameter",
- "failpoint.skipShardingPartsOfPrepareTransaction={mode:'alwaysOn'}"]);
- }
- }
-
// New mongod-specific options in 4.0.x
if (!programMajorMinorVersion || programMajorMinorVersion >= 400) {
if (jsTest.options().transactionLifetimeLimitSeconds !== undefined) {
diff --git a/src/mongo/shell/utils.js b/src/mongo/shell/utils.js
index e6477b0fa10..a98de90cdc1 100644
--- a/src/mongo/shell/utils.js
+++ b/src/mongo/shell/utils.js
@@ -318,8 +318,6 @@ jsTestOptions = function() {
mqlTestFile: TestData.mqlTestFile,
mqlRootPath: TestData.mqlRootPath,
disableImplicitSessions: TestData.disableImplicitSessions || false,
- setSkipShardingPartsOfPrepareTransactionFailpoint:
- TestData.setSkipShardingPartsOfPrepareTransactionFailpoint || false,
});
}
return _jsTestOptions;