diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-06-27 17:51:14 -0400 |
---|---|---|
committer | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-06-29 10:05:59 -0400 |
commit | 0a973568da55c069980c54c64285abc7ea2ac3fe (patch) | |
tree | ba65c3fb524241989033b3aa0f879d8a3666d137 /src/mongo | |
parent | 958feda39142fbd9861c9ba4c2232e5f6df61fc8 (diff) | |
download | mongo-0a973568da55c069980c54c64285abc7ea2ac3fe.tar.gz |
SERVER-35853 Track coordinator shard in RouterSessionRuntimeState
Diffstat (limited to 'src/mongo')
4 files changed, 102 insertions, 2 deletions
diff --git a/src/mongo/db/command_generic_argument.cpp b/src/mongo/db/command_generic_argument.cpp index ecd0213865b..8a97110af05 100644 --- a/src/mongo/db/command_generic_argument.cpp +++ b/src/mongo/db/command_generic_argument.cpp @@ -50,7 +50,7 @@ struct SpecialArgRecord { // If that changes, it should be added. When you add to this list, consider whether you // should also change the filterCommandRequestForPassthrough() function. // clang-format off -static constexpr std::array<SpecialArgRecord, 24> specials{{ +static constexpr std::array<SpecialArgRecord, 25> specials{{ // /-isGeneric // | /-stripFromRequest // | | /-stripFromReply @@ -73,6 +73,7 @@ static constexpr std::array<SpecialArgRecord, 24> specials{{ {"lsid"_sd, 1, 0, 0}, {"txnNumber"_sd, 1, 0, 0}, {"autocommit"_sd, 1, 1, 0}, + {"coordinator"_sd, 1, 1, 0}, {"startTransaction"_sd, 1, 1, 0}, {"stmtId"_sd, 1, 0, 0}, {"$gleStats"_sd, 0, 0, 1}, diff --git a/src/mongo/s/transaction/router_session_runtime_state.cpp b/src/mongo/s/transaction/router_session_runtime_state.cpp index 38c208a3a35..9ac21b751f9 100644 --- a/src/mongo/s/transaction/router_session_runtime_state.cpp +++ b/src/mongo/s/transaction/router_session_runtime_state.cpp @@ -40,6 +40,7 @@ namespace mongo { namespace { const char kAutoCommitField[] = "autocommit"; +const char kCoordinatorField[] = "coordinator"; const char kStartTransactionField[] = "startTransaction"; class RouterSessionCatalog { @@ -109,6 +110,9 @@ bool isTransactionCommand(const BSONObj& cmd) { } // unnamed namespace +TransactionParticipant::TransactionParticipant(bool isCoordinator) + : _isCoordinator(isCoordinator) {} + BSONObj TransactionParticipant::attachTxnFieldsIfNeeded(BSONObj cmd) { auto isTxnCmd = isTransactionCommand(cmd); // check first before moving cmd. BSONObjBuilder newCmd(std::move(cmd)); @@ -117,6 +121,10 @@ BSONObj TransactionParticipant::attachTxnFieldsIfNeeded(BSONObj cmd) { newCmd.append(kStartTransactionField, true); } + if (_isCoordinator) { + newCmd.append(kCoordinatorField, true); + } + newCmd.append(kAutoCommitField, false); // TODO: append readConcern @@ -128,6 +136,10 @@ TransactionParticipant::State TransactionParticipant::getState() { return _state; } +bool TransactionParticipant::isCoordinator() { + return _isCoordinator; +} + void TransactionParticipant::markAsCommandSent() { if (_state == State::kMustStart) { _state = State::kStarted; @@ -158,6 +170,10 @@ bool RouterSessionRuntimeState::isCheckedOut() { return _isCheckedOut; } +boost::optional<ShardId> RouterSessionRuntimeState::getCoordinatorId() const { + return _coordinatorId; +} + TransactionParticipant& RouterSessionRuntimeState::getOrCreateParticipant(const ShardId& shard) { auto iter = _participants.find(shard.toString()); @@ -165,7 +181,16 @@ TransactionParticipant& RouterSessionRuntimeState::getOrCreateParticipant(const return iter->second; } - auto resultPair = _participants.try_emplace(shard.toString(), TransactionParticipant()); + // The first participant is chosen as the coordinator. + auto isFirstParticipant = _participants.empty(); + if (isFirstParticipant) { + invariant(!_coordinatorId); + _coordinatorId = shard.toString(); + } + + auto resultPair = + _participants.try_emplace(shard.toString(), TransactionParticipant(isFirstParticipant)); + return resultPair.first->second; } @@ -207,6 +232,7 @@ void RouterSessionRuntimeState::beginOrContinueTxn(TxnNumber txnNumber, bool sta _txnNumber = txnNumber; _participants.clear(); + _coordinatorId.reset(); } ScopedRouterSession::ScopedRouterSession(OperationContext* opCtx) : _opCtx(opCtx) { diff --git a/src/mongo/s/transaction/router_session_runtime_state.h b/src/mongo/s/transaction/router_session_runtime_state.h index d9d9b40c512..8a6c8340de2 100644 --- a/src/mongo/s/transaction/router_session_runtime_state.h +++ b/src/mongo/s/transaction/router_session_runtime_state.h @@ -42,6 +42,8 @@ namespace mongo { class TransactionParticipant { public: + explicit TransactionParticipant(bool isCoordinator); + enum class State { // Next transaction should include startTransaction. kMustStart, @@ -57,12 +59,18 @@ public: State getState(); /** + * True if the participant has been chosen as the coordinator for its transaction. + */ + bool isCoordinator(); + + /** * Mark this participant as a node that has been successfully sent a command. */ void markAsCommandSent(); private: State _state{State::kMustStart}; + bool _isCoordinator{false}; }; /** @@ -89,6 +97,8 @@ public: const LogicalSessionId& getSessionId() const; + boost::optional<ShardId> getCoordinatorId() const; + /** * Extract the runtimne state attached to the operation context. Returns nullptr if none is * attached. @@ -104,6 +114,9 @@ private: // Map of current participants of the current transaction. StringMap<TransactionParticipant> _participants; + + // The id of coordinator participant, used to construct prepare requests. + boost::optional<ShardId> _coordinatorId; }; /** diff --git a/src/mongo/s/transaction/router_session_runtime_state_test.cpp b/src/mongo/s/transaction/router_session_runtime_state_test.cpp index 86e91448b14..bfa83b7dde3 100644 --- a/src/mongo/s/transaction/router_session_runtime_state_test.cpp +++ b/src/mongo/s/transaction/router_session_runtime_state_test.cpp @@ -45,6 +45,8 @@ TEST(RouterSessionRuntimeStateTest, BasicStartTxn) { << "test" << "startTransaction" << true + << "coordinator" + << true << "autocommit" << false); @@ -71,6 +73,8 @@ TEST(RouterSessionRuntimeStateTest, BasicStartTxn) { << "test")); ASSERT_BSONOBJ_EQ(BSON("update" << "test" + << "coordinator" + << true << "autocommit" << false), newCmd); @@ -99,6 +103,8 @@ TEST(RouterSessionRuntimeStateTest, NewParticipantMustAttachTxn) { << "test" << "startTransaction" << true + << "coordinator" + << true << "autocommit" << false); @@ -116,12 +122,22 @@ TEST(RouterSessionRuntimeStateTest, NewParticipantMustAttachTxn) { << "test")); ASSERT_BSONOBJ_EQ(BSON("update" << "test" + << "coordinator" + << true << "autocommit" << false), newCmd); } ShardId shard2("b"); + + expectedNewObj = BSON("insert" + << "test" + << "startTransaction" + << true + << "autocommit" + << false); + { auto& participant = sessionState.getOrCreateParticipant(shard2); auto newCmd = participant.attachTxnFieldsIfNeeded(BSON("insert" @@ -158,6 +174,8 @@ TEST(RouterSessionRuntimeStateTest, StartingNewTxnShouldClearState) { << "test")); ASSERT_BSONOBJ_EQ(BSON("update" << "test" + << "coordinator" + << true << "autocommit" << false), newCmd); @@ -170,6 +188,8 @@ TEST(RouterSessionRuntimeStateTest, StartingNewTxnShouldClearState) { << "test" << "startTransaction" << true + << "coordinator" + << true << "autocommit" << false); @@ -181,5 +201,45 @@ TEST(RouterSessionRuntimeStateTest, StartingNewTxnShouldClearState) { } } +TEST(RouterSessionRuntimeStateTest, FirstParticipantIsCoordinator) { + TxnNumber txnNum{3}; + + RouterSessionRuntimeState sessionState({}); + sessionState.checkOut(); + sessionState.beginOrContinueTxn(txnNum, true); + + ASSERT_FALSE(sessionState.getCoordinatorId()); + + ShardId shard1("a"); + + { + auto& participant = sessionState.getOrCreateParticipant(shard1); + ASSERT(participant.isCoordinator()); + ASSERT(sessionState.getCoordinatorId()); + ASSERT_EQ(*sessionState.getCoordinatorId(), shard1); + } + + ShardId shard2("b"); + + { + auto& participant = sessionState.getOrCreateParticipant(shard2); + ASSERT_FALSE(participant.isCoordinator()); + ASSERT(sessionState.getCoordinatorId()); + ASSERT_EQ(*sessionState.getCoordinatorId(), shard1); + } + + TxnNumber txnNum2{5}; + sessionState.beginOrContinueTxn(txnNum2, true); + + ASSERT_FALSE(sessionState.getCoordinatorId()); + + { + auto& participant = sessionState.getOrCreateParticipant(shard2); + ASSERT(participant.isCoordinator()); + ASSERT(sessionState.getCoordinatorId()); + ASSERT_EQ(*sessionState.getCoordinatorId(), shard2); + } +} + } // unnamed namespace } // namespace mongo |