summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEsha Maharishi <esha.maharishi@mongodb.com>2019-03-12 16:59:13 -0400
committerEsha Maharishi <esha.maharishi@mongodb.com>2019-03-13 19:10:21 -0400
commit41eac7940607f4750f078d89962240bc88fa1359 (patch)
tree0ac8894863da0163b49a3e6db0950d8369a1b092
parentcd44c20c13d2452c78d1131fdedebd3107ecffea (diff)
downloadmongo-41eac7940607f4750f078d89962240bc88fa1359.tar.gz
SERVER-39877 Clean up transaction_router_test.cpp and transaction_router.cpp
-rw-r--r--src/mongo/s/commands/cluster_commit_transaction_cmd.cpp7
-rw-r--r--src/mongo/s/transaction_router.cpp74
-rw-r--r--src/mongo/s/transaction_router.h12
-rw-r--r--src/mongo/s/transaction_router_test.cpp58
4 files changed, 75 insertions, 76 deletions
diff --git a/src/mongo/s/commands/cluster_commit_transaction_cmd.cpp b/src/mongo/s/commands/cluster_commit_transaction_cmd.cpp
index a3e78983249..38d6061b61b 100644
--- a/src/mongo/s/commands/cluster_commit_transaction_cmd.cpp
+++ b/src/mongo/s/commands/cluster_commit_transaction_cmd.cpp
@@ -77,9 +77,10 @@ public:
"commitTransaction can only be run within a session",
txnRouter != nullptr);
- auto commitCmd = CommitTransaction::parse(IDLParserErrorContext("commit cmd"), cmdObj);
- auto cmdResponse = txnRouter->commitTransaction(opCtx, commitCmd.getRecoveryToken());
- CommandHelpers::filterCommandReplyForPassthrough(cmdResponse.response, &result);
+ const auto commitCmd =
+ CommitTransaction::parse(IDLParserErrorContext("commit cmd"), cmdObj);
+ auto commitRes = txnRouter->commitTransaction(opCtx, commitCmd.getRecoveryToken());
+ CommandHelpers::filterCommandReplyForPassthrough(commitRes, &result);
return true;
}
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 58826816209..754dc91094f 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -607,7 +607,7 @@ const LogicalSessionId& TransactionRouter::_sessionId() const {
return owningSession->getSessionId();
}
-Shard::CommandResponse TransactionRouter::_commitSingleShardTransaction(OperationContext* opCtx) {
+BSONObj TransactionRouter::_commitSingleShardTransaction(OperationContext* opCtx) {
auto shardRegistry = Grid::get(opCtx)->shardRegistry();
const auto citer = _participants.cbegin();
@@ -618,23 +618,24 @@ Shard::CommandResponse TransactionRouter::_commitSingleShardTransaction(Operatio
auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId));
LOG(0) << txnIdToString()
- << " Committing single shard transaction, single participant: " << shardId;
+ << " Committing single-shard transaction, single participant: " << shardId;
CommitTransaction commitCmd;
commitCmd.setDbName(NamespaceString::kAdminDb);
return uassertStatusOK(shard->runCommandWithFixedRetryAttempts(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- "admin",
- participant.attachTxnFieldsIfNeeded(
- commitCmd.toBSON(
- BSON(WriteConcernOptions::kWriteConcernField << opCtx->getWriteConcern().toBSON())),
- false),
- Shard::RetryPolicy::kIdempotent));
-}
-
-Shard::CommandResponse TransactionRouter::_commitMultiShardTransaction(OperationContext* opCtx) {
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ participant.attachTxnFieldsIfNeeded(
+ commitCmd.toBSON(BSON(WriteConcernOptions::kWriteConcernField
+ << opCtx->getWriteConcern().toBSON())),
+ false),
+ Shard::RetryPolicy::kIdempotent))
+ .response;
+}
+
+BSONObj TransactionRouter::_commitMultiShardTransaction(OperationContext* opCtx) {
invariant(_coordinatorId);
auto coordinatorIter = _participants.find(*_coordinatorId);
invariant(coordinatorIter != _participants.end());
@@ -696,20 +697,22 @@ Shard::CommandResponse TransactionRouter::_commitMultiShardTransaction(Operation
_initiatedTwoPhaseCommit = true;
LOG(0) << txnIdToString()
- << " Committing multi shard transaction, coordinator: " << *_coordinatorId;
-
- return uassertStatusOK(coordinatorShard->runCommandWithFixedRetryAttempts(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- "admin",
- coordinatorIter->second.attachTxnFieldsIfNeeded(
- coordinateCommitCmd.toBSON(
- BSON(WriteConcernOptions::kWriteConcernField << opCtx->getWriteConcern().toBSON())),
- false),
- Shard::RetryPolicy::kIdempotent));
-}
-
-Shard::CommandResponse TransactionRouter::commitTransaction(
+ << " Committing multi-shard transaction, coordinator: " << *_coordinatorId;
+
+ return uassertStatusOK(
+ coordinatorShard->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ coordinatorIter->second.attachTxnFieldsIfNeeded(
+ coordinateCommitCmd.toBSON(BSON(WriteConcernOptions::kWriteConcernField
+ << opCtx->getWriteConcern().toBSON())),
+ false),
+ Shard::RetryPolicy::kIdempotent))
+ .response;
+}
+
+BSONObj TransactionRouter::commitTransaction(
OperationContext* opCtx, const boost::optional<TxnRecoveryToken>& recoveryToken) {
if (_isRecoveringCommit) {
uassert(50940,
@@ -725,7 +728,7 @@ Shard::CommandResponse TransactionRouter::commitTransaction(
uassert(ErrorCodes::IllegalOperation,
"Cannot commit without participants",
_txnNumber != kUninitializedTxnNumber);
- return {boost::none, BSON("ok" << true), Status::OK(), Status::OK()};
+ return BSON("ok" << 1);
}
if (_participants.size() == 1) {
@@ -800,8 +803,8 @@ void TransactionRouter::appendRecoveryToken(BSONObjBuilder* builder) const {
recoveryTokenBuilder.doneFast();
}
-Shard::CommandResponse TransactionRouter::_commitWithRecoveryToken(
- OperationContext* opCtx, const TxnRecoveryToken& recoveryToken) {
+BSONObj TransactionRouter::_commitWithRecoveryToken(OperationContext* opCtx,
+ const TxnRecoveryToken& recoveryToken) {
const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
const auto& coordinatorId = recoveryToken.getShardId();
@@ -823,11 +826,12 @@ Shard::CommandResponse TransactionRouter::_commitWithRecoveryToken(
auto coordinatorShard = uassertStatusOK(shardRegistry->getShard(opCtx, coordinatorId));
return uassertStatusOK(coordinatorShard->runCommandWithFixedRetryAttempts(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- "admin",
- coordinateCommitCmd,
- Shard::RetryPolicy::kIdempotent));
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ coordinateCommitCmd,
+ Shard::RetryPolicy::kIdempotent))
+ .response;
}
} // namespace mongo
diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h
index eb7f7c48874..ce349fe40e2 100644
--- a/src/mongo/s/transaction_router.h
+++ b/src/mongo/s/transaction_router.h
@@ -234,8 +234,8 @@ public:
* Commits the transaction. For transactions with multiple participants, this will initiate
* the two phase commit procedure.
*/
- Shard::CommandResponse commitTransaction(
- OperationContext* opCtx, const boost::optional<TxnRecoveryToken>& recoveryToken);
+ BSONObj commitTransaction(OperationContext* opCtx,
+ const boost::optional<TxnRecoveryToken>& recoveryToken);
/**
* Sends abort to all participants and returns the responses from all shards.
@@ -281,15 +281,15 @@ private:
/**
* Run basic commit for transactions that touched a single shard.
*/
- Shard::CommandResponse _commitSingleShardTransaction(OperationContext* opCtx);
+ BSONObj _commitSingleShardTransaction(OperationContext* opCtx);
- Shard::CommandResponse _commitWithRecoveryToken(OperationContext* opCtx,
- const TxnRecoveryToken& recoveryToken);
+ BSONObj _commitWithRecoveryToken(OperationContext* opCtx,
+ const TxnRecoveryToken& recoveryToken);
/**
* Run two phase commit for transactions that touched multiple shards.
*/
- Shard::CommandResponse _commitMultiShardTransaction(OperationContext* opCtx);
+ BSONObj _commitMultiShardTransaction(OperationContext* opCtx);
/**
* Sets the given logical time as the atClusterTime for the transaction to be the greater of the
diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp
index 65288bcbbdc..0c9764180cf 100644
--- a/src/mongo/s/transaction_router_test.cpp
+++ b/src/mongo/s/transaction_router_test.cpp
@@ -663,7 +663,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter
}
}
-TEST_F(TransactionRouterTestWithDefaultSession, CannotCommitWithoutParticipants) {
+TEST_F(TransactionRouterTestWithDefaultSession, CannotCommitWithoutParticipantsOrRecoveryToken) {
TxnNumber txnNum{3};
auto& txnRouter(*TransactionRouter::get(operationContext()));
@@ -702,24 +702,21 @@ void checkWriteConcern(const BSONObj& cmdObj, const WriteConcernOptions& expecte
ASSERT_BSONOBJ_EQ(expectedWC.toBSON(), writeCocernElem.Obj());
}
-TEST_F(TransactionRouterTest, SendCommitDirectlyForSingleParticipants) {
- LogicalSessionId lsid(makeLogicalSessionIdForTest());
+TEST_F(TransactionRouterTestWithDefaultSession,
+ SendCommitDirectlyForSingleParticipantThatDidAWrite) {
TxnNumber txnNum{3};
- auto opCtx = operationContext();
- opCtx->setLogicalSessionId(lsid);
- opCtx->setTxnNumber(txnNum);
-
- RouterOperationContextSession scopedSession(opCtx);
- auto txnRouter = TransactionRouter::get(opCtx);
+ auto& txnRouter(*TransactionRouter::get(operationContext()));
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.setDefaultAtClusterTime(operationContext());
- txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
- txnRouter->setDefaultAtClusterTime(operationContext());
- txnRouter->attachTxnFieldsIfNeeded(shard1, {});
- txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter.attachTxnFieldsIfNeeded(shard1, {});
+ txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ TxnRecoveryToken recoveryToken(shard1);
auto future =
- launchAsync([&] { txnRouter->commitTransaction(operationContext(), boost::none); });
+ launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); });
onCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQ(hostAndPort1, request.target);
@@ -728,7 +725,7 @@ TEST_F(TransactionRouterTest, SendCommitDirectlyForSingleParticipants) {
auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
ASSERT_EQ(cmdName, "commitTransaction");
- checkSessionDetails(request.cmdObj, lsid, txnNum, true);
+ checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true);
return BSON("ok" << 1);
});
@@ -736,29 +733,26 @@ TEST_F(TransactionRouterTest, SendCommitDirectlyForSingleParticipants) {
future.timed_get(kFutureTimeout);
}
-TEST_F(TransactionRouterTest, SendCoordinateCommitForMultipleParticipantsWithRecoveryToken) {
- LogicalSessionId lsid(makeLogicalSessionIdForTest());
+TEST_F(TransactionRouterTestWithDefaultSession,
+ SendCoordinateCommitForMultipleParticipantsAllDidWrites) {
TxnNumber txnNum{3};
- auto opCtx = operationContext();
- opCtx->setLogicalSessionId(lsid);
- opCtx->setTxnNumber(txnNum);
-
- RouterOperationContextSession scopedSession(opCtx);
- auto txnRouter = TransactionRouter::get(opCtx);
+ auto& txnRouter(*TransactionRouter::get(operationContext()));
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.setDefaultAtClusterTime(operationContext());
- txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
- txnRouter->setDefaultAtClusterTime(operationContext());
- txnRouter->attachTxnFieldsIfNeeded(shard1, {});
- txnRouter->attachTxnFieldsIfNeeded(shard2, {});
- txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
- txnRouter->processParticipantResponse(shard2, kOkReadOnlyFalseResponse);
+ txnRouter.attachTxnFieldsIfNeeded(shard1, {});
+ txnRouter.attachTxnFieldsIfNeeded(shard2, {});
+ txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(shard2, kOkReadOnlyFalseResponse);
- txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
TxnRecoveryToken recoveryToken(shard1);
auto future =
- launchAsync([&] { txnRouter->commitTransaction(operationContext(), recoveryToken); });
+ launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); });
onCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQ(hostAndPort1, request.target);
@@ -777,7 +771,7 @@ TEST_F(TransactionRouterTest, SendCoordinateCommitForMultipleParticipantsWithRec
expectedParticipants.erase(shardId);
}
- checkSessionDetails(request.cmdObj, lsid, txnNum, true);
+ checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true);
return BSON("ok" << 1);
});