diff options
author | Matthew Saltz <matthew.saltz@mongodb.com> | 2018-09-06 18:31:03 -0400 |
---|---|---|
committer | Matthew Saltz <matthew.saltz@mongodb.com> | 2018-09-12 10:09:24 -0400 |
commit | 0d88ca3610e05e9a524aefc8d7b6d9911288f484 (patch) | |
tree | 7b23d70d9aae0e320f1cd0b798a35b5d169dad3e | |
parent | c9367644543c2734cb2e0449d7f735adfc370890 (diff) | |
download | mongo-0d88ca3610e05e9a524aefc8d7b6d9911288f484.tar.gz |
SERVER-36852 Move TransactionCoordinator off of Session and into TransactionCoordinatorService
-rw-r--r-- | src/mongo/db/operation_context_session_mongod.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/s/txn_two_phase_commit_cmds.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator.cpp | 20 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator.h | 5 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_catalog.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_catalog.h | 2 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_catalog_test.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_commands_impl.cpp | 71 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_commands_impl.h | 13 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_commands_impl_test.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/transaction_coordinator_service.cpp | 18 |
11 files changed, 116 insertions, 127 deletions
diff --git a/src/mongo/db/operation_context_session_mongod.cpp b/src/mongo/db/operation_context_session_mongod.cpp index 96c15c6f58a..eae70542f0b 100644 --- a/src/mongo/db/operation_context_session_mongod.cpp +++ b/src/mongo/db/operation_context_session_mongod.cpp @@ -29,6 +29,7 @@ #include "mongo/db/operation_context_session_mongod.h" #include "mongo/db/transaction_coordinator.h" +#include "mongo/db/transaction_coordinator_service.h" #include "mongo/db/transaction_participant.h" namespace mongo { @@ -48,18 +49,16 @@ OperationContextSessionMongod::OperationContextSessionMongod(OperationContext* o session->beginOrContinueTxn(opCtx, clientTxnNumber); if (startTransaction && *startTransaction) { - // If this shard has been selected as the coordinator, set up the coordinator state to - // be ready to receive votes. + auto clientLsid = opCtx->getLogicalSessionId().get(); + auto clockSource = opCtx->getServiceContext()->getFastClockSource(); + + // If this shard has been selected as the coordinator, set up the coordinator state + // to be ready to receive votes. if (coordinator && *coordinator) { - // TODO: Once shards support multiple active coordinators per session, instead of - // resetting the previous TransactionCoordinator, simply create a new - // TransactionCoordinator and push it on the queue of active coordinators for this - // session. - // Note: Until the above TODO is done, starting a new transaction before receiving - // the decision for the previous transaction can corrupt the previous transaction. - auto& txnCoordinator = TransactionCoordinator::get(opCtx); - txnCoordinator.reset(); - TransactionCoordinator::create(session); + TransactionCoordinatorService::get(opCtx)->createCoordinator( + clientLsid, + clientTxnNumber, + clockSource->now() + Seconds(transactionLifetimeLimitSeconds.load())); } } diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp index 7256ff443e8..82c1125ae41 100644 --- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp +++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp @@ -34,7 +34,7 @@ #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/sharding_state.h" -#include "mongo/db/transaction_coordinator_commands_impl.h" +#include "mongo/db/transaction_coordinator_service.h" #include "mongo/db/transaction_participant.h" namespace mongo { @@ -133,8 +133,12 @@ public: const auto& cmd = request(); - txn::recvVoteCommit( - opCtx, cmd.getShardId(), 0 /* TODO (SERVER-36584) pass real prepareTimestamp */); + TransactionCoordinatorService::get(opCtx)->voteCommit( + opCtx, + opCtx->getLogicalSessionId().get(), + opCtx->getTxnNumber().get(), + cmd.getShardId(), + 0 /* TODO (SERVER-36584) pass real prepareTimestamp */); } private: @@ -183,7 +187,10 @@ public: const auto& cmd = request(); - txn::recvVoteAbort(opCtx, cmd.getShardId()); + TransactionCoordinatorService::get(opCtx)->voteAbort(opCtx, + opCtx->getLogicalSessionId().get(), + opCtx->getTxnNumber().get(), + cmd.getShardId()); } private: @@ -246,7 +253,11 @@ public: participantList.insert(shardId); } - txn::recvCoordinateCommit(opCtx, participantList); + TransactionCoordinatorService::get(opCtx)->coordinateCommit( + opCtx, + opCtx->getLogicalSessionId().get(), + opCtx->getTxnNumber().get(), + participantList); } private: diff --git a/src/mongo/db/transaction_coordinator.cpp b/src/mongo/db/transaction_coordinator.cpp index 38c4eae4783..d8d014322b7 100644 --- a/src/mongo/db/transaction_coordinator.cpp +++ b/src/mongo/db/transaction_coordinator.cpp @@ -41,27 +41,14 @@ using Action = TransactionCoordinator::StateMachine::Action; using Event = TransactionCoordinator::StateMachine::Event; using State = TransactionCoordinator::StateMachine::State; -namespace { -const Session::Decoration<boost::optional<TransactionCoordinator>> getTransactionCoordinator = - Session::declareDecoration<boost::optional<TransactionCoordinator>>(); -} // namespace - -boost::optional<TransactionCoordinator>& TransactionCoordinator::get(OperationContext* opCtx) { - auto session = OperationContextSession::get(opCtx); - return getTransactionCoordinator(session); -} - -void TransactionCoordinator::create(Session* session) { - invariant(!getTransactionCoordinator(session)); - getTransactionCoordinator(session).emplace(); -} - Action TransactionCoordinator::recvCoordinateCommit(const std::set<ShardId>& participants) { + stdx::lock_guard<decltype(_mtx)> lk(_mtx); _participantList.recordFullList(participants); return _stateMachine.onEvent(Event::kRecvParticipantList); } Action TransactionCoordinator::recvVoteCommit(const ShardId& shardId, int prepareTimestamp) { + stdx::lock_guard<decltype(_mtx)> lk(_mtx); _participantList.recordVoteCommit(shardId, prepareTimestamp); auto event = (_participantList.allParticipantsVotedCommit()) ? Event::kRecvFinalVoteCommit @@ -70,11 +57,13 @@ Action TransactionCoordinator::recvVoteCommit(const ShardId& shardId, int prepar } Action TransactionCoordinator::recvVoteAbort(const ShardId& shardId) { + stdx::lock_guard<decltype(_mtx)> lk(_mtx); _participantList.recordVoteAbort(shardId); return _stateMachine.onEvent(Event::kRecvVoteAbort); } void TransactionCoordinator::recvCommitAck(const ShardId& shardId) { + stdx::lock_guard<decltype(_mtx)> lk(_mtx); _participantList.recordCommitAck(shardId); if (_participantList.allParticipantsAckedCommit()) { _stateMachine.onEvent(Event::kRecvFinalCommitAck); @@ -82,6 +71,7 @@ void TransactionCoordinator::recvCommitAck(const ShardId& shardId) { } void TransactionCoordinator::recvAbortAck(const ShardId& shardId) { + stdx::lock_guard<decltype(_mtx)> lk(_mtx); _participantList.recordAbortAck(shardId); if (_participantList.allParticipantsAckedAbort()) { _stateMachine.onEvent(Event::kRecvFinalAbortAck); diff --git a/src/mongo/db/transaction_coordinator.h b/src/mongo/db/transaction_coordinator.h index dc07fbce0ea..a587ef34b69 100644 --- a/src/mongo/db/transaction_coordinator.h +++ b/src/mongo/db/transaction_coordinator.h @@ -34,6 +34,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/s/shard_id.h" +#include "mongo/stdx/mutex.h" #include "mongo/util/assert_util.h" #include "mongo/util/decorable.h" #include "mongo/util/mongoutils/str.h" @@ -54,9 +55,6 @@ public: TransactionCoordinator() = default; ~TransactionCoordinator() = default; - static boost::optional<TransactionCoordinator>& get(OperationContext* opCtx); - static void create(Session* session); - /** * The internal state machine, or "brain", used by the TransactionCoordinator to determine what * to do in response to an "event" (receiving a request or hearing back a response). @@ -202,6 +200,7 @@ public: }; private: + stdx::mutex _mtx; ParticipantList _participantList; StateMachine _stateMachine; }; diff --git a/src/mongo/db/transaction_coordinator_catalog.cpp b/src/mongo/db/transaction_coordinator_catalog.cpp index b36f240305b..4257dc212ec 100644 --- a/src/mongo/db/transaction_coordinator_catalog.cpp +++ b/src/mongo/db/transaction_coordinator_catalog.cpp @@ -45,7 +45,7 @@ TransactionCoordinatorCatalog::~TransactionCoordinatorCatalog() = default; std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::create(LogicalSessionId lsid, TxnNumber txnNumber) { - stdx::lock_guard<decltype(_mtx)> lk(_mtx); + stdx::lock_guard<stdx::mutex> lk(_mutex); // Create a new map for the session if it does not exist if (_coordinatorsBySession.find(lsid) == _coordinatorsBySession.end()) { @@ -68,7 +68,7 @@ std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::create(Lo boost::optional<std::shared_ptr<TransactionCoordinator>> TransactionCoordinatorCatalog::get( LogicalSessionId lsid, TxnNumber txnNumber) { - stdx::lock_guard<decltype(_mtx)> lk(_mtx); + stdx::lock_guard<stdx::mutex> lk(_mutex); const auto& coordinatorsForSessionIter = _coordinatorsBySession.find(lsid); @@ -89,7 +89,7 @@ boost::optional<std::shared_ptr<TransactionCoordinator>> TransactionCoordinatorC boost::optional<std::pair<TxnNumber, std::shared_ptr<TransactionCoordinator>>> TransactionCoordinatorCatalog::getLatestOnSession(LogicalSessionId lsid) { - stdx::lock_guard<decltype(_mtx)> lk(_mtx); + stdx::lock_guard<stdx::mutex> lk(_mutex); const auto& coordinatorsForSessionIter = _coordinatorsBySession.find(lsid); @@ -110,7 +110,7 @@ TransactionCoordinatorCatalog::getLatestOnSession(LogicalSessionId lsid) { void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnNumber) { using CoordinatorState = TransactionCoordinator::StateMachine::State; - stdx::lock_guard<decltype(_mtx)> lk(_mtx); + stdx::lock_guard<stdx::mutex> lk(_mutex); const auto& coordinatorsForSessionIter = _coordinatorsBySession.find(lsid); @@ -120,8 +120,15 @@ void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnN if (coordinatorForTxnIter != coordinatorsForSession.end()) { auto coordinator = coordinatorForTxnIter->second; - invariant(coordinator->state() == CoordinatorState::kCommitted || - coordinator->state() == CoordinatorState::kAborted); + // TODO (SERVER-36304/37021): Reenable the below invariant once transaction participants + // are able to send votes and once we validate the state of the coordinator when a new + // transaction comes in for an existing session. For now, we're not validating the state + // of the coordinator which means it is possible that starting a new transaction before + // waiting for the previous one's coordinator to reach state committed or aborted will + // corrupt the previous transaction. + + // invariant(coordinator->state() == CoordinatorState::kCommitted || + // coordinator->state() == CoordinatorState::kAborted); coordinatorsForSession.erase(coordinatorForTxnIter); if (coordinatorsForSession.size() == 0) { _coordinatorsBySession.erase(coordinatorsForSessionIter); diff --git a/src/mongo/db/transaction_coordinator_catalog.h b/src/mongo/db/transaction_coordinator_catalog.h index 58b2009597a..611a3497884 100644 --- a/src/mongo/db/transaction_coordinator_catalog.h +++ b/src/mongo/db/transaction_coordinator_catalog.h @@ -85,7 +85,7 @@ private: /** * Protects the _coordinatorsBySession map. */ - stdx::mutex _mtx; + stdx::mutex _mutex; /** * Contains TransactionCoordinator objects by session id and transaction number. May contain diff --git a/src/mongo/db/transaction_coordinator_catalog_test.cpp b/src/mongo/db/transaction_coordinator_catalog_test.cpp index 6b684b5d5db..fe053f70978 100644 --- a/src/mongo/db/transaction_coordinator_catalog_test.cpp +++ b/src/mongo/db/transaction_coordinator_catalog_test.cpp @@ -185,14 +185,19 @@ TEST_F(TransactionCoordinatorCatalogTest, RemovingAnAbortedCoordinatorSucceeds) ASSERT_EQ(coordinatorInCatalog, boost::none); } -DEATH_TEST_F(TransactionCoordinatorCatalogTest, - RemovingACoordinatorNotInCommittedOrAbortedStateFails, - "Invariant failure") { - LogicalSessionId lsid = makeLogicalSessionIdForTest(); - TxnNumber txnNumber = 1; - coordinatorCatalog().create(lsid, txnNumber); - coordinatorCatalog().remove(lsid, txnNumber); -} +// TODO (SERVER-36304/37021): Reenable once transaction participants are able to send +// votes and once we validate the state of the coordinator when a new transaction comes +// in for an existing session. For now, we're not validating the state of the +// coordinator which means it is possible that if we hit invalid behavior in testing +// that this will result in hidden incorrect behavior. +// DEATH_TEST_F(TransactionCoordinatorCatalogTest, +// RemovingACoordinatorNotInCommittedOrAbortedStateFails, +// "Invariant failure") { +// LogicalSessionId lsid = makeLogicalSessionIdForTest(); +// TxnNumber txnNumber = 1; +// coordinatorCatalog().create(lsid, txnNumber); +// coordinatorCatalog().remove(lsid, txnNumber); +// } } // namespace } // namespace mongo diff --git a/src/mongo/db/transaction_coordinator_commands_impl.cpp b/src/mongo/db/transaction_coordinator_commands_impl.cpp index 77e51cd9789..68dbdd7992c 100644 --- a/src/mongo/db/transaction_coordinator_commands_impl.cpp +++ b/src/mongo/db/transaction_coordinator_commands_impl.cpp @@ -160,54 +160,36 @@ std::vector<ShardId> sendAbort(OperationContext* opCtx, std::set<ShardId>& nonAc return ackedParticipants; } -void doAction(OperationContext* opCtx, TransactionCoordinator::StateMachine::Action action) { +void doAction(OperationContext* opCtx, + std::shared_ptr<TransactionCoordinator> coordinator, + TransactionCoordinator::StateMachine::Action action) { switch (action) { case TransactionCoordinator::StateMachine::Action::kSendCommit: { std::set<ShardId> nonAckedParticipants; - { - OperationContextSessionMongod checkOutSession( - opCtx, true, false, boost::none, true); - nonAckedParticipants = - TransactionCoordinator::get(opCtx)->getNonAckedCommitParticipants(); - } + nonAckedParticipants = coordinator->getNonAckedCommitParticipants(); // TODO (SERVER-36638): Spawn a separate thread to do this so that the client's thread // does not block. auto ackedParticipants = sendCommit(opCtx, nonAckedParticipants); - { - OperationContextSessionMongod checkOutSession( - opCtx, true, false, boost::none, true); - auto& coordinator = TransactionCoordinator::get(opCtx); - for (auto& participant : ackedParticipants) { - coordinator->recvCommitAck(participant); - } + for (auto& participant : ackedParticipants) { + coordinator->recvCommitAck(participant); } return; } case TransactionCoordinator::StateMachine::Action::kSendAbort: { std::set<ShardId> nonAckedParticipants; - { - OperationContextSessionMongod checkOutSession( - opCtx, true, false, boost::none, true); - nonAckedParticipants = - TransactionCoordinator::get(opCtx)->getNonAckedAbortParticipants(); - } + nonAckedParticipants = coordinator->getNonAckedAbortParticipants(); // TODO (SERVER-36638): Spawn a separate thread to do this so that the client's thread // does not block. auto ackedParticipants = sendAbort(opCtx, nonAckedParticipants); - { - OperationContextSessionMongod checkOutSession( - opCtx, true, false, boost::none, true); - auto& coordinator = TransactionCoordinator::get(opCtx); - for (auto& participant : ackedParticipants) { - coordinator->recvAbortAck(participant); - } + for (auto& participant : ackedParticipants) { + coordinator->recvAbortAck(participant); } return; @@ -222,7 +204,9 @@ void doAction(OperationContext* opCtx, TransactionCoordinator::StateMachine::Act namespace txn { -void recvCoordinateCommit(OperationContext* opCtx, const std::set<ShardId>& participantList) { +void recvCoordinateCommit(OperationContext* opCtx, + std::shared_ptr<TransactionCoordinator> coordinator, + const std::set<ShardId>& participantList) { // TODO (SERVER-36687): Remove log line or demote to lower log level once cross-shard // transactions are stable. StringBuilder ss; @@ -234,42 +218,39 @@ void recvCoordinateCommit(OperationContext* opCtx, const std::set<ShardId>& part LOG(0) << "Coordinator shard received participant list with shards " << ss.str(); TransactionCoordinator::StateMachine::Action action; - { - OperationContextSessionMongod checkOutSession(opCtx, true, false, boost::none, true); - action = TransactionCoordinator::get(opCtx)->recvCoordinateCommit(participantList); - } - doAction(opCtx, action); + action = coordinator->recvCoordinateCommit(participantList); + + doAction(opCtx, coordinator, action); // TODO (SERVER-36640): Wait for decision to be made. } -void recvVoteCommit(OperationContext* opCtx, const ShardId& shardId, int prepareTimestamp) { +void recvVoteCommit(OperationContext* opCtx, + std::shared_ptr<TransactionCoordinator> coordinator, + const ShardId& shardId, + int prepareTimestamp) { // TODO (SERVER-36687): Remove log line or demote to lower log level once cross-shard // transactions are stable. LOG(0) << "Coordinator shard received voteCommit from " << shardId; TransactionCoordinator::StateMachine::Action action; - { - OperationContextSessionMongod checkOutSession(opCtx, true, false, boost::none, true); - action = TransactionCoordinator::get(opCtx)->recvVoteCommit(shardId, prepareTimestamp); - } + action = coordinator->recvVoteCommit(shardId, prepareTimestamp); - doAction(opCtx, action); + doAction(opCtx, coordinator, action); } -void recvVoteAbort(OperationContext* opCtx, const ShardId& shardId) { +void recvVoteAbort(OperationContext* opCtx, + std::shared_ptr<TransactionCoordinator> coordinator, + const ShardId& shardId) { // TODO (SERVER-36687): Remove log line or demote to lower log level once cross-shard // transactions are stable. LOG(0) << "Coordinator shard received voteAbort from " << shardId; TransactionCoordinator::StateMachine::Action action; - { - OperationContextSessionMongod checkOutSession(opCtx, true, false, boost::none, true); - action = TransactionCoordinator::get(opCtx)->recvVoteAbort(shardId); - } + action = coordinator->recvVoteAbort(shardId); - doAction(opCtx, action); + doAction(opCtx, coordinator, action); } } // namespace txn diff --git a/src/mongo/db/transaction_coordinator_commands_impl.h b/src/mongo/db/transaction_coordinator_commands_impl.h index f05a519ddfd..f8b302c4c41 100644 --- a/src/mongo/db/transaction_coordinator_commands_impl.h +++ b/src/mongo/db/transaction_coordinator_commands_impl.h @@ -39,9 +39,16 @@ namespace mongo { */ namespace txn { -void recvCoordinateCommit(OperationContext* opCtx, const std::set<ShardId>& participantList); -void recvVoteCommit(OperationContext* opCtx, const ShardId& shardId, int prepareTimestamp); -void recvVoteAbort(OperationContext* opCtx, const ShardId& shardId); +void recvCoordinateCommit(OperationContext* opCtx, + std::shared_ptr<TransactionCoordinator> coordinator, + const std::set<ShardId>& participantList); +void recvVoteCommit(OperationContext* opCtx, + std::shared_ptr<TransactionCoordinator> coordinator, + const ShardId& shardId, + int prepareTimestamp); +void recvVoteAbort(OperationContext* opCtx, + std::shared_ptr<TransactionCoordinator> coordinator, + const ShardId& shardId); } // namespace txn } // namespace mongo diff --git a/src/mongo/db/transaction_coordinator_commands_impl_test.cpp b/src/mongo/db/transaction_coordinator_commands_impl_test.cpp index 61722a019b3..3d460abb5a9 100644 --- a/src/mongo/db/transaction_coordinator_commands_impl_test.cpp +++ b/src/mongo/db/transaction_coordinator_commands_impl_test.cpp @@ -67,22 +67,7 @@ protected: void setUp() final { ShardServerTestFixture::setUp(); - SessionCatalog::get(getServiceContext())->onStepUp(operationContext()); - auto scopedSession = - SessionCatalog::get(operationContext())->getOrCreateSession(operationContext(), _lsid); - - // Simulate that a regular transaction statement containing 'coordinator: true' was already - // sent to this shard. - operationContext()->setLogicalSessionId(_lsid); - operationContext()->setTxnNumber(_txnNumber); - { - OperationContextSessionMongod checkOutSession( - operationContext(), true, false, true, true); - - auto txnParticipant = TransactionParticipant::get(operationContext()); - txnParticipant->unstashTransactionResources(operationContext(), "dummy"); - txnParticipant->stashTransactionResources(operationContext()); - } + _coordinator = std::make_shared<TransactionCoordinator>(); for_each(shardIds.begin(), shardIds.end(), [this](const ShardId& shardId) { auto shardTargeter = RemoteCommandTargeterMock::get( @@ -94,6 +79,7 @@ protected: void tearDown() final { SessionCatalog::get(getServiceContext())->reset_forTest(); + _coordinator.reset(); ShardServerTestFixture::tearDown(); } @@ -112,7 +98,7 @@ protected: auto opCtxPtr = cc().makeOperationContext(); auto opCtx = opCtxPtr.get(); - // Required to be able to check out the session later. + // Required to be able to send abort and commit commands opCtx->setLogicalSessionId(_lsid); opCtx->setTxnNumber(_txnNumber); @@ -127,19 +113,20 @@ protected: } auto receiveCoordinateCommit(std::set<ShardId> participantList) { - auto commandFn = - std::bind(txn::recvCoordinateCommit, std::placeholders::_1, participantList); + auto commandFn = std::bind( + txn::recvCoordinateCommit, std::placeholders::_1, _coordinator, participantList); return simulateHandleRequest(commandFn); } auto receiveVoteCommit(ShardId shardId, int prepareTimestamp) { - auto commandFn = - std::bind(txn::recvVoteCommit, std::placeholders::_1, shardId, prepareTimestamp); + auto commandFn = std::bind( + txn::recvVoteCommit, std::placeholders::_1, _coordinator, shardId, prepareTimestamp); return simulateHandleRequest(commandFn); } auto receiveVoteAbort(ShardId shardId) { - auto commandFn = std::bind(txn::recvVoteAbort, std::placeholders::_1, shardId); + auto commandFn = + std::bind(txn::recvVoteAbort, std::placeholders::_1, _coordinator, shardId); return simulateHandleRequest(commandFn); } @@ -205,6 +192,7 @@ private: return stdx::make_unique<StaticCatalogClient>(); } + std::shared_ptr<TransactionCoordinator> _coordinator; const LogicalSessionId _lsid{makeLogicalSessionIdForTest()}; const TxnNumber _txnNumber{0}; }; diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp index a72dc1992bd..9e5dc5a9bb0 100644 --- a/src/mongo/db/transaction_coordinator_service.cpp +++ b/src/mongo/db/transaction_coordinator_service.cpp @@ -62,10 +62,15 @@ TransactionCoordinatorService* TransactionCoordinatorService::get(ServiceContext void TransactionCoordinatorService::createCoordinator(LogicalSessionId lsid, TxnNumber txnNumber, Date_t commitDeadline) { - // TODO (SERVER-37021): Validate lsid and txnNumber against latest txnNumber on session in the // catalog. + auto latestTxnNumAndCoordinator = _coordinatorCatalog.getLatestOnSession(lsid); + // TODO (SERVER-37039): The below removal logic for a coordinator will change/be removed once we + // allow multiple coordinators for a session. + if (latestTxnNumAndCoordinator) { + _coordinatorCatalog.remove(lsid, latestTxnNumAndCoordinator->first); + } _coordinatorCatalog.create(lsid, txnNumber); // TODO (SERVER-37024): Schedule abort task on executor to execute at commitDeadline. @@ -84,11 +89,7 @@ TransactionCoordinatorService::CommitDecision TransactionCoordinatorService::coo } // TODO (SERVER-37017): Execute this asynchronously. - txn::recvCoordinateCommit(opCtx, participantList); - // TODO (SERVER-37017): Once coordinate commit is asynchronous and/or returns after deciding to - // commit instead of after finishing commit, removal of the coordinator from the catalog will - // need to be done somewhere else. - _coordinatorCatalog.remove(lsid, txnNumber); + txn::recvCoordinateCommit(opCtx, coordinator.get(), participantList); // TODO (SERVER-36640): Return a notification wrapping the decision that the caller can wait on. return TransactionCoordinatorService::CommitDecision::kAbort; @@ -102,10 +103,11 @@ void TransactionCoordinatorService::voteCommit(OperationContext* opCtx, auto coordinator = _coordinatorCatalog.get(lsid, txnNumber); if (!coordinator) { // TODO (SERVER-37018): Send abort to the participant who sent this vote (shardId) + return; } // TODO (SERVER-37017): Execute this asynchronously - txn::recvVoteCommit(opCtx, shardId, prepareTimestamp); + txn::recvVoteCommit(opCtx, coordinator.get(), shardId, prepareTimestamp); } void TransactionCoordinatorService::voteAbort(OperationContext* opCtx, @@ -116,7 +118,7 @@ void TransactionCoordinatorService::voteAbort(OperationContext* opCtx, if (coordinator) { // TODO (SERVER-37017): Execute this asynchronously. - txn::recvVoteAbort(opCtx, shardId); + txn::recvVoteAbort(opCtx, coordinator.get(), shardId); } } |