diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-09-18 13:53:27 -0400 |
---|---|---|
committer | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-09-20 18:39:54 -0400 |
commit | 2f58283213f8a80a37f78b6de2c527951306f2b5 (patch) | |
tree | 6caafc6b812908983bcfc648ad41542adf8a7b41 | |
parent | 1b7b5200e158345d8818b00b039fccdccbed7aa2 (diff) | |
download | mongo-2f58283213f8a80a37f78b6de2c527951306f2b5.tar.gz |
SERVER-35707 Add helper for clearing participants, improve error messages, and update comments
-rw-r--r-- | src/mongo/s/commands/strategy.cpp | 11 | ||||
-rw-r--r-- | src/mongo/s/transaction/at_cluster_time_util.cpp | 3 | ||||
-rw-r--r-- | src/mongo/s/transaction/transaction_router.cpp | 74 | ||||
-rw-r--r-- | src/mongo/s/transaction/transaction_router.h | 20 | ||||
-rw-r--r-- | src/mongo/s/transaction/transaction_router_test.cpp | 24 |
5 files changed, 57 insertions, 75 deletions
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 0a36636b1fd..8e1f5785915 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -413,7 +413,8 @@ void runCommand(OperationContext* opCtx, str::stream() << "Transaction " << opCtx->getTxnNumber() << " was aborted after " << kMaxNumStaleVersionRetries - << " failed retries", + << " failed retries. The latest attempt failed with: " + << ex.toStatus(), canRetry); } @@ -435,7 +436,8 @@ void runCommand(OperationContext* opCtx, str::stream() << "Transaction " << opCtx->getTxnNumber() << " was aborted after " << kMaxNumStaleVersionRetries - << " failed retries", + << " failed retries. The latest attempt failed with: " + << ex.toStatus(), canRetry); } @@ -443,7 +445,7 @@ void runCommand(OperationContext* opCtx, continue; } throw; - } catch (const ExceptionForCat<ErrorCategory::SnapshotError>&) { + } catch (const ExceptionForCat<ErrorCategory::SnapshotError>& ex) { // Simple retry on any type of snapshot error. // Update transaction tracking state for a possible retry. Throws if the transaction @@ -455,7 +457,8 @@ void runCommand(OperationContext* opCtx, str::stream() << "Transaction " << opCtx->getTxnNumber() << " was aborted after " << kMaxNumStaleVersionRetries - << " failed retries", + << " failed retries. The latest attempt failed with: " + << ex.toStatus(), canRetry); } diff --git a/src/mongo/s/transaction/at_cluster_time_util.cpp b/src/mongo/s/transaction/at_cluster_time_util.cpp index 63f54d5a054..db1ccee124d 100644 --- a/src/mongo/s/transaction/at_cluster_time_util.cpp +++ b/src/mongo/s/transaction/at_cluster_time_util.cpp @@ -70,6 +70,9 @@ BSONObj appendAtClusterTime(BSONObj cmdObj, LogicalTime atClusterTime) { // Transactions will upconvert a read concern with afterClusterTime but no level to have // level snapshot, so a command may have a read concern field with no level. + // + // TODO SERVER-37237: Once read concern handling has been consolidated on mongos, this + // assertion can probably be removed. if (!readConcernBob.hasField(repl::ReadConcernArgs::kLevelFieldName)) { readConcernBob.append(repl::ReadConcernArgs::kLevelFieldName, kReadConcernLevelSnapshotName); diff --git a/src/mongo/s/transaction/transaction_router.cpp b/src/mongo/s/transaction/transaction_router.cpp index a9c592a5600..7689bd18167 100644 --- a/src/mongo/s/transaction/transaction_router.cpp +++ b/src/mongo/s/transaction/transaction_router.cpp @@ -125,6 +125,9 @@ BSONObj appendReadConcernForTxn(BSONObj cmd, dassert(existingReadConcernArgs.initialize(cmd)); // There may be no read concern level if the user only specified afterClusterTime and the // transaction provided the default level. + // + // TODO SERVER-37237: Once read concern handling has been consolidated on mongos, this + // assertion can probably be simplified or removed. dassert(existingReadConcernArgs.getLevel() == readConcernArgs.getLevel() || !existingReadConcernArgs.hasLevel()); @@ -154,7 +157,8 @@ BSONObjBuilder appendFieldsForStartTransaction(BSONObj cmd, // Commands that are idempotent in a transaction context and can be blindly retried in the middle of // a transaction. Aggregate with $out is disallowed in a transaction, so aggregates must be read -// operations. +// operations. Note: aggregate and find do have the side-effect of creating cursors, but any +// established during an unsuccessful attempt are best-effort killed. const StringMap<int> alwaysRetryableCmds = { {"aggregate", 1}, {"distinct", 1}, {"find", 1}, {"getMore", 1}, {"killCursors", 1}}; @@ -248,12 +252,9 @@ boost::optional<ShardId> TransactionRouter::getCoordinatorId() const { } TransactionRouter::Participant& TransactionRouter::getOrCreateParticipant(const ShardId& shard) { - // Remove the shard from the abort list if it is present. - _orphanedParticipants.erase(shard.toString()); - auto iter = _participants.find(shard.toString()); if (iter != _participants.end()) { - // TODO SERVER-36589: Once mongos aborts transactions by only sending abortTransaction to + // TODO SERVER-37223: Once mongos aborts transactions by only sending abortTransaction to // shards that have been successfully contacted we should be able to add an invariant here // to ensure the atClusterTime on the participant matches that on the transaction router. return iter->second; @@ -269,7 +270,7 @@ TransactionRouter::Participant& TransactionRouter::getOrCreateParticipant(const // The transaction must have been started with a readConcern. invariant(!_readConcernArgs.isEmpty()); - // TODO SERVER-36589: Once mongos aborts transactions by only sending abortTransaction to shards + // TODO SERVER-37223: Once mongos aborts transactions by only sending abortTransaction to shards // that have been successfully contacted we should be able to add an invariant here to ensure // that an atClusterTime has been chosen if the read concern level is snapshot. @@ -287,6 +288,26 @@ const LogicalSessionId& TransactionRouter::getSessionId() const { return _sessionId; } +void TransactionRouter::_clearPendingParticipants() { + for (auto&& it = _participants.begin(); it != _participants.end();) { + auto participant = it++; + if (participant->second.getStmtIdCreatedAt() == _latestStmtId) { + _participants.erase(participant); + } + } + + // If there are no more participants, also clear the coordinator id because a new one must be + // chosen by the retry. + if (_participants.empty()) { + _coordinatorId.reset(); + return; + } + + // If participants were created by an earlier command, the coordinator must be one of them. + invariant(_coordinatorId); + invariant(_participants.count(*_coordinatorId) == 1); +} + bool TransactionRouter::_canContinueOnStaleShardOrDbError(StringData cmdName) const { // We can always retry on the first overall statement. if (_latestStmtId == _firstStmtId) { @@ -306,28 +327,9 @@ void TransactionRouter::onStaleShardOrDbError(StringData cmdName) { "Transaction was aborted due to cluster data placement change", _canContinueOnStaleShardOrDbError(cmdName)); - // Remove each participant created at the most recent statement id and add them to the orphaned - // list because the retry attempt isn't guaranteed to retarget them. Participants created - // earlier are already fixed in the participant list, so they should not be removed. - for (auto&& it = _participants.begin(); it != _participants.end();) { - auto participant = it++; - if (participant->second.getStmtIdCreatedAt() == _latestStmtId) { - _orphanedParticipants.try_emplace(participant->first); - _participants.erase(participant); - } - } - - // If there are no more participants, also clear the coordinator id because a new one must be - // chosen by the retry. - if (_participants.empty()) { - _coordinatorId.reset(); - return; - } - - // If this is not the first command, the coordinator must have been chosen and successfully - // contacted in an earlier command, and thus must not be in the orphaned list. - invariant(_coordinatorId); - invariant(_orphanedParticipants.count(*_coordinatorId) == 0); + // Remove participants created during the current statement so they are sent the correct options + // if they are targeted again by the retry. + _clearPendingParticipants(); } bool TransactionRouter::_canContinueOnSnapshotError() const { @@ -340,16 +342,12 @@ void TransactionRouter::onSnapshotError() { "Transaction was aborted due to snapshot error on subsequent transaction statement", _canContinueOnSnapshotError()); - // Add each participant to the orphaned list because the retry attempt isn't guaranteed to - // re-target it. - for (const auto& participant : _participants) { - _orphanedParticipants.try_emplace(participant.first); - } - - // New transactions must be started on each contacted participant since the retry will select a - // new read timestamp. - _participants.clear(); - _coordinatorId.reset(); + // The transaction must be restarted on all participants because a new read timestamp will be + // selected, so clear all pending participants. Snapshot errors are only retryable on the first + // client statement, so all participants should be cleared, including the coordinator. + _clearPendingParticipants(); + invariant(_participants.empty()); + invariant(!_coordinatorId); // Reset the global snapshot timestamp so the retry will select a new one. _atClusterTime.reset(); diff --git a/src/mongo/s/transaction/transaction_router.h b/src/mongo/s/transaction/transaction_router.h index a47a1f92db7..7be619cfcf3 100644 --- a/src/mongo/s/transaction/transaction_router.h +++ b/src/mongo/s/transaction/transaction_router.h @@ -143,8 +143,8 @@ public: /** * Resets the transaction state to allow for a retry attempt. This includes clearing all - * participants and adding them to the orphaned list, clearing the coordinator, and resetting - * the global read timestamp. Will throw if the transaction cannot be continued. + * participants, clearing the coordinator, and resetting the global read timestamp. Will throw + * if the transaction cannot be continued. */ void onSnapshotError(); @@ -179,10 +179,6 @@ public: boost::optional<ShardId> getCoordinatorId() const; - const StringMap<bool>& getOrphanedParticipants() const { - return _orphanedParticipants; - } - /** * Commits the transaction. For transactions with multiple participants, this will initiate * the two phase commit procedure. @@ -228,6 +224,11 @@ private: */ bool _canContinueOnSnapshotError() const; + /** + * Removes all participants created during the current statement from the participant list. + */ + void _clearPendingParticipants(); + const LogicalSessionId _sessionId; TxnNumber _txnNumber{kUninitializedTxnNumber}; @@ -237,13 +238,6 @@ private: // Map of current participants of the current transaction. StringMap<Participant> _participants; - // Map of participants that have been sent startTransaction, but are not in the current - // participant list. - // - // TODO SERVER-36589: Send abortTransaction to each shard in the orphaned list when committing - // or aborting a transaction to avoid leaving open orphaned transactions. - StringMap<bool> _orphanedParticipants; - // The id of coordinator participant, used to construct prepare requests. boost::optional<ShardId> _coordinatorId; diff --git a/src/mongo/s/transaction/transaction_router_test.cpp b/src/mongo/s/transaction/transaction_router_test.cpp index f74f3ab0935..c076648cf05 100644 --- a/src/mongo/s/transaction/transaction_router_test.cpp +++ b/src/mongo/s/transaction/transaction_router_test.cpp @@ -887,7 +887,7 @@ TEST_F(TransactionRouterTest, CannotChangeAtClusterTimeWithoutSnapshotError) { } } -TEST_F(TransactionRouterTest, SnapshotErrorsAddAllParticipantsToOrphanedList) { +TEST_F(TransactionRouterTest, SnapshotErrorsClearsAllParticipants) { TxnNumber txnNum{3}; TransactionRouter txnRouter({}); @@ -911,15 +911,12 @@ TEST_F(TransactionRouterTest, SnapshotErrorsAddAllParticipantsToOrphanedList) { ASSERT(txnRouter.getCoordinatorId()); ASSERT_EQ(*txnRouter.getCoordinatorId(), shard1); - ASSERT(txnRouter.getOrphanedParticipants().empty()); - // Simulate a snapshot error and an internal retry that only re-targets one of the original two // shards. txnRouter.onSnapshotError(); ASSERT_FALSE(txnRouter.getCoordinatorId()); - ASSERT_EQ(txnRouter.getOrphanedParticipants().size(), 2U); { auto& participant = txnRouter.getOrCreateParticipant(shard2); @@ -928,11 +925,9 @@ TEST_F(TransactionRouterTest, SnapshotErrorsAddAllParticipantsToOrphanedList) { ASSERT_FALSE(participant.mustStartTransaction()); } - // There is a new coordinator and shard1 is still in the orphaned list. + // There is a new coordinator. ASSERT(txnRouter.getCoordinatorId()); ASSERT_EQ(*txnRouter.getCoordinatorId(), shard2); - ASSERT_EQ(txnRouter.getOrphanedParticipants().size(), 1U); - ASSERT_EQ(txnRouter.getOrphanedParticipants().count(shard1), 1U); { // Shard1 has not started a transaction. @@ -1018,14 +1013,11 @@ TEST_F(TransactionRouterTest, AllParticipantsAndCoordinatorClearedOnStaleErrorOn ASSERT(txnRouter.getCoordinatorId()); ASSERT_EQ(*txnRouter.getCoordinatorId(), shard1); - ASSERT(txnRouter.getOrphanedParticipants().empty()); - // Simulate stale error and internal retry that only re-targets one of the original shards. txnRouter.onStaleShardOrDbError("find"); ASSERT_FALSE(txnRouter.getCoordinatorId()); - ASSERT_EQ(txnRouter.getOrphanedParticipants().size(), 2U); { auto& participant = txnRouter.getOrCreateParticipant(shard2); @@ -1034,17 +1026,15 @@ TEST_F(TransactionRouterTest, AllParticipantsAndCoordinatorClearedOnStaleErrorOn ASSERT_FALSE(participant.mustStartTransaction()); } - // There is a new coordinator and shard1 is still in the orphaned list. + // There is a new coordinator. ASSERT(txnRouter.getCoordinatorId()); ASSERT_EQ(*txnRouter.getCoordinatorId(), shard2); - ASSERT_EQ(txnRouter.getOrphanedParticipants().size(), 1U); - ASSERT_EQ(txnRouter.getOrphanedParticipants().count(shard1), 1U); // Shard1 has not started a transaction. ASSERT(txnRouter.getOrCreateParticipant(shard1).mustStartTransaction()); } -TEST_F(TransactionRouterTest, OnlyNewlyCreatedParticipantsAddedToOrphanedListOnStaleError) { +TEST_F(TransactionRouterTest, OnlyNewlyCreatedParticipantsClearedOnStaleError) { TxnNumber txnNum{3}; TransactionRouter txnRouter({}); @@ -1058,8 +1048,6 @@ TEST_F(TransactionRouterTest, OnlyNewlyCreatedParticipantsAddedToOrphanedListOnS ASSERT(txnRouter.getCoordinatorId()); ASSERT_EQ(*txnRouter.getCoordinatorId(), shard1); - ASSERT(txnRouter.getOrphanedParticipants().empty()); - // Start a subsequent statement that targets two new shards and encounters a stale error from at // least one of them. @@ -1071,10 +1059,6 @@ TEST_F(TransactionRouterTest, OnlyNewlyCreatedParticipantsAddedToOrphanedListOnS txnRouter.onStaleShardOrDbError("find"); - // Only the two new shards are in the orphaned list. - ASSERT_EQ(txnRouter.getOrphanedParticipants().size(), 2U); - ASSERT_EQ(txnRouter.getOrphanedParticipants().count(shard1), 0U); - // Shards 2 and 3 must start a transaction, but shard 1 must not. ASSERT_FALSE(txnRouter.getOrCreateParticipant(shard1).mustStartTransaction()); ASSERT(txnRouter.getOrCreateParticipant(shard2).mustStartTransaction()); |