author     Jack Mulrow <jack.mulrow@mongodb.com>    2018-09-18 13:53:27 -0400
committer  Jack Mulrow <jack.mulrow@mongodb.com>    2018-09-20 18:39:54 -0400
commit     2f58283213f8a80a37f78b6de2c527951306f2b5 (patch)
tree       6caafc6b812908983bcfc648ad41542adf8a7b41
parent     1b7b5200e158345d8818b00b039fccdccbed7aa2 (diff)
download   mongo-2f58283213f8a80a37f78b6de2c527951306f2b5.tar.gz
SERVER-35707 Add helper for clearing participants, improve error messages, and update comments
-rw-r--r--  src/mongo/s/commands/strategy.cpp                    | 11
-rw-r--r--  src/mongo/s/transaction/at_cluster_time_util.cpp     |  3
-rw-r--r--  src/mongo/s/transaction/transaction_router.cpp       | 74
-rw-r--r--  src/mongo/s/transaction/transaction_router.h         | 20
-rw-r--r--  src/mongo/s/transaction/transaction_router_test.cpp  | 24
5 files changed, 57 insertions(+), 75 deletions(-)
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 0a36636b1fd..8e1f5785915 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -413,7 +413,8 @@ void runCommand(OperationContext* opCtx,
str::stream() << "Transaction " << opCtx->getTxnNumber()
<< " was aborted after "
<< kMaxNumStaleVersionRetries
- << " failed retries",
+ << " failed retries. The latest attempt failed with: "
+ << ex.toStatus(),
canRetry);
}
@@ -435,7 +436,8 @@ void runCommand(OperationContext* opCtx,
str::stream() << "Transaction " << opCtx->getTxnNumber()
<< " was aborted after "
<< kMaxNumStaleVersionRetries
- << " failed retries",
+ << " failed retries. The latest attempt failed with: "
+ << ex.toStatus(),
canRetry);
}
@@ -443,7 +445,7 @@ void runCommand(OperationContext* opCtx,
continue;
}
throw;
- } catch (const ExceptionForCat<ErrorCategory::SnapshotError>&) {
+ } catch (const ExceptionForCat<ErrorCategory::SnapshotError>& ex) {
// Simple retry on any type of snapshot error.
// Update transaction tracking state for a possible retry. Throws if the transaction
@@ -455,7 +457,8 @@ void runCommand(OperationContext* opCtx,
str::stream() << "Transaction " << opCtx->getTxnNumber()
<< " was aborted after "
<< kMaxNumStaleVersionRetries
- << " failed retries",
+ << " failed retries. The latest attempt failed with: "
+ << ex.toStatus(),
canRetry);
}
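
The strategy.cpp changes above only affect the abort message: when the retry budget (kMaxNumStaleVersionRetries) is exhausted, the status of the last failed attempt is now appended so the client can see why the final attempt failed, not just how many retries were spent. A minimal, self-contained sketch of that pattern in standard C++, where runAttempt, RetryableError, and kMaxNumRetries are invented stand-ins for the real command dispatch and uassert machinery:

    #include <iostream>
    #include <sstream>
    #include <stdexcept>
    #include <string>

    // Hypothetical stand-in for a retryable failure (e.g. a stale version error).
    struct RetryableError : std::runtime_error {
        using std::runtime_error::runtime_error;
    };

    constexpr int kMaxNumRetries = 10;  // plays the role of kMaxNumStaleVersionRetries

    void runAttempt(int attempt) {
        // For illustration, every attempt fails with a retryable error.
        throw RetryableError("stale shard version on attempt " + std::to_string(attempt));
    }

    void runWithRetries(long long txnNumber) {
        for (int attempt = 0;; ++attempt) {
            try {
                runAttempt(attempt);
                return;  // success
            } catch (const RetryableError& ex) {
                const bool canRetry = attempt < kMaxNumRetries;
                if (!canRetry) {
                    // Mirror the improved message: include the latest failure,
                    // not just the retry count, so the abort reason is actionable.
                    std::ostringstream msg;
                    msg << "Transaction " << txnNumber << " was aborted after "
                        << kMaxNumRetries
                        << " failed retries. The latest attempt failed with: " << ex.what();
                    throw std::runtime_error(msg.str());
                }
                // Otherwise fall through and run the next attempt.
            }
        }
    }

    int main() {
        try {
            runWithRetries(3);
        } catch (const std::exception& ex) {
            std::cout << ex.what() << '\n';
        }
    }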
diff --git a/src/mongo/s/transaction/at_cluster_time_util.cpp b/src/mongo/s/transaction/at_cluster_time_util.cpp
index 63f54d5a054..db1ccee124d 100644
--- a/src/mongo/s/transaction/at_cluster_time_util.cpp
+++ b/src/mongo/s/transaction/at_cluster_time_util.cpp
@@ -70,6 +70,9 @@ BSONObj appendAtClusterTime(BSONObj cmdObj, LogicalTime atClusterTime) {
// Transactions will upconvert a read concern with afterClusterTime but no level to have
// level snapshot, so a command may have a read concern field with no level.
+ //
+ // TODO SERVER-37237: Once read concern handling has been consolidated on mongos, this
+ // assertion can probably be removed.
if (!readConcernBob.hasField(repl::ReadConcernArgs::kLevelFieldName)) {
readConcernBob.append(repl::ReadConcernArgs::kLevelFieldName,
kReadConcernLevelSnapshotName);
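
For context, appendAtClusterTime only fills in a read concern level when the caller omitted one, because a transaction started with just afterClusterTime is upconverted to level "snapshot" elsewhere. A rough sketch of that "default the level only if missing" step, with a std::map standing in for the BSON builder; the field names mirror the diff, everything else is illustrative:

    #include <cassert>
    #include <map>
    #include <string>

    // Illustrative stand-in for the readConcern sub-document being built.
    using ReadConcernDoc = std::map<std::string, std::string>;

    ReadConcernDoc appendAtClusterTime(ReadConcernDoc readConcern,
                                       const std::string& atClusterTime) {
        readConcern["atClusterTime"] = atClusterTime;
        // A read concern that only specified afterClusterTime may have been
        // upconverted to a snapshot transaction, so the level field can be
        // absent here; default it to "snapshot" instead of asserting on it.
        if (readConcern.find("level") == readConcern.end()) {
            readConcern["level"] = "snapshot";
        }
        return readConcern;
    }

    int main() {
        ReadConcernDoc onlyAfterClusterTime{{"afterClusterTime", "Timestamp(1, 1)"}};
        auto out = appendAtClusterTime(onlyAfterClusterTime, "Timestamp(1, 2)");
        assert(out.at("level") == "snapshot");
        assert(out.at("atClusterTime") == "Timestamp(1, 2)");
    }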
diff --git a/src/mongo/s/transaction/transaction_router.cpp b/src/mongo/s/transaction/transaction_router.cpp
index a9c592a5600..7689bd18167 100644
--- a/src/mongo/s/transaction/transaction_router.cpp
+++ b/src/mongo/s/transaction/transaction_router.cpp
@@ -125,6 +125,9 @@ BSONObj appendReadConcernForTxn(BSONObj cmd,
dassert(existingReadConcernArgs.initialize(cmd));
// There may be no read concern level if the user only specified afterClusterTime and the
// transaction provided the default level.
+ //
+ // TODO SERVER-37237: Once read concern handling has been consolidated on mongos, this
+ // assertion can probably be simplified or removed.
dassert(existingReadConcernArgs.getLevel() == readConcernArgs.getLevel() ||
!existingReadConcernArgs.hasLevel());
@@ -154,7 +157,8 @@ BSONObjBuilder appendFieldsForStartTransaction(BSONObj cmd,
// Commands that are idempotent in a transaction context and can be blindly retried in the middle of
// a transaction. Aggregate with $out is disallowed in a transaction, so aggregates must be read
-// operations.
+// operations. Note: aggregate and find do have the side effect of creating cursors, but any
+// cursors established during an unsuccessful attempt are best-effort killed.
const StringMap<int> alwaysRetryableCmds = {
{"aggregate", 1}, {"distinct", 1}, {"find", 1}, {"getMore", 1}, {"killCursors", 1}};
@@ -248,12 +252,9 @@ boost::optional<ShardId> TransactionRouter::getCoordinatorId() const {
}
TransactionRouter::Participant& TransactionRouter::getOrCreateParticipant(const ShardId& shard) {
- // Remove the shard from the abort list if it is present.
- _orphanedParticipants.erase(shard.toString());
-
auto iter = _participants.find(shard.toString());
if (iter != _participants.end()) {
- // TODO SERVER-36589: Once mongos aborts transactions by only sending abortTransaction to
+ // TODO SERVER-37223: Once mongos aborts transactions by only sending abortTransaction to
// shards that have been successfully contacted we should be able to add an invariant here
// to ensure the atClusterTime on the participant matches that on the transaction router.
return iter->second;
@@ -269,7 +270,7 @@ TransactionRouter::Participant& TransactionRouter::getOrCreateParticipant(const
// The transaction must have been started with a readConcern.
invariant(!_readConcernArgs.isEmpty());
- // TODO SERVER-36589: Once mongos aborts transactions by only sending abortTransaction to shards
+ // TODO SERVER-37223: Once mongos aborts transactions by only sending abortTransaction to shards
// that have been successfully contacted we should be able to add an invariant here to ensure
// that an atClusterTime has been chosen if the read concern level is snapshot.
@@ -287,6 +288,26 @@ const LogicalSessionId& TransactionRouter::getSessionId() const {
return _sessionId;
}
+void TransactionRouter::_clearPendingParticipants() {
+ for (auto&& it = _participants.begin(); it != _participants.end();) {
+ auto participant = it++;
+ if (participant->second.getStmtIdCreatedAt() == _latestStmtId) {
+ _participants.erase(participant);
+ }
+ }
+
+ // If there are no more participants, also clear the coordinator id because a new one must be
+ // chosen by the retry.
+ if (_participants.empty()) {
+ _coordinatorId.reset();
+ return;
+ }
+
+ // If participants were created by an earlier command, the coordinator must be one of them.
+ invariant(_coordinatorId);
+ invariant(_participants.count(*_coordinatorId) == 1);
+}
+
bool TransactionRouter::_canContinueOnStaleShardOrDbError(StringData cmdName) const {
// We can always retry on the first overall statement.
if (_latestStmtId == _firstStmtId) {
@@ -306,28 +327,9 @@ void TransactionRouter::onStaleShardOrDbError(StringData cmdName) {
"Transaction was aborted due to cluster data placement change",
_canContinueOnStaleShardOrDbError(cmdName));
- // Remove each participant created at the most recent statement id and add them to the orphaned
- // list because the retry attempt isn't guaranteed to retarget them. Participants created
- // earlier are already fixed in the participant list, so they should not be removed.
- for (auto&& it = _participants.begin(); it != _participants.end();) {
- auto participant = it++;
- if (participant->second.getStmtIdCreatedAt() == _latestStmtId) {
- _orphanedParticipants.try_emplace(participant->first);
- _participants.erase(participant);
- }
- }
-
- // If there are no more participants, also clear the coordinator id because a new one must be
- // chosen by the retry.
- if (_participants.empty()) {
- _coordinatorId.reset();
- return;
- }
-
- // If this is not the first command, the coordinator must have been chosen and successfully
- // contacted in an earlier command, and thus must not be in the orphaned list.
- invariant(_coordinatorId);
- invariant(_orphanedParticipants.count(*_coordinatorId) == 0);
+ // Remove participants created during the current statement so they are sent the correct options
+ // if they are targeted again by the retry.
+ _clearPendingParticipants();
}
bool TransactionRouter::_canContinueOnSnapshotError() const {
@@ -340,16 +342,12 @@ void TransactionRouter::onSnapshotError() {
"Transaction was aborted due to snapshot error on subsequent transaction statement",
_canContinueOnSnapshotError());
- // Add each participant to the orphaned list because the retry attempt isn't guaranteed to
- // re-target it.
- for (const auto& participant : _participants) {
- _orphanedParticipants.try_emplace(participant.first);
- }
-
- // New transactions must be started on each contacted participant since the retry will select a
- // new read timestamp.
- _participants.clear();
- _coordinatorId.reset();
+ // The transaction must be restarted on all participants because a new read timestamp will be
+ // selected, so clear all pending participants. Snapshot errors are only retryable on the first
+ // client statement, so all participants should be cleared, including the coordinator.
+ _clearPendingParticipants();
+ invariant(_participants.empty());
+ invariant(!_coordinatorId);
// Reset the global snapshot timestamp so the retry will select a new one.
_atClusterTime.reset();
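
The new _clearPendingParticipants() helper consolidates the erase-while-iterating logic that was previously duplicated in the two error handlers: a stale error drops only the participants added by the current statement, while a snapshot error, which is only retryable on the first client statement, ends up dropping every participant, coordinator included. A toy, self-contained sketch of that pattern, with std::map standing in for StringMap and all names simplified for illustration:

    #include <cassert>
    #include <map>
    #include <optional>
    #include <string>

    struct Participant {
        int stmtIdCreatedAt;
    };

    struct ToyRouter {
        std::map<std::string, Participant> participants;
        std::optional<std::string> coordinatorId;
        int firstStmtId = 0;
        int latestStmtId = 0;

        // Drop participants created by the current statement: a retry is not
        // guaranteed to target the same shards, and any shard it does target
        // again must be re-sent startTransaction with the correct options.
        void clearPendingParticipants() {
            for (auto it = participants.begin(); it != participants.end();) {
                if (it->second.stmtIdCreatedAt == latestStmtId) {
                    it = participants.erase(it);  // erase returns the next iterator
                } else {
                    ++it;
                }
            }
            if (participants.empty()) {
                coordinatorId.reset();  // the retry must choose a new coordinator
                return;
            }
            // Participants from earlier statements remain, so the coordinator,
            // chosen on the first statement, must still be among them.
            assert(coordinatorId && participants.count(*coordinatorId) == 1);
        }
    };

    int main() {
        ToyRouter router;
        router.participants = {{"shard1", {0}}, {"shard2", {1}}, {"shard3", {1}}};
        router.coordinatorId = "shard1";
        router.latestStmtId = 1;

        // Stale error on the second statement: only the newly added shards go away.
        router.clearPendingParticipants();
        assert(router.participants.size() == 1 && router.participants.count("shard1") == 1);
        assert(router.coordinatorId == "shard1");

        // Snapshot errors are only retryable on the first statement, so every
        // participant was created at latestStmtId and the whole set is cleared.
        router.latestStmtId = router.firstStmtId;
        router.participants = {{"shard1", {0}}, {"shard2", {0}}};
        router.clearPendingParticipants();
        assert(router.participants.empty() && !router.coordinatorId);
    }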
diff --git a/src/mongo/s/transaction/transaction_router.h b/src/mongo/s/transaction/transaction_router.h
index a47a1f92db7..7be619cfcf3 100644
--- a/src/mongo/s/transaction/transaction_router.h
+++ b/src/mongo/s/transaction/transaction_router.h
@@ -143,8 +143,8 @@ public:
/**
* Resets the transaction state to allow for a retry attempt. This includes clearing all
- * participants and adding them to the orphaned list, clearing the coordinator, and resetting
- * the global read timestamp. Will throw if the transaction cannot be continued.
+ * participants, clearing the coordinator, and resetting the global read timestamp. Will throw
+ * if the transaction cannot be continued.
*/
void onSnapshotError();
@@ -179,10 +179,6 @@ public:
boost::optional<ShardId> getCoordinatorId() const;
- const StringMap<bool>& getOrphanedParticipants() const {
- return _orphanedParticipants;
- }
-
/**
* Commits the transaction. For transactions with multiple participants, this will initiate
* the two phase commit procedure.
@@ -228,6 +224,11 @@ private:
*/
bool _canContinueOnSnapshotError() const;
+ /**
+ * Removes all participants created during the current statement from the participant list.
+ */
+ void _clearPendingParticipants();
+
const LogicalSessionId _sessionId;
TxnNumber _txnNumber{kUninitializedTxnNumber};
@@ -237,13 +238,6 @@ private:
// Map of current participants of the current transaction.
StringMap<Participant> _participants;
- // Map of participants that have been sent startTransaction, but are not in the current
- // participant list.
- //
- // TODO SERVER-36589: Send abortTransaction to each shard in the orphaned list when committing
- // or aborting a transaction to avoid leaving open orphaned transactions.
- StringMap<bool> _orphanedParticipants;
-
// The id of coordinator participant, used to construct prepare requests.
boost::optional<ShardId> _coordinatorId;
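
With the orphaned list removed, the router's per-transaction shard state reduces to the participant map plus an optional coordinator id, and both retry hooks funnel through the new private helper. A compilable outline of that shape, with member and method names following the header but all types stubbed and parameters omitted for illustration:

    #include <map>
    #include <optional>
    #include <string>

    using ShardId = std::string;  // stand-in for the real ShardId type
    struct Participant {};

    class TransactionRouterOutline {
    public:
        // Both retry hooks reset state by clearing participants that were
        // created during the statement being retried.
        void onSnapshotError() { _clearPendingParticipants(); }
        void onStaleShardOrDbError() { _clearPendingParticipants(); }

    private:
        // Removes all participants created during the current statement.
        void _clearPendingParticipants() { /* see the transaction_router.cpp sketch above */ }

        // Map of participants in the current transaction; no orphaned list anymore.
        std::map<std::string, Participant> _participants;

        // The id of the coordinator participant, used to construct prepare requests.
        std::optional<ShardId> _coordinatorId;
    };

    int main() {
        TransactionRouterOutline router;
        router.onSnapshotError();
        router.onStaleShardOrDbError();
    }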
diff --git a/src/mongo/s/transaction/transaction_router_test.cpp b/src/mongo/s/transaction/transaction_router_test.cpp
index f74f3ab0935..c076648cf05 100644
--- a/src/mongo/s/transaction/transaction_router_test.cpp
+++ b/src/mongo/s/transaction/transaction_router_test.cpp
@@ -887,7 +887,7 @@ TEST_F(TransactionRouterTest, CannotChangeAtClusterTimeWithoutSnapshotError) {
}
}
-TEST_F(TransactionRouterTest, SnapshotErrorsAddAllParticipantsToOrphanedList) {
+TEST_F(TransactionRouterTest, SnapshotErrorsClearsAllParticipants) {
TxnNumber txnNum{3};
TransactionRouter txnRouter({});
@@ -911,15 +911,12 @@ TEST_F(TransactionRouterTest, SnapshotErrorsAddAllParticipantsToOrphanedList) {
ASSERT(txnRouter.getCoordinatorId());
ASSERT_EQ(*txnRouter.getCoordinatorId(), shard1);
- ASSERT(txnRouter.getOrphanedParticipants().empty());
-
// Simulate a snapshot error and an internal retry that only re-targets one of the original two
// shards.
txnRouter.onSnapshotError();
ASSERT_FALSE(txnRouter.getCoordinatorId());
- ASSERT_EQ(txnRouter.getOrphanedParticipants().size(), 2U);
{
auto& participant = txnRouter.getOrCreateParticipant(shard2);
@@ -928,11 +925,9 @@ TEST_F(TransactionRouterTest, SnapshotErrorsAddAllParticipantsToOrphanedList) {
ASSERT_FALSE(participant.mustStartTransaction());
}
- // There is a new coordinator and shard1 is still in the orphaned list.
+ // There is a new coordinator.
ASSERT(txnRouter.getCoordinatorId());
ASSERT_EQ(*txnRouter.getCoordinatorId(), shard2);
- ASSERT_EQ(txnRouter.getOrphanedParticipants().size(), 1U);
- ASSERT_EQ(txnRouter.getOrphanedParticipants().count(shard1), 1U);
{
// Shard1 has not started a transaction.
@@ -1018,14 +1013,11 @@ TEST_F(TransactionRouterTest, AllParticipantsAndCoordinatorClearedOnStaleErrorOn
ASSERT(txnRouter.getCoordinatorId());
ASSERT_EQ(*txnRouter.getCoordinatorId(), shard1);
- ASSERT(txnRouter.getOrphanedParticipants().empty());
-
// Simulate stale error and internal retry that only re-targets one of the original shards.
txnRouter.onStaleShardOrDbError("find");
ASSERT_FALSE(txnRouter.getCoordinatorId());
- ASSERT_EQ(txnRouter.getOrphanedParticipants().size(), 2U);
{
auto& participant = txnRouter.getOrCreateParticipant(shard2);
@@ -1034,17 +1026,15 @@ TEST_F(TransactionRouterTest, AllParticipantsAndCoordinatorClearedOnStaleErrorOn
ASSERT_FALSE(participant.mustStartTransaction());
}
- // There is a new coordinator and shard1 is still in the orphaned list.
+ // There is a new coordinator.
ASSERT(txnRouter.getCoordinatorId());
ASSERT_EQ(*txnRouter.getCoordinatorId(), shard2);
- ASSERT_EQ(txnRouter.getOrphanedParticipants().size(), 1U);
- ASSERT_EQ(txnRouter.getOrphanedParticipants().count(shard1), 1U);
// Shard1 has not started a transaction.
ASSERT(txnRouter.getOrCreateParticipant(shard1).mustStartTransaction());
}
-TEST_F(TransactionRouterTest, OnlyNewlyCreatedParticipantsAddedToOrphanedListOnStaleError) {
+TEST_F(TransactionRouterTest, OnlyNewlyCreatedParticipantsClearedOnStaleError) {
TxnNumber txnNum{3};
TransactionRouter txnRouter({});
@@ -1058,8 +1048,6 @@ TEST_F(TransactionRouterTest, OnlyNewlyCreatedParticipantsAddedToOrphanedListOnS
ASSERT(txnRouter.getCoordinatorId());
ASSERT_EQ(*txnRouter.getCoordinatorId(), shard1);
- ASSERT(txnRouter.getOrphanedParticipants().empty());
-
// Start a subsequent statement that targets two new shards and encounters a stale error from at
// least one of them.
@@ -1071,10 +1059,6 @@ TEST_F(TransactionRouterTest, OnlyNewlyCreatedParticipantsAddedToOrphanedListOnS
txnRouter.onStaleShardOrDbError("find");
- // Only the two new shards are in the orphaned list.
- ASSERT_EQ(txnRouter.getOrphanedParticipants().size(), 2U);
- ASSERT_EQ(txnRouter.getOrphanedParticipants().count(shard1), 0U);
-
// Shards 2 and 3 must start a transaction, but shard 1 must not.
ASSERT_FALSE(txnRouter.getOrCreateParticipant(shard1).mustStartTransaction());
ASSERT(txnRouter.getOrCreateParticipant(shard2).mustStartTransaction());