diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2019-01-28 15:02:09 -0500 |
---|---|---|
committer | Jack Mulrow <jack.mulrow@mongodb.com> | 2019-01-29 16:48:48 -0500 |
commit | 7380d6e9ad0a90f6c5758b52d166d4069dd5a502 (patch) | |
tree | 5bf2f22b518176d09bff84c0f5704450b3b9fa7b /src/mongo/s/transaction_router.cpp | |
parent | 9c83ead9257d0d41d7b7ec92adf441bf34b0c3a8 (diff) | |
download | mongo-7380d6e9ad0a90f6c5758b52d166d4069dd5a502.tar.gz |
SERVER-39124 Mongos should abort cleared participants between retries of transaction statements
Diffstat (limited to 'src/mongo/s/transaction_router.cpp')
-rw-r--r-- | src/mongo/s/transaction_router.cpp | 75 |
1 files changed, 64 insertions, 11 deletions
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 4283915b631..b6ad3722c77 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -330,13 +330,64 @@ TransactionRouter::Participant& TransactionRouter::_createParticipant(const Shar return resultPair.first->second; } -void TransactionRouter::_clearPendingParticipants() { - for (auto&& it = _participants.begin(); it != _participants.end();) { - auto participant = it++; - if (participant->second.stmtIdCreatedAt == _latestStmtId) { - _participants.erase(participant); +void TransactionRouter::_assertAbortStatusIsOkOrNoSuchTransaction( + const AsyncRequestsSender::Response& response) const { + auto shardResponse = uassertStatusOKWithContext( + std::move(response.swResponse), + str::stream() << "Failed to send abort to shard " << response.shardId + << " between retries of statement " + << _latestStmtId); + + auto status = getStatusFromCommandResult(shardResponse.data); + uassert(ErrorCodes::NoSuchTransaction, + str::stream() << _txnIdToString() << "Transaction aborted between retries of statement " + << _latestStmtId + << " due to error: " + << status + << " from shard: " + << response.shardId, + status.isOK() || status.code() == ErrorCodes::NoSuchTransaction); + + // abortTransaction is sent with no write concern, so there's no need to check for a write + // concern error. +} + +std::vector<ShardId> TransactionRouter::_getPendingParticipants() const { + std::vector<ShardId> pendingParticipants; + for (const auto& participant : _participants) { + if (participant.second.stmtIdCreatedAt == _latestStmtId) { + pendingParticipants.emplace_back(ShardId(participant.first)); } } + return pendingParticipants; +} + +void TransactionRouter::_clearPendingParticipants(OperationContext* opCtx) { + const auto pendingParticipants = _getPendingParticipants(); + + // Send abort to each pending participant. This resets their transaction state and guarantees no + // transactions will be left open if the retry does not re-target any of these shards. + std::vector<AsyncRequestsSender::Request> abortRequests; + for (const auto& participant : pendingParticipants) { + abortRequests.emplace_back(participant, BSON("abortTransaction" << 1)); + } + auto responses = gatherResponses(opCtx, + NamespaceString::kAdminDb, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + Shard::RetryPolicy::kIdempotent, + abortRequests); + + // Verify each abort succeeded or failed with NoSuchTransaction, which may happen if the + // transaction was already implicitly aborted on the shard. + for (const auto& response : responses) { + _assertAbortStatusIsOkOrNoSuchTransaction(response); + } + + // Remove each aborted participant from the participant list. Remove after sending abort, so + // they are not added back to the participant list by the transaction tracking inside the ARS. + for (const auto& participant : pendingParticipants) { + invariant(_participants.erase(participant)); + } // If there are no more participants, also clear the coordinator id because a new one must be // chosen by the retry. @@ -363,7 +414,9 @@ bool TransactionRouter::_canContinueOnStaleShardOrDbError(StringData cmdName) co return false; } -void TransactionRouter::onStaleShardOrDbError(StringData cmdName, const Status& errorStatus) { +void TransactionRouter::onStaleShardOrDbError(OperationContext* opCtx, + StringData cmdName, + const Status& errorStatus) { uassert(ErrorCodes::NoSuchTransaction, str::stream() << "Transaction " << _txnNumber << " was aborted on statement " << _latestStmtId @@ -376,10 +429,10 @@ void TransactionRouter::onStaleShardOrDbError(StringData cmdName, const Status& // Remove participants created during the current statement so they are sent the correct options // if they are targeted again by the retry. - _clearPendingParticipants(); + _clearPendingParticipants(opCtx); } -void TransactionRouter::onViewResolutionError(const NamespaceString& nss) { +void TransactionRouter::onViewResolutionError(OperationContext* opCtx, const NamespaceString& nss) { // The router can always retry on a view resolution error. LOG(0) << _txnIdToString() @@ -388,14 +441,14 @@ void TransactionRouter::onViewResolutionError(const NamespaceString& nss) { // Requests against views are always routed to the primary shard for its database, but the retry // on the resolved namespace does not have to re-target the primary, so pending participants // should be cleared. - _clearPendingParticipants(); + _clearPendingParticipants(opCtx); } bool TransactionRouter::_canContinueOnSnapshotError() const { return _atClusterTime && _atClusterTime->canChange(_latestStmtId); } -void TransactionRouter::onSnapshotError(const Status& errorStatus) { +void TransactionRouter::onSnapshotError(OperationContext* opCtx, const Status& errorStatus) { uassert(ErrorCodes::NoSuchTransaction, str::stream() << "Transaction " << _txnNumber << " was aborted on statement " << _latestStmtId @@ -410,7 +463,7 @@ void TransactionRouter::onSnapshotError(const Status& errorStatus) { // The transaction must be restarted on all participants because a new read timestamp will be // selected, so clear all pending participants. Snapshot errors are only retryable on the first // client statement, so all participants should be cleared, including the coordinator. - _clearPendingParticipants(); + _clearPendingParticipants(opCtx); invariant(_participants.empty()); invariant(!_coordinatorId); |