summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorEsha Maharishi <esha.maharishi@mongodb.com>2019-05-14 13:46:43 -0400
committerEsha Maharishi <esha.maharishi@mongodb.com>2019-05-16 14:07:44 -0400
commit8ab4e38d8ad3eb6901f79dfbd67b8b2cf6646c2d (patch)
tree17d980977fa6210c815e32d463b5c7ea8393cdc2 /src
parent102f13ddd63f90d00dd8489280522a94d49ca5e9 (diff)
downloadmongo-8ab4e38d8ad3eb6901f79dfbd67b8b2cf6646c2d.tar.gz
SERVER-40001 abortTransaction through mongos does not properly set error labels
Diffstat (limited to 'src')
-rw-r--r--src/mongo/s/commands/cluster_abort_transaction_cmd.cpp7
-rw-r--r--src/mongo/s/transaction_router.cpp54
-rw-r--r--src/mongo/s/transaction_router.h16
-rw-r--r--src/mongo/s/transaction_router_test.cpp110
4 files changed, 162 insertions, 25 deletions
diff --git a/src/mongo/s/commands/cluster_abort_transaction_cmd.cpp b/src/mongo/s/commands/cluster_abort_transaction_cmd.cpp
index f5323d1c906..7defb877a0b 100644
--- a/src/mongo/s/commands/cluster_abort_transaction_cmd.cpp
+++ b/src/mongo/s/commands/cluster_abort_transaction_cmd.cpp
@@ -76,10 +76,9 @@ public:
"abortTransaction can only be run within a session",
txnRouter);
- auto response = txnRouter->abortTransaction(opCtx);
-
- std::string errMsg;
- return appendRawResponses(opCtx, &errMsg, &result, response);
+ auto abortRes = txnRouter->abortTransaction(opCtx);
+ CommandHelpers::filterCommandReplyForPassthrough(abortRes, &result);
+ return true;
}
} clusterAbortTransactionCmd;
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 9cc6343f601..3ab52099e4c 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -780,8 +780,7 @@ BSONObj TransactionRouter::commitTransaction(
return _handOffCommitToCoordinator(opCtx);
}
-std::vector<AsyncRequestsSender::Response> TransactionRouter::abortTransaction(
- OperationContext* opCtx, bool isImplicit) {
+BSONObj TransactionRouter::abortTransaction(OperationContext* opCtx) {
// The router has yet to send any commands to a remote shard for this transaction.
// Return the same error that would have been returned by a shard.
uassert(ErrorCodes::NoSuchTransaction,
@@ -789,23 +788,41 @@ std::vector<AsyncRequestsSender::Response> TransactionRouter::abortTransaction(
!_participants.empty());
auto abortCmd = BSON("abortTransaction" << 1);
-
std::vector<AsyncRequestsSender::Request> abortRequests;
for (const auto& participantEntry : _participants) {
abortRequests.emplace_back(ShardId(participantEntry.first), abortCmd);
}
- // Implicit aborts log earlier.
- if (!isImplicit) {
- LOG(3) << txnIdToString() << " Aborting transaction on " << _participants.size()
- << " shard(s)";
+ LOG(3) << txnIdToString() << " Aborting transaction on " << _participants.size() << " shard(s)";
+
+ const auto responses = gatherResponses(opCtx,
+ NamespaceString::kAdminDb,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ Shard::RetryPolicy::kIdempotent,
+ abortRequests);
+
+ BSONObj lastResult;
+ for (const auto& response : responses) {
+ uassertStatusOK(response.swResponse);
+
+ lastResult = response.swResponse.getValue().data;
+
+ // If any shard returned an error, return the error immediately.
+ const auto commandStatus = getStatusFromCommandResult(lastResult);
+ if (!commandStatus.isOK()) {
+ return lastResult;
+ }
+
+ // If any participant had a writeConcern error, return the participant's writeConcern
+ // error immediately.
+ const auto writeConcernStatus = getWriteConcernStatusFromCommandResult(lastResult);
+ if (!writeConcernStatus.isOK()) {
+ return lastResult;
+ }
}
- return gatherResponses(opCtx,
- NamespaceString::kAdminDb,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- Shard::RetryPolicy::kIdempotent,
- abortRequests);
+ // If all the responses were ok, return the last response.
+ return lastResult;
}
void TransactionRouter::implicitlyAbortTransaction(OperationContext* opCtx,
@@ -821,11 +838,22 @@ void TransactionRouter::implicitlyAbortTransaction(OperationContext* opCtx,
return;
}
+ auto abortCmd = BSON("abortTransaction" << 1);
+ std::vector<AsyncRequestsSender::Request> abortRequests;
+ for (const auto& participantEntry : _participants) {
+ abortRequests.emplace_back(ShardId(participantEntry.first), abortCmd);
+ }
+
LOG(3) << txnIdToString() << " Implicitly aborting transaction on " << _participants.size()
<< " shard(s) due to error: " << errorStatus;
try {
- abortTransaction(opCtx, true /*isImplicit*/);
+ // Ignore the responses.
+ gatherResponses(opCtx,
+ NamespaceString::kAdminDb,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ Shard::RetryPolicy::kIdempotent,
+ abortRequests);
} catch (const DBException& ex) {
LOG(3) << txnIdToString() << " Implicitly aborting transaction failed "
<< causedBy(ex.toStatus());
diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h
index 934bd8cfcc7..1cd3cd0dac7 100644
--- a/src/mongo/s/transaction_router.h
+++ b/src/mongo/s/transaction_router.h
@@ -238,17 +238,23 @@ public:
const boost::optional<ShardId>& getRecoveryShardId() const;
/**
- * Commits the transaction. For transactions that performed writes to multiple shards, this will
- * hand off the participant list to the coordinator to do two-phase commit.
+ * Commits the transaction.
+ *
+ * For transactions that only did reads or only wrote to one shard, sends commit directly to the
+ * participants and returns the first error response or the last (success) response.
+ *
+ * For transactions that performed writes to multiple shards, hands off the participant list to
+ * the coordinator to do two-phase commit, and returns the coordinator's response.
*/
BSONObj commitTransaction(OperationContext* opCtx,
const boost::optional<TxnRecoveryToken>& recoveryToken);
/**
- * Sends abort to all participants and returns the responses from all shards.
+ * Sends abort to all participants.
+ *
+ * Returns the first error response or the last (success) response.
*/
- std::vector<AsyncRequestsSender::Response> abortTransaction(OperationContext* opCtx,
- bool isImplicit = false);
+ BSONObj abortTransaction(OperationContext* opCtx);
/**
* Sends abort to all shards in the current participant list. Will retry on retryable errors,
diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp
index d6dd2c2fe36..da7a80b5130 100644
--- a/src/mongo/s/transaction_router_test.cpp
+++ b/src/mongo/s/transaction_router_test.cpp
@@ -52,6 +52,8 @@ using executor::RemoteCommandRequest;
const BSONObj kOkReadOnlyFalseResponse = BSON("ok" << 1 << "readOnly" << false);
const BSONObj kOkReadOnlyTrueResponse = BSON("ok" << 1 << "readOnly" << true);
+const BSONObj kNoSuchTransactionResponse =
+ BSON("ok" << 0 << "code" << ErrorCodes::NoSuchTransaction);
class TransactionRouterTest : public ShardingTestFixture {
protected:
@@ -1869,10 +1871,10 @@ TEST_F(TransactionRouterTest, AbortForSingleParticipant) {
});
auto response = future.default_timed_get();
- ASSERT_FALSE(response.empty());
+ ASSERT_BSONOBJ_EQ(kOkReadOnlyFalseResponse, response);
}
-TEST_F(TransactionRouterTest, AbortForMultipleParticipants) {
+TEST_F(TransactionRouterTest, AbortForMultipleParticipantsAllReturnSuccess) {
LogicalSessionId lsid(makeLogicalSessionIdForTest());
TxnNumber txnNum{3};
@@ -1912,7 +1914,109 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipants) {
}
auto response = future.default_timed_get();
- ASSERT_FALSE(response.empty());
+ ASSERT_BSONOBJ_EQ(kOkReadOnlyFalseResponse, response);
+}
+
+TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNoSuchTransaction) {
+ LogicalSessionId lsid(makeLogicalSessionIdForTest());
+ TxnNumber txnNum{3};
+
+ auto opCtx = operationContext();
+ opCtx->setLogicalSessionId(lsid);
+ opCtx->setTxnNumber(txnNum);
+
+ RouterOperationContextSession scopedSession(opCtx);
+ auto txnRouter = TransactionRouter::get(opCtx);
+
+ txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter->setDefaultAtClusterTime(operationContext());
+ txnRouter->attachTxnFieldsIfNeeded(shard1, {});
+ txnRouter->attachTxnFieldsIfNeeded(shard2, {});
+ txnRouter->attachTxnFieldsIfNeeded(shard3, {});
+ txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter->processParticipantResponse(shard2, kOkReadOnlyFalseResponse);
+ txnRouter->processParticipantResponse(shard3, kOkReadOnlyFalseResponse);
+
+ auto future = launchAsync([&] { return txnRouter->abortTransaction(operationContext()); });
+
+ std::map<HostAndPort, boost::optional<bool>> targets = {
+ {hostAndPort1, true}, {hostAndPort2, {}}, {hostAndPort3, {}}};
+
+ int count = 0;
+ while (!targets.empty()) {
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ auto target = targets.find(request.target);
+ ASSERT(target != targets.end());
+ ASSERT_EQ("admin", request.dbname);
+
+ auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
+ ASSERT_EQ(cmdName, "abortTransaction");
+
+ checkSessionDetails(request.cmdObj, lsid, txnNum, target->second);
+
+ targets.erase(request.target);
+
+ // The middle response is NoSuchTransaction, the rest are success.
+ return (count == 1 ? kNoSuchTransactionResponse : kOkReadOnlyFalseResponse);
+ });
+ count++;
+ }
+
+ auto response = future.default_timed_get();
+ ASSERT_BSONOBJ_EQ(kNoSuchTransactionResponse, response);
+}
+
+TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNetworkError) {
+ LogicalSessionId lsid(makeLogicalSessionIdForTest());
+ TxnNumber txnNum{3};
+
+ auto opCtx = operationContext();
+ opCtx->setLogicalSessionId(lsid);
+ opCtx->setTxnNumber(txnNum);
+
+ RouterOperationContextSession scopedSession(opCtx);
+ auto txnRouter = TransactionRouter::get(opCtx);
+
+ txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter->setDefaultAtClusterTime(operationContext());
+ txnRouter->attachTxnFieldsIfNeeded(shard1, {});
+ txnRouter->attachTxnFieldsIfNeeded(shard2, {});
+ txnRouter->attachTxnFieldsIfNeeded(shard3, {});
+ txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter->processParticipantResponse(shard2, kOkReadOnlyFalseResponse);
+ txnRouter->processParticipantResponse(shard3, kOkReadOnlyFalseResponse);
+
+ auto future = launchAsync([&] { return txnRouter->abortTransaction(operationContext()); });
+
+ std::map<HostAndPort, boost::optional<bool>> targets = {
+ {hostAndPort1, true}, {hostAndPort2, {}}, {hostAndPort3, {}}};
+
+ int count = 0;
+ while (!targets.empty()) {
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) -> StatusWith<BSONObj> {
+ auto target = targets.find(request.target);
+ ASSERT(target != targets.end());
+ ASSERT_EQ("admin", request.dbname);
+
+ auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
+ ASSERT_EQ(cmdName, "abortTransaction");
+
+ checkSessionDetails(request.cmdObj, lsid, txnNum, target->second);
+
+ targets.erase(request.target);
+
+ // The middle response is a "network error", the rest are success. Use InternalError as
+ // the "network error" because the server will retry three times on actual network
+ // errors; this just skips the retries.
+ if (count == 1) {
+ return Status{ErrorCodes::InternalError, "dummy"};
+ }
+ return kOkReadOnlyFalseResponse;
+ });
+ count++;
+ }
+
+ ASSERT_THROWS_CODE(future.default_timed_get(), AssertionException, ErrorCodes::InternalError);
}
TEST_F(TransactionRouterTestWithDefaultSession, OnViewResolutionErrorClearsAllNewParticipants) {