summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2018-06-27 17:51:14 -0400
committerJack Mulrow <jack.mulrow@mongodb.com>2018-06-29 10:05:59 -0400
commit0a973568da55c069980c54c64285abc7ea2ac3fe (patch)
treeba65c3fb524241989033b3aa0f879d8a3666d137 /src/mongo
parent958feda39142fbd9861c9ba4c2232e5f6df61fc8 (diff)
downloadmongo-0a973568da55c069980c54c64285abc7ea2ac3fe.tar.gz
SERVER-35853 Track coordinator shard in RouterSessionRuntimeState
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/command_generic_argument.cpp3
-rw-r--r--src/mongo/s/transaction/router_session_runtime_state.cpp28
-rw-r--r--src/mongo/s/transaction/router_session_runtime_state.h13
-rw-r--r--src/mongo/s/transaction/router_session_runtime_state_test.cpp60
4 files changed, 102 insertions, 2 deletions
diff --git a/src/mongo/db/command_generic_argument.cpp b/src/mongo/db/command_generic_argument.cpp
index ecd0213865b..8a97110af05 100644
--- a/src/mongo/db/command_generic_argument.cpp
+++ b/src/mongo/db/command_generic_argument.cpp
@@ -50,7 +50,7 @@ struct SpecialArgRecord {
// If that changes, it should be added. When you add to this list, consider whether you
// should also change the filterCommandRequestForPassthrough() function.
// clang-format off
-static constexpr std::array<SpecialArgRecord, 24> specials{{
+static constexpr std::array<SpecialArgRecord, 25> specials{{
// /-isGeneric
// | /-stripFromRequest
// | | /-stripFromReply
@@ -73,6 +73,7 @@ static constexpr std::array<SpecialArgRecord, 24> specials{{
{"lsid"_sd, 1, 0, 0},
{"txnNumber"_sd, 1, 0, 0},
{"autocommit"_sd, 1, 1, 0},
+ {"coordinator"_sd, 1, 1, 0},
{"startTransaction"_sd, 1, 1, 0},
{"stmtId"_sd, 1, 0, 0},
{"$gleStats"_sd, 0, 0, 1},
diff --git a/src/mongo/s/transaction/router_session_runtime_state.cpp b/src/mongo/s/transaction/router_session_runtime_state.cpp
index 38c208a3a35..9ac21b751f9 100644
--- a/src/mongo/s/transaction/router_session_runtime_state.cpp
+++ b/src/mongo/s/transaction/router_session_runtime_state.cpp
@@ -40,6 +40,7 @@ namespace mongo {
namespace {
const char kAutoCommitField[] = "autocommit";
+const char kCoordinatorField[] = "coordinator";
const char kStartTransactionField[] = "startTransaction";
class RouterSessionCatalog {
@@ -109,6 +110,9 @@ bool isTransactionCommand(const BSONObj& cmd) {
} // unnamed namespace
+TransactionParticipant::TransactionParticipant(bool isCoordinator)
+ : _isCoordinator(isCoordinator) {}
+
BSONObj TransactionParticipant::attachTxnFieldsIfNeeded(BSONObj cmd) {
auto isTxnCmd = isTransactionCommand(cmd); // check first before moving cmd.
BSONObjBuilder newCmd(std::move(cmd));
@@ -117,6 +121,10 @@ BSONObj TransactionParticipant::attachTxnFieldsIfNeeded(BSONObj cmd) {
newCmd.append(kStartTransactionField, true);
}
+ if (_isCoordinator) {
+ newCmd.append(kCoordinatorField, true);
+ }
+
newCmd.append(kAutoCommitField, false);
// TODO: append readConcern
@@ -128,6 +136,10 @@ TransactionParticipant::State TransactionParticipant::getState() {
return _state;
}
+bool TransactionParticipant::isCoordinator() {
+ return _isCoordinator;
+}
+
void TransactionParticipant::markAsCommandSent() {
if (_state == State::kMustStart) {
_state = State::kStarted;
@@ -158,6 +170,10 @@ bool RouterSessionRuntimeState::isCheckedOut() {
return _isCheckedOut;
}
+boost::optional<ShardId> RouterSessionRuntimeState::getCoordinatorId() const {
+ return _coordinatorId;
+}
+
TransactionParticipant& RouterSessionRuntimeState::getOrCreateParticipant(const ShardId& shard) {
auto iter = _participants.find(shard.toString());
@@ -165,7 +181,16 @@ TransactionParticipant& RouterSessionRuntimeState::getOrCreateParticipant(const
return iter->second;
}
- auto resultPair = _participants.try_emplace(shard.toString(), TransactionParticipant());
+ // The first participant is chosen as the coordinator.
+ auto isFirstParticipant = _participants.empty();
+ if (isFirstParticipant) {
+ invariant(!_coordinatorId);
+ _coordinatorId = shard.toString();
+ }
+
+ auto resultPair =
+ _participants.try_emplace(shard.toString(), TransactionParticipant(isFirstParticipant));
+
return resultPair.first->second;
}
@@ -207,6 +232,7 @@ void RouterSessionRuntimeState::beginOrContinueTxn(TxnNumber txnNumber, bool sta
_txnNumber = txnNumber;
_participants.clear();
+ _coordinatorId.reset();
}
ScopedRouterSession::ScopedRouterSession(OperationContext* opCtx) : _opCtx(opCtx) {
diff --git a/src/mongo/s/transaction/router_session_runtime_state.h b/src/mongo/s/transaction/router_session_runtime_state.h
index d9d9b40c512..8a6c8340de2 100644
--- a/src/mongo/s/transaction/router_session_runtime_state.h
+++ b/src/mongo/s/transaction/router_session_runtime_state.h
@@ -42,6 +42,8 @@ namespace mongo {
class TransactionParticipant {
public:
+ explicit TransactionParticipant(bool isCoordinator);
+
enum class State {
// Next transaction should include startTransaction.
kMustStart,
@@ -57,12 +59,18 @@ public:
State getState();
/**
+ * True if the participant has been chosen as the coordinator for its transaction.
+ */
+ bool isCoordinator();
+
+ /**
* Mark this participant as a node that has been successfully sent a command.
*/
void markAsCommandSent();
private:
State _state{State::kMustStart};
+ bool _isCoordinator{false};
};
/**
@@ -89,6 +97,8 @@ public:
const LogicalSessionId& getSessionId() const;
+ boost::optional<ShardId> getCoordinatorId() const;
+
/**
* Extract the runtimne state attached to the operation context. Returns nullptr if none is
* attached.
@@ -104,6 +114,9 @@ private:
// Map of current participants of the current transaction.
StringMap<TransactionParticipant> _participants;
+
+ // The id of coordinator participant, used to construct prepare requests.
+ boost::optional<ShardId> _coordinatorId;
};
/**
diff --git a/src/mongo/s/transaction/router_session_runtime_state_test.cpp b/src/mongo/s/transaction/router_session_runtime_state_test.cpp
index 86e91448b14..bfa83b7dde3 100644
--- a/src/mongo/s/transaction/router_session_runtime_state_test.cpp
+++ b/src/mongo/s/transaction/router_session_runtime_state_test.cpp
@@ -45,6 +45,8 @@ TEST(RouterSessionRuntimeStateTest, BasicStartTxn) {
<< "test"
<< "startTransaction"
<< true
+ << "coordinator"
+ << true
<< "autocommit"
<< false);
@@ -71,6 +73,8 @@ TEST(RouterSessionRuntimeStateTest, BasicStartTxn) {
<< "test"));
ASSERT_BSONOBJ_EQ(BSON("update"
<< "test"
+ << "coordinator"
+ << true
<< "autocommit"
<< false),
newCmd);
@@ -99,6 +103,8 @@ TEST(RouterSessionRuntimeStateTest, NewParticipantMustAttachTxn) {
<< "test"
<< "startTransaction"
<< true
+ << "coordinator"
+ << true
<< "autocommit"
<< false);
@@ -116,12 +122,22 @@ TEST(RouterSessionRuntimeStateTest, NewParticipantMustAttachTxn) {
<< "test"));
ASSERT_BSONOBJ_EQ(BSON("update"
<< "test"
+ << "coordinator"
+ << true
<< "autocommit"
<< false),
newCmd);
}
ShardId shard2("b");
+
+ expectedNewObj = BSON("insert"
+ << "test"
+ << "startTransaction"
+ << true
+ << "autocommit"
+ << false);
+
{
auto& participant = sessionState.getOrCreateParticipant(shard2);
auto newCmd = participant.attachTxnFieldsIfNeeded(BSON("insert"
@@ -158,6 +174,8 @@ TEST(RouterSessionRuntimeStateTest, StartingNewTxnShouldClearState) {
<< "test"));
ASSERT_BSONOBJ_EQ(BSON("update"
<< "test"
+ << "coordinator"
+ << true
<< "autocommit"
<< false),
newCmd);
@@ -170,6 +188,8 @@ TEST(RouterSessionRuntimeStateTest, StartingNewTxnShouldClearState) {
<< "test"
<< "startTransaction"
<< true
+ << "coordinator"
+ << true
<< "autocommit"
<< false);
@@ -181,5 +201,45 @@ TEST(RouterSessionRuntimeStateTest, StartingNewTxnShouldClearState) {
}
}
+TEST(RouterSessionRuntimeStateTest, FirstParticipantIsCoordinator) {
+ TxnNumber txnNum{3};
+
+ RouterSessionRuntimeState sessionState({});
+ sessionState.checkOut();
+ sessionState.beginOrContinueTxn(txnNum, true);
+
+ ASSERT_FALSE(sessionState.getCoordinatorId());
+
+ ShardId shard1("a");
+
+ {
+ auto& participant = sessionState.getOrCreateParticipant(shard1);
+ ASSERT(participant.isCoordinator());
+ ASSERT(sessionState.getCoordinatorId());
+ ASSERT_EQ(*sessionState.getCoordinatorId(), shard1);
+ }
+
+ ShardId shard2("b");
+
+ {
+ auto& participant = sessionState.getOrCreateParticipant(shard2);
+ ASSERT_FALSE(participant.isCoordinator());
+ ASSERT(sessionState.getCoordinatorId());
+ ASSERT_EQ(*sessionState.getCoordinatorId(), shard1);
+ }
+
+ TxnNumber txnNum2{5};
+ sessionState.beginOrContinueTxn(txnNum2, true);
+
+ ASSERT_FALSE(sessionState.getCoordinatorId());
+
+ {
+ auto& participant = sessionState.getOrCreateParticipant(shard2);
+ ASSERT(participant.isCoordinator());
+ ASSERT(sessionState.getCoordinatorId());
+ ASSERT_EQ(*sessionState.getCoordinatorId(), shard2);
+ }
+}
+
} // unnamed namespace
} // namespace mongo