diff options
author | Esha Maharishi <esha.maharishi@mongodb.com> | 2019-05-14 13:46:43 -0400 |
---|---|---|
committer | Esha Maharishi <esha.maharishi@mongodb.com> | 2019-05-16 14:07:44 -0400 |
commit | 8ab4e38d8ad3eb6901f79dfbd67b8b2cf6646c2d (patch) | |
tree | 17d980977fa6210c815e32d463b5c7ea8393cdc2 /src | |
parent | 102f13ddd63f90d00dd8489280522a94d49ca5e9 (diff) | |
download | mongo-8ab4e38d8ad3eb6901f79dfbd67b8b2cf6646c2d.tar.gz |
SERVER-40001 abortTransaction through mongos does not properly set error labels
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/s/commands/cluster_abort_transaction_cmd.cpp | 7 |
-rw-r--r-- | src/mongo/s/transaction_router.cpp | 54 |
-rw-r--r-- | src/mongo/s/transaction_router.h | 16 |
-rw-r--r-- | src/mongo/s/transaction_router_test.cpp | 110 |
4 files changed, 162 insertions, 25 deletions
diff --git a/src/mongo/s/commands/cluster_abort_transaction_cmd.cpp b/src/mongo/s/commands/cluster_abort_transaction_cmd.cpp index f5323d1c906..7defb877a0b 100644 --- a/src/mongo/s/commands/cluster_abort_transaction_cmd.cpp +++ b/src/mongo/s/commands/cluster_abort_transaction_cmd.cpp @@ -76,10 +76,9 @@ public: "abortTransaction can only be run within a session", txnRouter); - auto response = txnRouter->abortTransaction(opCtx); - - std::string errMsg; - return appendRawResponses(opCtx, &errMsg, &result, response); + auto abortRes = txnRouter->abortTransaction(opCtx); + CommandHelpers::filterCommandReplyForPassthrough(abortRes, &result); + return true; } } clusterAbortTransactionCmd; diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 9cc6343f601..3ab52099e4c 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -780,8 +780,7 @@ BSONObj TransactionRouter::commitTransaction( return _handOffCommitToCoordinator(opCtx); } -std::vector<AsyncRequestsSender::Response> TransactionRouter::abortTransaction( - OperationContext* opCtx, bool isImplicit) { +BSONObj TransactionRouter::abortTransaction(OperationContext* opCtx) { // The router has yet to send any commands to a remote shard for this transaction. // Return the same error that would have been returned by a shard. uassert(ErrorCodes::NoSuchTransaction, @@ -789,23 +788,41 @@ std::vector<AsyncRequestsSender::Response> TransactionRouter::abortTransaction( !_participants.empty()); auto abortCmd = BSON("abortTransaction" << 1); - std::vector<AsyncRequestsSender::Request> abortRequests; for (const auto& participantEntry : _participants) { abortRequests.emplace_back(ShardId(participantEntry.first), abortCmd); } - // Implicit aborts log earlier. 
- if (!isImplicit) { - LOG(3) << txnIdToString() << " Aborting transaction on " << _participants.size() - << " shard(s)"; + LOG(3) << txnIdToString() << " Aborting transaction on " << _participants.size() << " shard(s)"; + + const auto responses = gatherResponses(opCtx, + NamespaceString::kAdminDb, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + Shard::RetryPolicy::kIdempotent, + abortRequests); + + BSONObj lastResult; + for (const auto& response : responses) { + uassertStatusOK(response.swResponse); + + lastResult = response.swResponse.getValue().data; + + // If any shard returned an error, return the error immediately. + const auto commandStatus = getStatusFromCommandResult(lastResult); + if (!commandStatus.isOK()) { + return lastResult; + } + + // If any participant had a writeConcern error, return the participant's writeConcern + // error immediately. + const auto writeConcernStatus = getWriteConcernStatusFromCommandResult(lastResult); + if (!writeConcernStatus.isOK()) { + return lastResult; + } } - return gatherResponses(opCtx, - NamespaceString::kAdminDb, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - Shard::RetryPolicy::kIdempotent, - abortRequests); + // If all the responses were ok, return the last response. + return lastResult; } void TransactionRouter::implicitlyAbortTransaction(OperationContext* opCtx, @@ -821,11 +838,22 @@ void TransactionRouter::implicitlyAbortTransaction(OperationContext* opCtx, return; } + auto abortCmd = BSON("abortTransaction" << 1); + std::vector<AsyncRequestsSender::Request> abortRequests; + for (const auto& participantEntry : _participants) { + abortRequests.emplace_back(ShardId(participantEntry.first), abortCmd); + } + LOG(3) << txnIdToString() << " Implicitly aborting transaction on " << _participants.size() << " shard(s) due to error: " << errorStatus; try { - abortTransaction(opCtx, true /*isImplicit*/); + // Ignore the responses. 
+ gatherResponses(opCtx, + NamespaceString::kAdminDb, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + Shard::RetryPolicy::kIdempotent, + abortRequests); } catch (const DBException& ex) { LOG(3) << txnIdToString() << " Implicitly aborting transaction failed " << causedBy(ex.toStatus()); diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h index 934bd8cfcc7..1cd3cd0dac7 100644 --- a/src/mongo/s/transaction_router.h +++ b/src/mongo/s/transaction_router.h @@ -238,17 +238,23 @@ public: const boost::optional<ShardId>& getRecoveryShardId() const; /** - * Commits the transaction. For transactions that performed writes to multiple shards, this will - * hand off the participant list to the coordinator to do two-phase commit. + * Commits the transaction. + * + * For transactions that only did reads or only wrote to one shard, sends commit directly to the + * participants and returns the first error response or the last (success) response. + * + * For transactions that performed writes to multiple shards, hands off the participant list to + * the coordinator to do two-phase commit, and returns the coordinator's response. */ BSONObj commitTransaction(OperationContext* opCtx, const boost::optional<TxnRecoveryToken>& recoveryToken); /** - * Sends abort to all participants and returns the responses from all shards. + * Sends abort to all participants. + * + * Returns the first error response or the last (success) response. */ - std::vector<AsyncRequestsSender::Response> abortTransaction(OperationContext* opCtx, - bool isImplicit = false); + BSONObj abortTransaction(OperationContext* opCtx); /** * Sends abort to all shards in the current participant list. 
Will retry on retryable errors, diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index d6dd2c2fe36..da7a80b5130 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -52,6 +52,8 @@ using executor::RemoteCommandRequest; const BSONObj kOkReadOnlyFalseResponse = BSON("ok" << 1 << "readOnly" << false); const BSONObj kOkReadOnlyTrueResponse = BSON("ok" << 1 << "readOnly" << true); +const BSONObj kNoSuchTransactionResponse = + BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction); class TransactionRouterTest : public ShardingTestFixture { protected: @@ -1869,10 +1871,10 @@ TEST_F(TransactionRouterTest, AbortForSingleParticipant) { }); auto response = future.default_timed_get(); - ASSERT_FALSE(response.empty()); + ASSERT_BSONOBJ_EQ(kOkReadOnlyFalseResponse, response); } -TEST_F(TransactionRouterTest, AbortForMultipleParticipants) { +TEST_F(TransactionRouterTest, AbortForMultipleParticipantsAllReturnSuccess) { LogicalSessionId lsid(makeLogicalSessionIdForTest()); TxnNumber txnNum{3}; @@ -1912,7 +1914,109 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipants) { } auto response = future.default_timed_get(); - ASSERT_FALSE(response.empty()); + ASSERT_BSONOBJ_EQ(kOkReadOnlyFalseResponse, response); +} + +TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNoSuchTransaction) { + LogicalSessionId lsid(makeLogicalSessionIdForTest()); + TxnNumber txnNum{3}; + + auto opCtx = operationContext(); + opCtx->setLogicalSessionId(lsid); + opCtx->setTxnNumber(txnNum); + + RouterOperationContextSession scopedSession(opCtx); + auto txnRouter = TransactionRouter::get(opCtx); + + txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter->setDefaultAtClusterTime(operationContext()); + txnRouter->attachTxnFieldsIfNeeded(shard1, {}); + txnRouter->attachTxnFieldsIfNeeded(shard2, {}); + txnRouter->attachTxnFieldsIfNeeded(shard3, 
{}); + txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter->processParticipantResponse(shard2, kOkReadOnlyFalseResponse); + txnRouter->processParticipantResponse(shard3, kOkReadOnlyFalseResponse); + + auto future = launchAsync([&] { return txnRouter->abortTransaction(operationContext()); }); + + std::map<HostAndPort, boost::optional<bool>> targets = { + {hostAndPort1, true}, {hostAndPort2, {}}, {hostAndPort3, {}}}; + + int count = 0; + while (!targets.empty()) { + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + auto target = targets.find(request.target); + ASSERT(target != targets.end()); + ASSERT_EQ("admin", request.dbname); + + auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); + ASSERT_EQ(cmdName, "abortTransaction"); + + checkSessionDetails(request.cmdObj, lsid, txnNum, target->second); + + targets.erase(request.target); + + // The middle response is NoSuchTransaction, the rest are success. + return (count == 1 ? 
kNoSuchTransactionResponse : kOkReadOnlyFalseResponse); + }); + count++; + } + + auto response = future.default_timed_get(); + ASSERT_BSONOBJ_EQ(kNoSuchTransactionResponse, response); +} + +TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNetworkError) { + LogicalSessionId lsid(makeLogicalSessionIdForTest()); + TxnNumber txnNum{3}; + + auto opCtx = operationContext(); + opCtx->setLogicalSessionId(lsid); + opCtx->setTxnNumber(txnNum); + + RouterOperationContextSession scopedSession(opCtx); + auto txnRouter = TransactionRouter::get(opCtx); + + txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter->setDefaultAtClusterTime(operationContext()); + txnRouter->attachTxnFieldsIfNeeded(shard1, {}); + txnRouter->attachTxnFieldsIfNeeded(shard2, {}); + txnRouter->attachTxnFieldsIfNeeded(shard3, {}); + txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter->processParticipantResponse(shard2, kOkReadOnlyFalseResponse); + txnRouter->processParticipantResponse(shard3, kOkReadOnlyFalseResponse); + + auto future = launchAsync([&] { return txnRouter->abortTransaction(operationContext()); }); + + std::map<HostAndPort, boost::optional<bool>> targets = { + {hostAndPort1, true}, {hostAndPort2, {}}, {hostAndPort3, {}}}; + + int count = 0; + while (!targets.empty()) { + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) -> StatusWith<BSONObj> { + auto target = targets.find(request.target); + ASSERT(target != targets.end()); + ASSERT_EQ("admin", request.dbname); + + auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); + ASSERT_EQ(cmdName, "abortTransaction"); + + checkSessionDetails(request.cmdObj, lsid, txnNum, target->second); + + targets.erase(request.target); + + // The middle response is a "network error", the rest are success. 
Use InternalError as + // the "network error" because the server will retry three times on actual network + // errors; this just skips the retries. + if (count == 1) { + return Status{ErrorCodes::InternalError, "dummy"}; + } + return kOkReadOnlyFalseResponse; + }); + count++; + } + + ASSERT_THROWS_CODE(future.default_timed_get(), AssertionException, ErrorCodes::InternalError); } TEST_F(TransactionRouterTestWithDefaultSession, OnViewResolutionErrorClearsAllNewParticipants) { |