summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Saltz <matthew.saltz@mongodb.com>2018-09-06 18:31:03 -0400
committerMatthew Saltz <matthew.saltz@mongodb.com>2018-09-12 10:09:24 -0400
commit0d88ca3610e05e9a524aefc8d7b6d9911288f484 (patch)
tree7b23d70d9aae0e320f1cd0b798a35b5d169dad3e
parentc9367644543c2734cb2e0449d7f735adfc370890 (diff)
downloadmongo-0d88ca3610e05e9a524aefc8d7b6d9911288f484.tar.gz
SERVER-36852 Move TransactionCoordinator off of Session and into TransactionCoordinatorService
-rw-r--r--src/mongo/db/operation_context_session_mongod.cpp21
-rw-r--r--src/mongo/db/s/txn_two_phase_commit_cmds.cpp21
-rw-r--r--src/mongo/db/transaction_coordinator.cpp20
-rw-r--r--src/mongo/db/transaction_coordinator.h5
-rw-r--r--src/mongo/db/transaction_coordinator_catalog.cpp19
-rw-r--r--src/mongo/db/transaction_coordinator_catalog.h2
-rw-r--r--src/mongo/db/transaction_coordinator_catalog_test.cpp21
-rw-r--r--src/mongo/db/transaction_coordinator_commands_impl.cpp71
-rw-r--r--src/mongo/db/transaction_coordinator_commands_impl.h13
-rw-r--r--src/mongo/db/transaction_coordinator_commands_impl_test.cpp32
-rw-r--r--src/mongo/db/transaction_coordinator_service.cpp18
11 files changed, 116 insertions, 127 deletions
diff --git a/src/mongo/db/operation_context_session_mongod.cpp b/src/mongo/db/operation_context_session_mongod.cpp
index 96c15c6f58a..eae70542f0b 100644
--- a/src/mongo/db/operation_context_session_mongod.cpp
+++ b/src/mongo/db/operation_context_session_mongod.cpp
@@ -29,6 +29,7 @@
#include "mongo/db/operation_context_session_mongod.h"
#include "mongo/db/transaction_coordinator.h"
+#include "mongo/db/transaction_coordinator_service.h"
#include "mongo/db/transaction_participant.h"
namespace mongo {
@@ -48,18 +49,16 @@ OperationContextSessionMongod::OperationContextSessionMongod(OperationContext* o
session->beginOrContinueTxn(opCtx, clientTxnNumber);
if (startTransaction && *startTransaction) {
- // If this shard has been selected as the coordinator, set up the coordinator state to
- // be ready to receive votes.
+ auto clientLsid = opCtx->getLogicalSessionId().get();
+ auto clockSource = opCtx->getServiceContext()->getFastClockSource();
+
+ // If this shard has been selected as the coordinator, set up the coordinator state
+ // to be ready to receive votes.
if (coordinator && *coordinator) {
- // TODO: Once shards support multiple active coordinators per session, instead of
- // resetting the previous TransactionCoordinator, simply create a new
- // TransactionCoordinator and push it on the queue of active coordinators for this
- // session.
- // Note: Until the above TODO is done, starting a new transaction before receiving
- // the decision for the previous transaction can corrupt the previous transaction.
- auto& txnCoordinator = TransactionCoordinator::get(opCtx);
- txnCoordinator.reset();
- TransactionCoordinator::create(session);
+ TransactionCoordinatorService::get(opCtx)->createCoordinator(
+ clientLsid,
+ clientTxnNumber,
+ clockSource->now() + Seconds(transactionLifetimeLimitSeconds.load()));
}
}
diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
index 7256ff443e8..82c1125ae41 100644
--- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
+++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
@@ -34,7 +34,7 @@
#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/sharding_state.h"
-#include "mongo/db/transaction_coordinator_commands_impl.h"
+#include "mongo/db/transaction_coordinator_service.h"
#include "mongo/db/transaction_participant.h"
namespace mongo {
@@ -133,8 +133,12 @@ public:
const auto& cmd = request();
- txn::recvVoteCommit(
- opCtx, cmd.getShardId(), 0 /* TODO (SERVER-36584) pass real prepareTimestamp */);
+ TransactionCoordinatorService::get(opCtx)->voteCommit(
+ opCtx,
+ opCtx->getLogicalSessionId().get(),
+ opCtx->getTxnNumber().get(),
+ cmd.getShardId(),
+ 0 /* TODO (SERVER-36584) pass real prepareTimestamp */);
}
private:
@@ -183,7 +187,10 @@ public:
const auto& cmd = request();
- txn::recvVoteAbort(opCtx, cmd.getShardId());
+ TransactionCoordinatorService::get(opCtx)->voteAbort(opCtx,
+ opCtx->getLogicalSessionId().get(),
+ opCtx->getTxnNumber().get(),
+ cmd.getShardId());
}
private:
@@ -246,7 +253,11 @@ public:
participantList.insert(shardId);
}
- txn::recvCoordinateCommit(opCtx, participantList);
+ TransactionCoordinatorService::get(opCtx)->coordinateCommit(
+ opCtx,
+ opCtx->getLogicalSessionId().get(),
+ opCtx->getTxnNumber().get(),
+ participantList);
}
private:
diff --git a/src/mongo/db/transaction_coordinator.cpp b/src/mongo/db/transaction_coordinator.cpp
index 38c4eae4783..d8d014322b7 100644
--- a/src/mongo/db/transaction_coordinator.cpp
+++ b/src/mongo/db/transaction_coordinator.cpp
@@ -41,27 +41,14 @@ using Action = TransactionCoordinator::StateMachine::Action;
using Event = TransactionCoordinator::StateMachine::Event;
using State = TransactionCoordinator::StateMachine::State;
-namespace {
-const Session::Decoration<boost::optional<TransactionCoordinator>> getTransactionCoordinator =
- Session::declareDecoration<boost::optional<TransactionCoordinator>>();
-} // namespace
-
-boost::optional<TransactionCoordinator>& TransactionCoordinator::get(OperationContext* opCtx) {
- auto session = OperationContextSession::get(opCtx);
- return getTransactionCoordinator(session);
-}
-
-void TransactionCoordinator::create(Session* session) {
- invariant(!getTransactionCoordinator(session));
- getTransactionCoordinator(session).emplace();
-}
-
Action TransactionCoordinator::recvCoordinateCommit(const std::set<ShardId>& participants) {
+ stdx::lock_guard<decltype(_mtx)> lk(_mtx);
_participantList.recordFullList(participants);
return _stateMachine.onEvent(Event::kRecvParticipantList);
}
Action TransactionCoordinator::recvVoteCommit(const ShardId& shardId, int prepareTimestamp) {
+ stdx::lock_guard<decltype(_mtx)> lk(_mtx);
_participantList.recordVoteCommit(shardId, prepareTimestamp);
auto event = (_participantList.allParticipantsVotedCommit()) ? Event::kRecvFinalVoteCommit
@@ -70,11 +57,13 @@ Action TransactionCoordinator::recvVoteCommit(const ShardId& shardId, int prepar
}
Action TransactionCoordinator::recvVoteAbort(const ShardId& shardId) {
+ stdx::lock_guard<decltype(_mtx)> lk(_mtx);
_participantList.recordVoteAbort(shardId);
return _stateMachine.onEvent(Event::kRecvVoteAbort);
}
void TransactionCoordinator::recvCommitAck(const ShardId& shardId) {
+ stdx::lock_guard<decltype(_mtx)> lk(_mtx);
_participantList.recordCommitAck(shardId);
if (_participantList.allParticipantsAckedCommit()) {
_stateMachine.onEvent(Event::kRecvFinalCommitAck);
@@ -82,6 +71,7 @@ void TransactionCoordinator::recvCommitAck(const ShardId& shardId) {
}
void TransactionCoordinator::recvAbortAck(const ShardId& shardId) {
+ stdx::lock_guard<decltype(_mtx)> lk(_mtx);
_participantList.recordAbortAck(shardId);
if (_participantList.allParticipantsAckedAbort()) {
_stateMachine.onEvent(Event::kRecvFinalAbortAck);
diff --git a/src/mongo/db/transaction_coordinator.h b/src/mongo/db/transaction_coordinator.h
index dc07fbce0ea..a587ef34b69 100644
--- a/src/mongo/db/transaction_coordinator.h
+++ b/src/mongo/db/transaction_coordinator.h
@@ -34,6 +34,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/s/shard_id.h"
+#include "mongo/stdx/mutex.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/decorable.h"
#include "mongo/util/mongoutils/str.h"
@@ -54,9 +55,6 @@ public:
TransactionCoordinator() = default;
~TransactionCoordinator() = default;
- static boost::optional<TransactionCoordinator>& get(OperationContext* opCtx);
- static void create(Session* session);
-
/**
* The internal state machine, or "brain", used by the TransactionCoordinator to determine what
* to do in response to an "event" (receiving a request or hearing back a response).
@@ -202,6 +200,7 @@ public:
};
private:
+ stdx::mutex _mtx;
ParticipantList _participantList;
StateMachine _stateMachine;
};
diff --git a/src/mongo/db/transaction_coordinator_catalog.cpp b/src/mongo/db/transaction_coordinator_catalog.cpp
index b36f240305b..4257dc212ec 100644
--- a/src/mongo/db/transaction_coordinator_catalog.cpp
+++ b/src/mongo/db/transaction_coordinator_catalog.cpp
@@ -45,7 +45,7 @@ TransactionCoordinatorCatalog::~TransactionCoordinatorCatalog() = default;
std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::create(LogicalSessionId lsid,
TxnNumber txnNumber) {
- stdx::lock_guard<decltype(_mtx)> lk(_mtx);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
// Create a new map for the session if it does not exist
if (_coordinatorsBySession.find(lsid) == _coordinatorsBySession.end()) {
@@ -68,7 +68,7 @@ std::shared_ptr<TransactionCoordinator> TransactionCoordinatorCatalog::create(Lo
boost::optional<std::shared_ptr<TransactionCoordinator>> TransactionCoordinatorCatalog::get(
LogicalSessionId lsid, TxnNumber txnNumber) {
- stdx::lock_guard<decltype(_mtx)> lk(_mtx);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
const auto& coordinatorsForSessionIter = _coordinatorsBySession.find(lsid);
@@ -89,7 +89,7 @@ boost::optional<std::shared_ptr<TransactionCoordinator>> TransactionCoordinatorC
boost::optional<std::pair<TxnNumber, std::shared_ptr<TransactionCoordinator>>>
TransactionCoordinatorCatalog::getLatestOnSession(LogicalSessionId lsid) {
- stdx::lock_guard<decltype(_mtx)> lk(_mtx);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
const auto& coordinatorsForSessionIter = _coordinatorsBySession.find(lsid);
@@ -110,7 +110,7 @@ TransactionCoordinatorCatalog::getLatestOnSession(LogicalSessionId lsid) {
void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnNumber) {
using CoordinatorState = TransactionCoordinator::StateMachine::State;
- stdx::lock_guard<decltype(_mtx)> lk(_mtx);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
const auto& coordinatorsForSessionIter = _coordinatorsBySession.find(lsid);
@@ -120,8 +120,15 @@ void TransactionCoordinatorCatalog::remove(LogicalSessionId lsid, TxnNumber txnN
if (coordinatorForTxnIter != coordinatorsForSession.end()) {
auto coordinator = coordinatorForTxnIter->second;
- invariant(coordinator->state() == CoordinatorState::kCommitted ||
- coordinator->state() == CoordinatorState::kAborted);
+ // TODO (SERVER-36304/37021): Reenable the below invariant once transaction participants
+ // are able to send votes and once we validate the state of the coordinator when a new
+ // transaction comes in for an existing session. For now, we're not validating the state
+ // of the coordinator which means it is possible that starting a new transaction before
+ // waiting for the previous one's coordinator to reach state committed or aborted will
+ // corrupt the previous transaction.
+
+ // invariant(coordinator->state() == CoordinatorState::kCommitted ||
+ // coordinator->state() == CoordinatorState::kAborted);
coordinatorsForSession.erase(coordinatorForTxnIter);
if (coordinatorsForSession.size() == 0) {
_coordinatorsBySession.erase(coordinatorsForSessionIter);
diff --git a/src/mongo/db/transaction_coordinator_catalog.h b/src/mongo/db/transaction_coordinator_catalog.h
index 58b2009597a..611a3497884 100644
--- a/src/mongo/db/transaction_coordinator_catalog.h
+++ b/src/mongo/db/transaction_coordinator_catalog.h
@@ -85,7 +85,7 @@ private:
/**
* Protects the _coordinatorsBySession map.
*/
- stdx::mutex _mtx;
+ stdx::mutex _mutex;
/**
* Contains TransactionCoordinator objects by session id and transaction number. May contain
diff --git a/src/mongo/db/transaction_coordinator_catalog_test.cpp b/src/mongo/db/transaction_coordinator_catalog_test.cpp
index 6b684b5d5db..fe053f70978 100644
--- a/src/mongo/db/transaction_coordinator_catalog_test.cpp
+++ b/src/mongo/db/transaction_coordinator_catalog_test.cpp
@@ -185,14 +185,19 @@ TEST_F(TransactionCoordinatorCatalogTest, RemovingAnAbortedCoordinatorSucceeds)
ASSERT_EQ(coordinatorInCatalog, boost::none);
}
-DEATH_TEST_F(TransactionCoordinatorCatalogTest,
- RemovingACoordinatorNotInCommittedOrAbortedStateFails,
- "Invariant failure") {
- LogicalSessionId lsid = makeLogicalSessionIdForTest();
- TxnNumber txnNumber = 1;
- coordinatorCatalog().create(lsid, txnNumber);
- coordinatorCatalog().remove(lsid, txnNumber);
-}
+// TODO (SERVER-36304/37021): Reenable once transaction participants are able to send
+// votes and once we validate the state of the coordinator when a new transaction comes
+// in for an existing session. For now, we're not validating the state of the
+// coordinator which means it is possible that if we hit invalid behavior in testing
+// that this will result in hidden incorrect behavior.
+// DEATH_TEST_F(TransactionCoordinatorCatalogTest,
+// RemovingACoordinatorNotInCommittedOrAbortedStateFails,
+// "Invariant failure") {
+// LogicalSessionId lsid = makeLogicalSessionIdForTest();
+// TxnNumber txnNumber = 1;
+// coordinatorCatalog().create(lsid, txnNumber);
+// coordinatorCatalog().remove(lsid, txnNumber);
+// }
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/transaction_coordinator_commands_impl.cpp b/src/mongo/db/transaction_coordinator_commands_impl.cpp
index 77e51cd9789..68dbdd7992c 100644
--- a/src/mongo/db/transaction_coordinator_commands_impl.cpp
+++ b/src/mongo/db/transaction_coordinator_commands_impl.cpp
@@ -160,54 +160,36 @@ std::vector<ShardId> sendAbort(OperationContext* opCtx, std::set<ShardId>& nonAc
return ackedParticipants;
}
-void doAction(OperationContext* opCtx, TransactionCoordinator::StateMachine::Action action) {
+void doAction(OperationContext* opCtx,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ TransactionCoordinator::StateMachine::Action action) {
switch (action) {
case TransactionCoordinator::StateMachine::Action::kSendCommit: {
std::set<ShardId> nonAckedParticipants;
- {
- OperationContextSessionMongod checkOutSession(
- opCtx, true, false, boost::none, true);
- nonAckedParticipants =
- TransactionCoordinator::get(opCtx)->getNonAckedCommitParticipants();
- }
+ nonAckedParticipants = coordinator->getNonAckedCommitParticipants();
// TODO (SERVER-36638): Spawn a separate thread to do this so that the client's thread
// does not block.
auto ackedParticipants = sendCommit(opCtx, nonAckedParticipants);
- {
- OperationContextSessionMongod checkOutSession(
- opCtx, true, false, boost::none, true);
- auto& coordinator = TransactionCoordinator::get(opCtx);
- for (auto& participant : ackedParticipants) {
- coordinator->recvCommitAck(participant);
- }
+ for (auto& participant : ackedParticipants) {
+ coordinator->recvCommitAck(participant);
}
return;
}
case TransactionCoordinator::StateMachine::Action::kSendAbort: {
std::set<ShardId> nonAckedParticipants;
- {
- OperationContextSessionMongod checkOutSession(
- opCtx, true, false, boost::none, true);
- nonAckedParticipants =
- TransactionCoordinator::get(opCtx)->getNonAckedAbortParticipants();
- }
+ nonAckedParticipants = coordinator->getNonAckedAbortParticipants();
// TODO (SERVER-36638): Spawn a separate thread to do this so that the client's thread
// does not block.
auto ackedParticipants = sendAbort(opCtx, nonAckedParticipants);
- {
- OperationContextSessionMongod checkOutSession(
- opCtx, true, false, boost::none, true);
- auto& coordinator = TransactionCoordinator::get(opCtx);
- for (auto& participant : ackedParticipants) {
- coordinator->recvAbortAck(participant);
- }
+ for (auto& participant : ackedParticipants) {
+ coordinator->recvAbortAck(participant);
}
return;
@@ -222,7 +204,9 @@ void doAction(OperationContext* opCtx, TransactionCoordinator::StateMachine::Act
namespace txn {
-void recvCoordinateCommit(OperationContext* opCtx, const std::set<ShardId>& participantList) {
+void recvCoordinateCommit(OperationContext* opCtx,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const std::set<ShardId>& participantList) {
// TODO (SERVER-36687): Remove log line or demote to lower log level once cross-shard
// transactions are stable.
StringBuilder ss;
@@ -234,42 +218,39 @@ void recvCoordinateCommit(OperationContext* opCtx, const std::set<ShardId>& part
LOG(0) << "Coordinator shard received participant list with shards " << ss.str();
TransactionCoordinator::StateMachine::Action action;
- {
- OperationContextSessionMongod checkOutSession(opCtx, true, false, boost::none, true);
- action = TransactionCoordinator::get(opCtx)->recvCoordinateCommit(participantList);
- }
- doAction(opCtx, action);
+ action = coordinator->recvCoordinateCommit(participantList);
+
+ doAction(opCtx, coordinator, action);
// TODO (SERVER-36640): Wait for decision to be made.
}
-void recvVoteCommit(OperationContext* opCtx, const ShardId& shardId, int prepareTimestamp) {
+void recvVoteCommit(OperationContext* opCtx,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const ShardId& shardId,
+ int prepareTimestamp) {
// TODO (SERVER-36687): Remove log line or demote to lower log level once cross-shard
// transactions are stable.
LOG(0) << "Coordinator shard received voteCommit from " << shardId;
TransactionCoordinator::StateMachine::Action action;
- {
- OperationContextSessionMongod checkOutSession(opCtx, true, false, boost::none, true);
- action = TransactionCoordinator::get(opCtx)->recvVoteCommit(shardId, prepareTimestamp);
- }
+ action = coordinator->recvVoteCommit(shardId, prepareTimestamp);
- doAction(opCtx, action);
+ doAction(opCtx, coordinator, action);
}
-void recvVoteAbort(OperationContext* opCtx, const ShardId& shardId) {
+void recvVoteAbort(OperationContext* opCtx,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const ShardId& shardId) {
// TODO (SERVER-36687): Remove log line or demote to lower log level once cross-shard
// transactions are stable.
LOG(0) << "Coordinator shard received voteAbort from " << shardId;
TransactionCoordinator::StateMachine::Action action;
- {
- OperationContextSessionMongod checkOutSession(opCtx, true, false, boost::none, true);
- action = TransactionCoordinator::get(opCtx)->recvVoteAbort(shardId);
- }
+ action = coordinator->recvVoteAbort(shardId);
- doAction(opCtx, action);
+ doAction(opCtx, coordinator, action);
}
} // namespace txn
diff --git a/src/mongo/db/transaction_coordinator_commands_impl.h b/src/mongo/db/transaction_coordinator_commands_impl.h
index f05a519ddfd..f8b302c4c41 100644
--- a/src/mongo/db/transaction_coordinator_commands_impl.h
+++ b/src/mongo/db/transaction_coordinator_commands_impl.h
@@ -39,9 +39,16 @@ namespace mongo {
*/
namespace txn {
-void recvCoordinateCommit(OperationContext* opCtx, const std::set<ShardId>& participantList);
-void recvVoteCommit(OperationContext* opCtx, const ShardId& shardId, int prepareTimestamp);
-void recvVoteAbort(OperationContext* opCtx, const ShardId& shardId);
+void recvCoordinateCommit(OperationContext* opCtx,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const std::set<ShardId>& participantList);
+void recvVoteCommit(OperationContext* opCtx,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const ShardId& shardId,
+ int prepareTimestamp);
+void recvVoteAbort(OperationContext* opCtx,
+ std::shared_ptr<TransactionCoordinator> coordinator,
+ const ShardId& shardId);
} // namespace txn
} // namespace mongo
diff --git a/src/mongo/db/transaction_coordinator_commands_impl_test.cpp b/src/mongo/db/transaction_coordinator_commands_impl_test.cpp
index 61722a019b3..3d460abb5a9 100644
--- a/src/mongo/db/transaction_coordinator_commands_impl_test.cpp
+++ b/src/mongo/db/transaction_coordinator_commands_impl_test.cpp
@@ -67,22 +67,7 @@ protected:
void setUp() final {
ShardServerTestFixture::setUp();
- SessionCatalog::get(getServiceContext())->onStepUp(operationContext());
- auto scopedSession =
- SessionCatalog::get(operationContext())->getOrCreateSession(operationContext(), _lsid);
-
- // Simulate that a regular transaction statement containing 'coordinator: true' was already
- // sent to this shard.
- operationContext()->setLogicalSessionId(_lsid);
- operationContext()->setTxnNumber(_txnNumber);
- {
- OperationContextSessionMongod checkOutSession(
- operationContext(), true, false, true, true);
-
- auto txnParticipant = TransactionParticipant::get(operationContext());
- txnParticipant->unstashTransactionResources(operationContext(), "dummy");
- txnParticipant->stashTransactionResources(operationContext());
- }
+ _coordinator = std::make_shared<TransactionCoordinator>();
for_each(shardIds.begin(), shardIds.end(), [this](const ShardId& shardId) {
auto shardTargeter = RemoteCommandTargeterMock::get(
@@ -94,6 +79,7 @@ protected:
void tearDown() final {
SessionCatalog::get(getServiceContext())->reset_forTest();
+ _coordinator.reset();
ShardServerTestFixture::tearDown();
}
@@ -112,7 +98,7 @@ protected:
auto opCtxPtr = cc().makeOperationContext();
auto opCtx = opCtxPtr.get();
- // Required to be able to check out the session later.
+ // Required to be able to send abort and commit commands
opCtx->setLogicalSessionId(_lsid);
opCtx->setTxnNumber(_txnNumber);
@@ -127,19 +113,20 @@ protected:
}
auto receiveCoordinateCommit(std::set<ShardId> participantList) {
- auto commandFn =
- std::bind(txn::recvCoordinateCommit, std::placeholders::_1, participantList);
+ auto commandFn = std::bind(
+ txn::recvCoordinateCommit, std::placeholders::_1, _coordinator, participantList);
return simulateHandleRequest(commandFn);
}
auto receiveVoteCommit(ShardId shardId, int prepareTimestamp) {
- auto commandFn =
- std::bind(txn::recvVoteCommit, std::placeholders::_1, shardId, prepareTimestamp);
+ auto commandFn = std::bind(
+ txn::recvVoteCommit, std::placeholders::_1, _coordinator, shardId, prepareTimestamp);
return simulateHandleRequest(commandFn);
}
auto receiveVoteAbort(ShardId shardId) {
- auto commandFn = std::bind(txn::recvVoteAbort, std::placeholders::_1, shardId);
+ auto commandFn =
+ std::bind(txn::recvVoteAbort, std::placeholders::_1, _coordinator, shardId);
return simulateHandleRequest(commandFn);
}
@@ -205,6 +192,7 @@ private:
return stdx::make_unique<StaticCatalogClient>();
}
+ std::shared_ptr<TransactionCoordinator> _coordinator;
const LogicalSessionId _lsid{makeLogicalSessionIdForTest()};
const TxnNumber _txnNumber{0};
};
diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp
index a72dc1992bd..9e5dc5a9bb0 100644
--- a/src/mongo/db/transaction_coordinator_service.cpp
+++ b/src/mongo/db/transaction_coordinator_service.cpp
@@ -62,10 +62,15 @@ TransactionCoordinatorService* TransactionCoordinatorService::get(ServiceContext
void TransactionCoordinatorService::createCoordinator(LogicalSessionId lsid,
TxnNumber txnNumber,
Date_t commitDeadline) {
-
// TODO (SERVER-37021): Validate lsid and txnNumber against latest txnNumber on session in the
// catalog.
+ auto latestTxnNumAndCoordinator = _coordinatorCatalog.getLatestOnSession(lsid);
+ // TODO (SERVER-37039): The below removal logic for a coordinator will change/be removed once we
+ // allow multiple coordinators for a session.
+ if (latestTxnNumAndCoordinator) {
+ _coordinatorCatalog.remove(lsid, latestTxnNumAndCoordinator->first);
+ }
_coordinatorCatalog.create(lsid, txnNumber);
// TODO (SERVER-37024): Schedule abort task on executor to execute at commitDeadline.
@@ -84,11 +89,7 @@ TransactionCoordinatorService::CommitDecision TransactionCoordinatorService::coo
}
// TODO (SERVER-37017): Execute this asynchronously.
- txn::recvCoordinateCommit(opCtx, participantList);
- // TODO (SERVER-37017): Once coordinate commit is asynchronous and/or returns after deciding to
- // commit instead of after finishing commit, removal of the coordinator from the catalog will
- // need to be done somewhere else.
- _coordinatorCatalog.remove(lsid, txnNumber);
+ txn::recvCoordinateCommit(opCtx, coordinator.get(), participantList);
// TODO (SERVER-36640): Return a notification wrapping the decision that the caller can wait on.
return TransactionCoordinatorService::CommitDecision::kAbort;
@@ -102,10 +103,11 @@ void TransactionCoordinatorService::voteCommit(OperationContext* opCtx,
auto coordinator = _coordinatorCatalog.get(lsid, txnNumber);
if (!coordinator) {
// TODO (SERVER-37018): Send abort to the participant who sent this vote (shardId)
+ return;
}
// TODO (SERVER-37017): Execute this asynchronously
- txn::recvVoteCommit(opCtx, shardId, prepareTimestamp);
+ txn::recvVoteCommit(opCtx, coordinator.get(), shardId, prepareTimestamp);
}
void TransactionCoordinatorService::voteAbort(OperationContext* opCtx,
@@ -116,7 +118,7 @@ void TransactionCoordinatorService::voteAbort(OperationContext* opCtx,
if (coordinator) {
// TODO (SERVER-37017): Execute this asynchronously.
- txn::recvVoteAbort(opCtx, shardId);
+ txn::recvVoteAbort(opCtx, coordinator.get(), shardId);
}
}