diff options
author | Esha Maharishi <esha.maharishi@mongodb.com> | 2019-03-12 16:59:13 -0400 |
---|---|---|
committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2019-03-13 19:10:21 -0400 |
commit | 41eac7940607f4750f078d89962240bc88fa1359 (patch) | |
tree | 0ac8894863da0163b49a3e6db0950d8369a1b092 | |
parent | cd44c20c13d2452c78d1131fdedebd3107ecffea (diff) | |
download | mongo-41eac7940607f4750f078d89962240bc88fa1359.tar.gz |
SERVER-39877 Clean up transaction_router_test.cpp and transaction_router.cpp
-rw-r--r-- | src/mongo/s/commands/cluster_commit_transaction_cmd.cpp | 7 | ||||
-rw-r--r-- | src/mongo/s/transaction_router.cpp | 74 | ||||
-rw-r--r-- | src/mongo/s/transaction_router.h | 12 | ||||
-rw-r--r-- | src/mongo/s/transaction_router_test.cpp | 58 |
4 files changed, 75 insertions, 76 deletions
diff --git a/src/mongo/s/commands/cluster_commit_transaction_cmd.cpp b/src/mongo/s/commands/cluster_commit_transaction_cmd.cpp index a3e78983249..38d6061b61b 100644 --- a/src/mongo/s/commands/cluster_commit_transaction_cmd.cpp +++ b/src/mongo/s/commands/cluster_commit_transaction_cmd.cpp @@ -77,9 +77,10 @@ public: "commitTransaction can only be run within a session", txnRouter != nullptr); - auto commitCmd = CommitTransaction::parse(IDLParserErrorContext("commit cmd"), cmdObj); - auto cmdResponse = txnRouter->commitTransaction(opCtx, commitCmd.getRecoveryToken()); - CommandHelpers::filterCommandReplyForPassthrough(cmdResponse.response, &result); + const auto commitCmd = + CommitTransaction::parse(IDLParserErrorContext("commit cmd"), cmdObj); + auto commitRes = txnRouter->commitTransaction(opCtx, commitCmd.getRecoveryToken()); + CommandHelpers::filterCommandReplyForPassthrough(commitRes, &result); return true; } diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 58826816209..754dc91094f 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -607,7 +607,7 @@ const LogicalSessionId& TransactionRouter::_sessionId() const { return owningSession->getSessionId(); } -Shard::CommandResponse TransactionRouter::_commitSingleShardTransaction(OperationContext* opCtx) { +BSONObj TransactionRouter::_commitSingleShardTransaction(OperationContext* opCtx) { auto shardRegistry = Grid::get(opCtx)->shardRegistry(); const auto citer = _participants.cbegin(); @@ -618,23 +618,24 @@ Shard::CommandResponse TransactionRouter::_commitSingleShardTransaction(Operatio auto shard = uassertStatusOK(shardRegistry->getShard(opCtx, shardId)); LOG(0) << txnIdToString() - << " Committing single shard transaction, single participant: " << shardId; + << " Committing single-shard transaction, single participant: " << shardId; CommitTransaction commitCmd; commitCmd.setDbName(NamespaceString::kAdminDb); return uassertStatusOK(shard->runCommandWithFixedRetryAttempts( - opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - "admin", - participant.attachTxnFieldsIfNeeded( - commitCmd.toBSON( - BSON(WriteConcernOptions::kWriteConcernField << opCtx->getWriteConcern().toBSON())), - false), - Shard::RetryPolicy::kIdempotent)); -} - -Shard::CommandResponse TransactionRouter::_commitMultiShardTransaction(OperationContext* opCtx) { + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + participant.attachTxnFieldsIfNeeded( + commitCmd.toBSON(BSON(WriteConcernOptions::kWriteConcernField + << opCtx->getWriteConcern().toBSON())), + false), + Shard::RetryPolicy::kIdempotent)) + .response; +} + +BSONObj TransactionRouter::_commitMultiShardTransaction(OperationContext* opCtx) { invariant(_coordinatorId); auto coordinatorIter = _participants.find(*_coordinatorId); invariant(coordinatorIter != _participants.end()); @@ -696,20 +697,22 @@ Shard::CommandResponse TransactionRouter::_commitMultiShardTransaction(Operation _initiatedTwoPhaseCommit = true; LOG(0) << txnIdToString() - << " Committing multi shard transaction, coordinator: " << *_coordinatorId; - - return uassertStatusOK(coordinatorShard->runCommandWithFixedRetryAttempts( - opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - "admin", - coordinatorIter->second.attachTxnFieldsIfNeeded( - coordinateCommitCmd.toBSON( - BSON(WriteConcernOptions::kWriteConcernField << opCtx->getWriteConcern().toBSON())), - false), - Shard::RetryPolicy::kIdempotent)); -} - -Shard::CommandResponse TransactionRouter::commitTransaction( + << " Committing multi-shard transaction, coordinator: " << *_coordinatorId; + + return uassertStatusOK( + coordinatorShard->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + coordinatorIter->second.attachTxnFieldsIfNeeded( + coordinateCommitCmd.toBSON(BSON(WriteConcernOptions::kWriteConcernField + << opCtx->getWriteConcern().toBSON())), + false), + Shard::RetryPolicy::kIdempotent)) + .response; +} + +BSONObj TransactionRouter::commitTransaction( OperationContext* opCtx, const boost::optional<TxnRecoveryToken>& recoveryToken) { if (_isRecoveringCommit) { uassert(50940, @@ -725,7 +728,7 @@ Shard::CommandResponse TransactionRouter::commitTransaction( uassert(ErrorCodes::IllegalOperation, "Cannot commit without participants", _txnNumber != kUninitializedTxnNumber); - return {boost::none, BSON("ok" << true), Status::OK(), Status::OK()}; + return BSON("ok" << 1); } if (_participants.size() == 1) { @@ -800,8 +803,8 @@ void TransactionRouter::appendRecoveryToken(BSONObjBuilder* builder) const { recoveryTokenBuilder.doneFast(); } -Shard::CommandResponse TransactionRouter::_commitWithRecoveryToken( - OperationContext* opCtx, const TxnRecoveryToken& recoveryToken) { +BSONObj TransactionRouter::_commitWithRecoveryToken(OperationContext* opCtx, + const TxnRecoveryToken& recoveryToken) { const auto shardRegistry = Grid::get(opCtx)->shardRegistry(); const auto& coordinatorId = recoveryToken.getShardId(); @@ -823,11 +826,12 @@ Shard::CommandResponse TransactionRouter::_commitWithRecoveryToken( auto coordinatorShard = uassertStatusOK(shardRegistry->getShard(opCtx, coordinatorId)); return uassertStatusOK(coordinatorShard->runCommandWithFixedRetryAttempts( - opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - "admin", - coordinateCommitCmd, - Shard::RetryPolicy::kIdempotent)); + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + coordinateCommitCmd, + Shard::RetryPolicy::kIdempotent)) + .response; } } // namespace mongo diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h index eb7f7c48874..ce349fe40e2 100644 --- a/src/mongo/s/transaction_router.h +++ b/src/mongo/s/transaction_router.h @@ -234,8 +234,8 @@ public: * Commits the transaction. For transactions with multiple participants, this will initiate * the two phase commit procedure. */ - Shard::CommandResponse commitTransaction( - OperationContext* opCtx, const boost::optional<TxnRecoveryToken>& recoveryToken); + BSONObj commitTransaction(OperationContext* opCtx, + const boost::optional<TxnRecoveryToken>& recoveryToken); /** * Sends abort to all participants and returns the responses from all shards. @@ -281,15 +281,15 @@ private: /** * Run basic commit for transactions that touched a single shard. */ - Shard::CommandResponse _commitSingleShardTransaction(OperationContext* opCtx); + BSONObj _commitSingleShardTransaction(OperationContext* opCtx); - Shard::CommandResponse _commitWithRecoveryToken(OperationContext* opCtx, - const TxnRecoveryToken& recoveryToken); + BSONObj _commitWithRecoveryToken(OperationContext* opCtx, + const TxnRecoveryToken& recoveryToken); /** * Run two phase commit for transactions that touched multiple shards. */ - Shard::CommandResponse _commitMultiShardTransaction(OperationContext* opCtx); + BSONObj _commitMultiShardTransaction(OperationContext* opCtx); /** * Sets the given logical time as the atClusterTime for the transaction to be the greater of the diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index 65288bcbbdc..0c9764180cf 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -663,7 +663,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter } } -TEST_F(TransactionRouterTestWithDefaultSession, CannotCommitWithoutParticipants) { +TEST_F(TransactionRouterTestWithDefaultSession, CannotCommitWithoutParticipantsOrRecoveryToken) { TxnNumber txnNum{3}; auto& txnRouter(*TransactionRouter::get(operationContext())); @@ -702,24 +702,21 @@ void checkWriteConcern(const BSONObj& cmdObj, const WriteConcernOptions& expecte ASSERT_BSONOBJ_EQ(expectedWC.toBSON(), writeCocernElem.Obj()); } -TEST_F(TransactionRouterTest, SendCommitDirectlyForSingleParticipants) { - LogicalSessionId lsid(makeLogicalSessionIdForTest()); +TEST_F(TransactionRouterTestWithDefaultSession, + SendCommitDirectlyForSingleParticipantThatDidAWrite) { TxnNumber txnNum{3}; - auto opCtx = operationContext(); - opCtx->setLogicalSessionId(lsid); - opCtx->setTxnNumber(txnNum); - - RouterOperationContextSession scopedSession(opCtx); - auto txnRouter = TransactionRouter::get(opCtx); + auto& txnRouter(*TransactionRouter::get(operationContext())); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(operationContext()); - txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); - txnRouter->setDefaultAtClusterTime(operationContext()); - txnRouter->attachTxnFieldsIfNeeded(shard1, {}); - txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter.attachTxnFieldsIfNeeded(shard1, {}); + txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + TxnRecoveryToken recoveryToken(shard1); auto future = - launchAsync([&] { txnRouter->commitTransaction(operationContext(), boost::none); }); + launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); }); onCommand([&](const RemoteCommandRequest& request) { ASSERT_EQ(hostAndPort1, request.target); @@ -728,7 +725,7 @@ TEST_F(TransactionRouterTest, SendCommitDirectlyForSingleParticipants) { auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); ASSERT_EQ(cmdName, "commitTransaction"); - checkSessionDetails(request.cmdObj, lsid, txnNum, true); + checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true); return BSON("ok" << 1); }); @@ -736,29 +733,26 @@ TEST_F(TransactionRouterTest, SendCommitDirectlyForSingleParticipants) { future.timed_get(kFutureTimeout); } -TEST_F(TransactionRouterTest, SendCoordinateCommitForMultipleParticipantsWithRecoveryToken) { - LogicalSessionId lsid(makeLogicalSessionIdForTest()); +TEST_F(TransactionRouterTestWithDefaultSession, + SendCoordinateCommitForMultipleParticipantsAllDidWrites) { TxnNumber txnNum{3}; - auto opCtx = operationContext(); - opCtx->setLogicalSessionId(lsid); - opCtx->setTxnNumber(txnNum); - - RouterOperationContextSession scopedSession(opCtx); - auto txnRouter = TransactionRouter::get(opCtx); + auto& txnRouter(*TransactionRouter::get(operationContext())); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(operationContext()); - txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); - txnRouter->setDefaultAtClusterTime(operationContext()); - txnRouter->attachTxnFieldsIfNeeded(shard1, {}); - txnRouter->attachTxnFieldsIfNeeded(shard2, {}); - txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse); - txnRouter->processParticipantResponse(shard2, kOkReadOnlyFalseResponse); + txnRouter.attachTxnFieldsIfNeeded(shard1, {}); + txnRouter.attachTxnFieldsIfNeeded(shard2, {}); + txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(shard2, kOkReadOnlyFalseResponse); - txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn( + operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); TxnRecoveryToken recoveryToken(shard1); auto future = - launchAsync([&] { txnRouter->commitTransaction(operationContext(), recoveryToken); }); + launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); }); onCommand([&](const RemoteCommandRequest& request) { ASSERT_EQ(hostAndPort1, request.target); @@ -777,7 +771,7 @@ TEST_F(TransactionRouterTest, SendCoordinateCommitForMultipleParticipantsWithRec expectedParticipants.erase(shardId); } - checkSessionDetails(request.cmdObj, lsid, txnNum, true); + checkSessionDetails(request.cmdObj, getSessionId(), txnNum, true); return BSON("ok" << 1); }); |