summaryrefslogtreecommitdiff
path: root/src/mongo/s/transaction_router.cpp
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2019-01-28 15:02:09 -0500
committerJack Mulrow <jack.mulrow@mongodb.com>2019-01-29 16:48:48 -0500
commit7380d6e9ad0a90f6c5758b52d166d4069dd5a502 (patch)
tree5bf2f22b518176d09bff84c0f5704450b3b9fa7b /src/mongo/s/transaction_router.cpp
parent9c83ead9257d0d41d7b7ec92adf441bf34b0c3a8 (diff)
downloadmongo-7380d6e9ad0a90f6c5758b52d166d4069dd5a502.tar.gz
SERVER-39124 Mongos should abort cleared participants between retries of transaction statements
Diffstat (limited to 'src/mongo/s/transaction_router.cpp')
-rw-r--r--src/mongo/s/transaction_router.cpp75
1 files changed, 64 insertions, 11 deletions
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 4283915b631..b6ad3722c77 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -330,13 +330,64 @@ TransactionRouter::Participant& TransactionRouter::_createParticipant(const Shar
return resultPair.first->second;
}
-void TransactionRouter::_clearPendingParticipants() {
- for (auto&& it = _participants.begin(); it != _participants.end();) {
- auto participant = it++;
- if (participant->second.stmtIdCreatedAt == _latestStmtId) {
- _participants.erase(participant);
+void TransactionRouter::_assertAbortStatusIsOkOrNoSuchTransaction(
+ const AsyncRequestsSender::Response& response) const {
+ auto shardResponse = uassertStatusOKWithContext(
+ std::move(response.swResponse),
+ str::stream() << "Failed to send abort to shard " << response.shardId
+ << " between retries of statement "
+ << _latestStmtId);
+
+ auto status = getStatusFromCommandResult(shardResponse.data);
+ uassert(ErrorCodes::NoSuchTransaction,
+ str::stream() << _txnIdToString() << "Transaction aborted between retries of statement "
+ << _latestStmtId
+ << " due to error: "
+ << status
+ << " from shard: "
+ << response.shardId,
+ status.isOK() || status.code() == ErrorCodes::NoSuchTransaction);
+
+ // abortTransaction is sent with no write concern, so there's no need to check for a write
+ // concern error.
+}
+
+std::vector<ShardId> TransactionRouter::_getPendingParticipants() const {
+ std::vector<ShardId> pendingParticipants;
+ for (const auto& participant : _participants) {
+ if (participant.second.stmtIdCreatedAt == _latestStmtId) {
+ pendingParticipants.emplace_back(ShardId(participant.first));
}
}
+ return pendingParticipants;
+}
+
+void TransactionRouter::_clearPendingParticipants(OperationContext* opCtx) {
+ const auto pendingParticipants = _getPendingParticipants();
+
+ // Send abort to each pending participant. This resets their transaction state and guarantees no
+ // transactions will be left open if the retry does not re-target any of these shards.
+ std::vector<AsyncRequestsSender::Request> abortRequests;
+ for (const auto& participant : pendingParticipants) {
+ abortRequests.emplace_back(participant, BSON("abortTransaction" << 1));
+ }
+ auto responses = gatherResponses(opCtx,
+ NamespaceString::kAdminDb,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ Shard::RetryPolicy::kIdempotent,
+ abortRequests);
+
+ // Verify each abort succeeded or failed with NoSuchTransaction, which may happen if the
+ // transaction was already implicitly aborted on the shard.
+ for (const auto& response : responses) {
+ _assertAbortStatusIsOkOrNoSuchTransaction(response);
+ }
+
+ // Remove each aborted participant from the participant list. Remove after sending abort, so
+ // they are not added back to the participant list by the transaction tracking inside the ARS.
+ for (const auto& participant : pendingParticipants) {
+ invariant(_participants.erase(participant));
+ }
// If there are no more participants, also clear the coordinator id because a new one must be
// chosen by the retry.
@@ -363,7 +414,9 @@ bool TransactionRouter::_canContinueOnStaleShardOrDbError(StringData cmdName) co
return false;
}
-void TransactionRouter::onStaleShardOrDbError(StringData cmdName, const Status& errorStatus) {
+void TransactionRouter::onStaleShardOrDbError(OperationContext* opCtx,
+ StringData cmdName,
+ const Status& errorStatus) {
uassert(ErrorCodes::NoSuchTransaction,
str::stream() << "Transaction " << _txnNumber << " was aborted on statement "
<< _latestStmtId
@@ -376,10 +429,10 @@ void TransactionRouter::onStaleShardOrDbError(StringData cmdName, const Status&
// Remove participants created during the current statement so they are sent the correct options
// if they are targeted again by the retry.
- _clearPendingParticipants();
+ _clearPendingParticipants(opCtx);
}
-void TransactionRouter::onViewResolutionError(const NamespaceString& nss) {
+void TransactionRouter::onViewResolutionError(OperationContext* opCtx, const NamespaceString& nss) {
// The router can always retry on a view resolution error.
LOG(0) << _txnIdToString()
@@ -388,14 +441,14 @@ void TransactionRouter::onViewResolutionError(const NamespaceString& nss) {
// Requests against views are always routed to the primary shard for its database, but the retry
// on the resolved namespace does not have to re-target the primary, so pending participants
// should be cleared.
- _clearPendingParticipants();
+ _clearPendingParticipants(opCtx);
}
bool TransactionRouter::_canContinueOnSnapshotError() const {
return _atClusterTime && _atClusterTime->canChange(_latestStmtId);
}
-void TransactionRouter::onSnapshotError(const Status& errorStatus) {
+void TransactionRouter::onSnapshotError(OperationContext* opCtx, const Status& errorStatus) {
uassert(ErrorCodes::NoSuchTransaction,
str::stream() << "Transaction " << _txnNumber << " was aborted on statement "
<< _latestStmtId
@@ -410,7 +463,7 @@ void TransactionRouter::onSnapshotError(const Status& errorStatus) {
// The transaction must be restarted on all participants because a new read timestamp will be
// selected, so clear all pending participants. Snapshot errors are only retryable on the first
// client statement, so all participants should be cleared, including the coordinator.
- _clearPendingParticipants();
+ _clearPendingParticipants(opCtx);
invariant(_participants.empty());
invariant(!_coordinatorId);