author    Jack Mulrow <jack.mulrow@mongodb.com>    2018-09-05 19:35:51 -0400
committer Jack Mulrow <jack.mulrow@mongodb.com>    2018-09-19 13:20:05 -0400
commit    3bab189695c705ff163721652add910b32c2659e (patch)
tree      0c8a116ff0744c943ff5d7844395717fa7e81270
parent    5f61edb2c73e569cb4b265ca8f453536e7b7b016 (diff)
SERVER-35707 Allow mongos to retry on re-targeting errors in a transaction
-rw-r--r--  buildscripts/resmokeconfig/suites/sharded_core_txns.yml | 4
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml | 2
-rw-r--r--  buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml | 2
-rw-r--r--  buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 | 2
-rw-r--r--  jstests/core/txns/concurrent_drops_and_creates.js | 4
-rw-r--r--  jstests/core/txns/list_collections_not_blocked_by_txn.js | 8
-rw-r--r--  jstests/core/txns/statement_ids_accepted.js | 129
-rw-r--r--  jstests/noPassthrough/readConcern_snapshot_mongos.js | 4
-rw-r--r--  jstests/sharding/libs/sharded_transactions_helpers.js | 10
-rw-r--r--  jstests/sharding/snapshot_cursor_commands_mongos.js | 16
-rw-r--r--  jstests/sharding/transactions_snapshot_errors_first_statement.js | 6
-rw-r--r--  jstests/sharding/transactions_snapshot_errors_subsequent_statements.js | 6
-rw-r--r--  jstests/sharding/transactions_stale_database_version_errors.js | 114
-rw-r--r--  jstests/sharding/transactions_stale_shard_version_errors.js | 251
-rw-r--r--  src/mongo/db/handle_request_response.cpp | 2
-rw-r--r--  src/mongo/db/s/shard_filtering_metadata_refresh.cpp | 12
-rw-r--r--  src/mongo/s/commands/strategy.cpp | 53
-rw-r--r--  src/mongo/s/query/cluster_find.cpp | 8
-rw-r--r--  src/mongo/s/transaction/append_at_cluster_time_test.cpp | 23
-rw-r--r--  src/mongo/s/transaction/at_cluster_time_util.cpp | 34
-rw-r--r--  src/mongo/s/transaction/transaction_router.cpp | 72
-rw-r--r--  src/mongo/s/transaction/transaction_router.h | 44
-rw-r--r--  src/mongo/s/transaction/transaction_router_test.cpp | 201
23 files changed, 859 insertions(+), 148 deletions(-)
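
Before the patch itself, a condensed, illustrative sketch (not part of the patch) of what a client now observes when a statement in a transaction hits a stale version error. It assumes a ShardingTest fixture named st whose targeted shard has been made stale; the precise setup is in the new jstests added below.

    const session = st.s.startSession();
    const sessionDB = session.getDatabase("test");

    // A stale version error on the first overall statement is retried
    // transparently by mongos, so the statement succeeds.
    session.startTransaction();
    assert.commandWorked(sessionDB.runCommand({distinct: "foo", key: "_id"}));
    session.abortTransaction();

    // A stale version error on a later statement, after an earlier statement has
    // already run on the now-stale shard, surfaces as NoSuchTransaction with the
    // TransientTransactionError label, because that shard aborted the transaction.
    session.startTransaction();
    assert.commandWorked(sessionDB.runCommand({find: "foo", filter: {_id: 0}}));
    const res = assert.commandFailedWithCode(
        sessionDB.runCommand({distinct: "foo", key: "_id"}), ErrorCodes.NoSuchTransaction);
    assert.eq(res.errorLabels, ["TransientTransactionError"]);
    session.abortTransaction();
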
diff --git a/buildscripts/resmokeconfig/suites/sharded_core_txns.yml b/buildscripts/resmokeconfig/suites/sharded_core_txns.yml
index d0a027b2e0a..87f14bfbbe0 100644
--- a/buildscripts/resmokeconfig/suites/sharded_core_txns.yml
+++ b/buildscripts/resmokeconfig/suites/sharded_core_txns.yml
@@ -62,10 +62,8 @@ selector:
# TODO SERVER-35825: Mongos should only allow readConcern on writes that start transactions.
- jstests/core/txns/no_read_concern_snapshot_outside_txn.js
- # TODO SERVER-35707: Figure out transaction abort state on re-targeting exceptions.
+ # TODO SERVER-37209: Allow mongos to retry on view errors.
- jstests/core/txns/view_reads_in_transaction.js
- - jstests/core/txns/list_collections_not_blocked_by_txn.js
- - jstests/core/txns/statement_ids_accepted.js
# Uses hangAfterCollectionInserts failpoint not available on mongos.
- jstests/core/txns/speculative_snapshot_includes_all_writes.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
index 154680cef50..8bb47be31a5 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml
@@ -44,6 +44,8 @@ selector:
- jstests/sharding/snapshot_cursor_commands_mongos.js
- jstests/sharding/transactions_snapshot_errors_first_statement.js
- jstests/sharding/transactions_snapshot_errors_subsequent_statements.js
+ - jstests/sharding/transactions_stale_database_version_errors.js
+ - jstests/sharding/transactions_stale_shard_version_errors.js
- jstests/sharding/txn_basic_two_phase_commit.js
- jstests/sharding/txn_coordinator_commands_basic_requirements.js
- jstests/sharding/update_sharded.js
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
index fc814445595..9a02d418dc4 100644
--- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
+++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml
@@ -359,6 +359,8 @@ selector:
- jstests/sharding/snapshot_cursor_commands_mongos.js
- jstests/sharding/transactions_snapshot_errors_first_statement.js
- jstests/sharding/transactions_snapshot_errors_subsequent_statements.js
+ - jstests/sharding/transactions_stale_database_version_errors.js
+ - jstests/sharding/transactions_stale_shard_version_errors.js
- jstests/sharding/txn_basic_two_phase_commit.js
- jstests/sharding/txn_coordinator_commands_basic_requirements.js
- jstests/sharding/update_sharded.js
diff --git a/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 b/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2
index 1d34a2a3a89..6eb6ffdfb60 100644
--- a/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2
+++ b/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2
@@ -57,6 +57,8 @@ selector:
- jstests/sharding/snapshot_cursor_commands_mongos.js
- jstests/sharding/transactions_snapshot_errors_first_statement.js
- jstests/sharding/transactions_snapshot_errors_subsequent_statements.js
+ - jstests/sharding/transactions_stale_database_version_errors.js
+ - jstests/sharding/transactions_stale_shard_version_errors.js
- jstests/sharding/txn_basic_two_phase_commit.js
- jstests/sharding/txn_coordinator_commands_basic_requirements.js
- jstests/sharding/update_sharded.js
diff --git a/jstests/core/txns/concurrent_drops_and_creates.js b/jstests/core/txns/concurrent_drops_and_creates.js
index 5ddf7ca260b..8db02f1ae98 100644
--- a/jstests/core/txns/concurrent_drops_and_creates.js
+++ b/jstests/core/txns/concurrent_drops_and_creates.js
@@ -64,7 +64,9 @@
// since the transaction's read timestamp. Since our implementation of the in-memory collection
// catalog always has the most recent collection metadata, we do not allow you to read from a
// collection at a time prior to its most recent catalog changes.
- assert.commandFailedWithCode(sessionCollB.insert({}), ErrorCodes.SnapshotUnavailable);
+ const isMongos = assert.commandWorked(db.runCommand("ismaster")).msg === "isdbgrid";
+ const expectedCode = isMongos ? ErrorCodes.NoSuchTransaction : ErrorCodes.SnapshotUnavailable;
+ assert.commandFailedWithCode(sessionCollB.insert({}), expectedCode);
assert.commandFailedWithCode(session.abortTransaction_forTesting(),
ErrorCodes.NoSuchTransaction);
diff --git a/jstests/core/txns/list_collections_not_blocked_by_txn.js b/jstests/core/txns/list_collections_not_blocked_by_txn.js
index fc4637abd5d..33a6d9eb766 100644
--- a/jstests/core/txns/list_collections_not_blocked_by_txn.js
+++ b/jstests/core/txns/list_collections_not_blocked_by_txn.js
@@ -12,6 +12,14 @@
assert.commandWorked(mydb.createCollection("foo", {writeConcern: {w: "majority"}}));
session.startTransaction({readConcern: {level: "snapshot"}});
+
+ const isMongos = assert.commandWorked(db.runCommand("ismaster")).msg === "isdbgrid";
+ if (isMongos) {
+ // Force the shard to refresh its database version, because this requires a database
+ // exclusive lock, which will block behind the transaction.
+ sessionDb.foo.distinct("x");
+ }
+
assert.commandWorked(sessionDb.foo.insert({x: 1}));
for (let nameOnly of[false, true]) {
diff --git a/jstests/core/txns/statement_ids_accepted.js b/jstests/core/txns/statement_ids_accepted.js
index 360d42bc3f5..51795d984d9 100644
--- a/jstests/core/txns/statement_ids_accepted.js
+++ b/jstests/core/txns/statement_ids_accepted.js
@@ -162,38 +162,6 @@
autocommit: false
}));
- jsTestLog("Check that geoSearch accepts a statement ID");
- assert.writeOK(testColl.insert({geo: {type: "Point", coordinates: [0, 0]}, a: 0}),
- {writeConcern: {w: "majority"}});
- assert.writeOK(testColl.insert({geoh: {lat: 0, long: 0}, b: 0}),
- {writeConcern: {w: "majority"}});
- assert.commandWorked(sessionDb.runCommand({
- createIndexes: collName,
- indexes: [
- {name: "geo", key: {geo: "2dsphere"}},
- {name: "geoh", key: {geoh: "geoHaystack", b: 1}, bucketSize: 1}
- ],
- writeConcern: {w: "majority"}
- }));
- // Ensure the snapshot is available following the index creation.
- assert.soonNoExcept(function() {
- testColl.find({}, {readConcern: {level: "snapshot"}});
- return true;
- });
-
- jsTestLog("Check that geoSearch accepts a statement ID");
- assert.commandWorked(sessionDb.runCommand({
- geoSearch: collName,
- search: {b: 0},
- near: [0, 0],
- maxDistance: 1,
- readConcern: {level: "snapshot"},
- txnNumber: NumberLong(txnNumber++),
- stmtId: NumberInt(0),
- startTransaction: true,
- autocommit: false
- }));
-
jsTestLog("Check that insert accepts a statement ID");
assert.commandWorked(sessionDb.runCommand({
insert: collName,
@@ -219,36 +187,73 @@
stmtId: NumberInt(0)
}));
- jsTestLog("Check that prepareTransaction accepts a statement ID");
- assert.commandWorked(sessionDb.runCommand({
- insert: collName,
- documents: [{_id: "doc2"}],
- readConcern: {level: "snapshot"},
- txnNumber: NumberLong(txnNumber),
- stmtId: NumberInt(0),
- startTransaction: true,
- autocommit: false
- }));
- // prepareTransaction can only be run on the admin database.
- assert.commandWorked(sessionDb.adminCommand({
- prepareTransaction: 1,
- txnNumber: NumberLong(txnNumber),
- stmtId: NumberInt(1),
- autocommit: false
- }));
- assert.commandWorked(sessionDb.adminCommand({
- abortTransaction: 1,
- txnNumber: NumberLong(txnNumber++),
- stmtId: NumberInt(2),
- autocommit: false
- }));
- assert.commandFailedWithCode(sessionDb.runCommand({
- prepareTransaction: 1,
- txnNumber: NumberLong(txnNumber++),
- stmtId: NumberInt(0),
- autocommit: false
- }),
- ErrorCodes.Unauthorized);
+ const isMongos = assert.commandWorked(db.runCommand("ismaster")).msg === "isdbgrid";
+ if (!isMongos) {
+ // Skip commands that do not exist on mongos.
+
+ jsTestLog("Check that geoSearch accepts a statement ID");
+ assert.writeOK(testColl.insert({geo: {type: "Point", coordinates: [0, 0]}, a: 0}),
+ {writeConcern: {w: "majority"}});
+ assert.writeOK(testColl.insert({geoh: {lat: 0, long: 0}, b: 0}),
+ {writeConcern: {w: "majority"}});
+ assert.commandWorked(sessionDb.runCommand({
+ createIndexes: collName,
+ indexes: [
+ {name: "geo", key: {geo: "2dsphere"}},
+ {name: "geoh", key: {geoh: "geoHaystack", b: 1}, bucketSize: 1}
+ ],
+ writeConcern: {w: "majority"}
+ }));
+ // Ensure the snapshot is available following the index creation.
+ assert.soonNoExcept(function() {
+ testColl.find({}, {readConcern: {level: "snapshot"}});
+ return true;
+ });
+
+ jsTestLog("Check that geoSearch accepts a statement ID");
+ assert.commandWorked(sessionDb.runCommand({
+ geoSearch: collName,
+ search: {b: 0},
+ near: [0, 0],
+ maxDistance: 1,
+ readConcern: {level: "snapshot"},
+ txnNumber: NumberLong(txnNumber++),
+ stmtId: NumberInt(0),
+ startTransaction: true,
+ autocommit: false
+ }));
+
+ jsTestLog("Check that prepareTransaction accepts a statement ID");
+ assert.commandWorked(sessionDb.runCommand({
+ insert: collName,
+ documents: [{_id: "doc2"}],
+ readConcern: {level: "snapshot"},
+ txnNumber: NumberLong(txnNumber),
+ stmtId: NumberInt(0),
+ startTransaction: true,
+ autocommit: false
+ }));
+ // prepareTransaction can only be run on the admin database.
+ assert.commandWorked(sessionDb.adminCommand({
+ prepareTransaction: 1,
+ txnNumber: NumberLong(txnNumber),
+ stmtId: NumberInt(1),
+ autocommit: false
+ }));
+ assert.commandWorked(sessionDb.adminCommand({
+ abortTransaction: 1,
+ txnNumber: NumberLong(txnNumber++),
+ stmtId: NumberInt(2),
+ autocommit: false
+ }));
+ assert.commandFailedWithCode(sessionDb.runCommand({
+ prepareTransaction: 1,
+ txnNumber: NumberLong(txnNumber++),
+ stmtId: NumberInt(0),
+ autocommit: false
+ }),
+ ErrorCodes.Unauthorized);
+ }
// refreshLogicalSessionCacheNow is intentionally omitted.
diff --git a/jstests/noPassthrough/readConcern_snapshot_mongos.js b/jstests/noPassthrough/readConcern_snapshot_mongos.js
index 2fb7e4387df..db11ca7d3ef 100644
--- a/jstests/noPassthrough/readConcern_snapshot_mongos.js
+++ b/jstests/noPassthrough/readConcern_snapshot_mongos.js
@@ -29,10 +29,6 @@
// Insert data to create the collection.
assert.writeOK(testDB[collName].insert({x: 1}));
- // TODO SERVER-35707: Re-targeting errors abort transactions. Run distinct on the unsharded
- // collection to force the shard and mongos to refresh their DB versions.
- assert.commandWorked(testDB.runCommand({distinct: collName, key: "x"}));
-
// noPassthrough tests
// readConcern 'snapshot' is not allowed outside session context.
diff --git a/jstests/sharding/libs/sharded_transactions_helpers.js b/jstests/sharding/libs/sharded_transactions_helpers.js
index 7876e2934b9..422bdac250d 100644
--- a/jstests/sharding/libs/sharded_transactions_helpers.js
+++ b/jstests/sharding/libs/sharded_transactions_helpers.js
@@ -19,13 +19,3 @@ function unsetFailCommandOnEachShard(st, numShards) {
shardConn.adminCommand({configureFailPoint: "failCommand", mode: "off"}));
}
}
-
-// TODO SERVER-35707: Routing table cache updates are necessary until mongos is able to retry on
-// stale shard and database version errors.
-function flushShardRoutingTableUpdates(st, dbName, ns, numShards) {
- for (let i = 0; i < numShards; i++) {
- const shardConn = st["rs" + i].getPrimary();
- assert.commandWorked(shardConn.adminCommand({_flushDatabaseCacheUpdates: dbName}));
- assert.commandWorked(shardConn.adminCommand({_flushRoutingTableCacheUpdates: ns}));
- }
-}
diff --git a/jstests/sharding/snapshot_cursor_commands_mongos.js b/jstests/sharding/snapshot_cursor_commands_mongos.js
index 3a65e9c7f44..30cee7728d1 100644
--- a/jstests/sharding/snapshot_cursor_commands_mongos.js
+++ b/jstests/sharding/snapshot_cursor_commands_mongos.js
@@ -100,15 +100,6 @@
assert.eq(
1, mongos.getDB('config').chunks.count({ns: ns, shard: st.shard2.shardName}));
- // Routing table cache updates are necessary until mongos retargeting is fixed.
- // TODO: SERVER-35707
- assert.commandWorked(
- st.shard0.getDB('admin').runCommand({_flushRoutingTableCacheUpdates: ns}));
- assert.commandWorked(
- st.shard1.getDB('admin').runCommand({_flushRoutingTableCacheUpdates: ns}));
- assert.commandWorked(
- st.shard2.getDB('admin').runCommand({_flushRoutingTableCacheUpdates: ns}));
-
return st;
}
},
@@ -144,13 +135,6 @@
assert.eq(
1, mongos.getDB('config').chunks.count({ns: ns, shard: st.shard2.shardName}));
- // Routing table cache updates are necessary until mongos retargeting is fixed.
- // TODO: SERVER-35707
- assert.commandWorked(
- st.shard1.getDB('admin').runCommand({_flushRoutingTableCacheUpdates: ns}));
- assert.commandWorked(
- st.shard2.getDB('admin').runCommand({_flushRoutingTableCacheUpdates: ns}));
-
return st;
}
},
diff --git a/jstests/sharding/transactions_snapshot_errors_first_statement.js b/jstests/sharding/transactions_snapshot_errors_first_statement.js
index 220426f475b..7e31480d520 100644
--- a/jstests/sharding/transactions_snapshot_errors_first_statement.js
+++ b/jstests/sharding/transactions_snapshot_errors_first_statement.js
@@ -110,7 +110,8 @@
setFailCommandOnShards(st, "alwaysOn", [commandName], errorCode, numShardsToError);
session.startTransaction({readConcern: {level: "snapshot"}});
- const res = assert.commandFailedWithCode(sessionDB.runCommand(commandBody), errorCode);
+ const res = assert.commandFailedWithCode(sessionDB.runCommand(commandBody),
+ ErrorCodes.NoSuchTransaction);
assert.eq(res.errorLabels, ["TransientTransactionError"]);
session.abortTransaction();
@@ -125,7 +126,6 @@
assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: 5}, {writeConcern: {w: "majority"}}));
st.ensurePrimaryShard(dbName, st.shard0.shardName);
- flushShardRoutingTableUpdates(st, dbName, ns, 2);
for (let errorCode of kSnapshotErrors) {
runTest(st, collName, 1, errorCode, false);
@@ -144,7 +144,6 @@
assert.eq(2, st.s.getDB('config').chunks.count({ns: ns, shard: st.shard0.shardName}));
assert.eq(0, st.s.getDB('config').chunks.count({ns: ns, shard: st.shard1.shardName}));
- flushShardRoutingTableUpdates(st, dbName, ns, 2);
for (let errorCode of kSnapshotErrors) {
runTest(st, collName, 1, errorCode, false);
@@ -156,7 +155,6 @@
st.s.adminCommand({moveChunk: ns, find: {_id: 15}, to: st.shard1.shardName}));
assert.eq(1, st.s.getDB('config').chunks.count({ns: ns, shard: st.shard0.shardName}));
assert.eq(1, st.s.getDB('config').chunks.count({ns: ns, shard: st.shard0.shardName}));
- flushShardRoutingTableUpdates(st, dbName, ns, 2);
for (let errorCode of kSnapshotErrors) {
runTest(st, collName, 2, errorCode, true);
diff --git a/jstests/sharding/transactions_snapshot_errors_subsequent_statements.js b/jstests/sharding/transactions_snapshot_errors_subsequent_statements.js
index 96b74bc735a..f346a8eea48 100644
--- a/jstests/sharding/transactions_snapshot_errors_subsequent_statements.js
+++ b/jstests/sharding/transactions_snapshot_errors_subsequent_statements.js
@@ -55,7 +55,8 @@
// Verify the command must fail on a snapshot error from a subsequent statement.
setFailCommandOnShards(st, {times: 1}, [commandName], errorCode, 1);
- const res = assert.commandFailedWithCode(sessionDB.runCommand(commandBody), errorCode);
+ const res = assert.commandFailedWithCode(sessionDB.runCommand(commandBody),
+ ErrorCodes.NoSuchTransaction);
assert.eq(res.errorLabels, ["TransientTransactionError"]);
session.abortTransaction();
@@ -68,7 +69,6 @@
assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: 5}, {writeConcern: {w: "majority"}}));
st.ensurePrimaryShard(dbName, st.shard0.shardName);
- flushShardRoutingTableUpdates(st, dbName, ns, 2);
// Single shard case simulates the storage engine discarding an in-use snapshot.
for (let errorCode of kSnapshotErrors) {
@@ -88,7 +88,6 @@
assert.eq(2, st.s.getDB('config').chunks.count({ns: ns, shard: st.shard0.shardName}));
assert.eq(0, st.s.getDB('config').chunks.count({ns: ns, shard: st.shard1.shardName}));
- flushShardRoutingTableUpdates(st, dbName, ns, 2);
for (let errorCode of kSnapshotErrors) {
runTest(st, collName, errorCode);
@@ -100,7 +99,6 @@
st.s.adminCommand({moveChunk: ns, find: {_id: 15}, to: st.shard1.shardName}));
assert.eq(1, st.s.getDB('config').chunks.count({ns: ns, shard: st.shard0.shardName}));
assert.eq(1, st.s.getDB('config').chunks.count({ns: ns, shard: st.shard1.shardName}));
- flushShardRoutingTableUpdates(st, dbName, ns, 2);
// Multi shard case simulates adding a new participant that can no longer support the already
// chosen read timestamp.
diff --git a/jstests/sharding/transactions_stale_database_version_errors.js b/jstests/sharding/transactions_stale_database_version_errors.js
new file mode 100644
index 00000000000..02f1d37c113
--- /dev/null
+++ b/jstests/sharding/transactions_stale_database_version_errors.js
@@ -0,0 +1,114 @@
+// Tests mongos behavior on stale database version errors received in a transaction.
+//
+// @tags: [requires_sharding, uses_transactions]
+(function() {
+ "use strict";
+
+ const dbName = "test";
+ const collName = "foo";
+
+ const st = new ShardingTest({shards: 2, mongos: 1, config: 1});
+
+ // Set up two unsharded collections in different databases with shard0 as their primary.
+
+ assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: 0}, {writeConcern: {w: "majority"}}));
+ assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+ st.ensurePrimaryShard(dbName, st.shard0.shardName);
+
+ const session = st.s.startSession();
+ const sessionDB = session.getDatabase(dbName);
+
+ //
+ // Stale database version on first overall command should succeed.
+ //
+
+ session.startTransaction();
+
+ // No database versioned requests have been sent to Shard0, so it is stale.
+ assert.commandWorked(sessionDB.runCommand({distinct: collName, key: "_id", query: {_id: 0}}));
+
+ // TODO SERVER-36304: Change this to commitTransaction once multi shard transactions can be
+ // committed through mongos.
+ session.abortTransaction();
+
+ //
+ // Stale database version on second command to a shard should fail.
+ //
+
+ st.ensurePrimaryShard(dbName, st.shard1.shardName);
+
+ session.startTransaction();
+
+ // Find is not database versioned so it will not trigger SDV or a refresh on Shard0.
+ assert.commandWorked(sessionDB.runCommand({find: collName, filter: {_id: 0}}));
+
+ // Distinct is database versioned, so it will trigger SDV. The router will retry and the retry
+ // will discover the transaction was aborted, because a previous statement had completed on
+ // Shard0.
+ let res = assert.commandFailedWithCode(
+ sessionDB.runCommand({distinct: collName, key: "_id", query: {_id: 0}}),
+ ErrorCodes.NoSuchTransaction);
+ assert.eq(res.errorLabels, ["TransientTransactionError"]);
+
+ session.abortTransaction();
+
+ //
+ // Stale database version on first command to a new shard should succeed.
+ //
+
+ // Create a new database on Shard0.
+ const otherDbName = "other_test";
+ const otherCollName = "bar";
+
+ assert.writeOK(
+ st.s.getDB(otherDbName)[otherCollName].insert({_id: 0}, {writeConcern: {w: "majority"}}));
+ assert.commandWorked(st.s.adminCommand({enableSharding: otherDbName}));
+ st.ensurePrimaryShard(otherDbName, st.shard0.shardName);
+
+ const sessionOtherDB = session.getDatabase(otherDbName);
+
+ // Advance the router's cached last committed opTime for Shard0, so it chooses a read timestamp
+ // after the collection is created on shard1, to avoid SnapshotUnavailable.
+ assert.commandWorked(
+ sessionOtherDB.runCommand({find: otherCollName})); // Not database versioned.
+ assert.writeOK(sessionDB[collName].insert({_id: 1}, {writeConcern: {w: "majority"}}));
+
+ session.startTransaction();
+
+ // Target the first database which is on Shard1.
+ assert.commandWorked(sessionDB.runCommand({distinct: collName, key: "_id", query: {_id: 0}}));
+
+ // Targets the new database on Shard0 which is stale, so a database versioned request should
+ // trigger SDV.
+ assert.commandWorked(
+ sessionOtherDB.runCommand({distinct: otherCollName, key: "_id", query: {_id: 0}}));
+
+ // TODO SERVER-36304: Change this to commitTransaction.
+ session.abortTransaction();
+
+ //
+ // NoSuchTransaction should be returned if the router exhausts its retries.
+ //
+
+ st.ensurePrimaryShard(dbName, st.shard0.shardName);
+
+ // Disable database metadata refreshes on the stale shard so it will indefinitely return a stale
+ // version error.
+ assert.commandWorked(st.rs0.getPrimary().adminCommand(
+ {configureFailPoint: "skipDatabaseVersionMetadataRefresh", mode: "alwaysOn"}));
+
+ session.startTransaction();
+
+ // Target the first database which is on Shard0. The shard is stale and won't refresh its
+ // metadata, so mongos should exhaust its retries and implicitly abort the transaction.
+ assert.commandFailedWithCode(
+ sessionDB.runCommand({distinct: collName, key: "_id", query: {_id: 0}}),
+ ErrorCodes.NoSuchTransaction);
+
+ session.abortTransaction();
+
+ assert.commandWorked(st.rs0.getPrimary().adminCommand(
+ {configureFailPoint: "skipDatabaseVersionMetadataRefresh", mode: "off"}));
+
+ st.stop();
+})();
diff --git a/jstests/sharding/transactions_stale_shard_version_errors.js b/jstests/sharding/transactions_stale_shard_version_errors.js
new file mode 100644
index 00000000000..4d9b7f7edf3
--- /dev/null
+++ b/jstests/sharding/transactions_stale_shard_version_errors.js
@@ -0,0 +1,251 @@
+// Tests mongos behavior on stale shard version errors received in a transaction.
+//
+// @tags: [requires_sharding, uses_transactions]
+(function() {
+ "use strict";
+
+ function expectChunks(st, ns, chunks) {
+ for (let i = 0; i < chunks.length; i++) {
+ assert.eq(chunks[i],
+ st.s.getDB("config").chunks.count({ns: ns, shard: st["shard" + i].shardName}),
+ "unexpected number of chunks on shard " + i);
+ }
+ }
+
+ const dbName = "test";
+ const collName = "foo";
+ const ns = dbName + '.' + collName;
+
+ const st = new ShardingTest({shards: 3, mongos: 2, config: 1});
+
+ // Disable the best-effort recipient metadata refresh after migrations to simplify simulating
+ // stale shard version errors.
+ assert.commandWorked(st.rs0.getPrimary().adminCommand(
+ {configureFailPoint: "doNotRefreshRecipientAfterCommit", mode: "alwaysOn"}));
+ assert.commandWorked(st.rs1.getPrimary().adminCommand(
+ {configureFailPoint: "doNotRefreshRecipientAfterCommit", mode: "alwaysOn"}));
+ assert.commandWorked(st.rs2.getPrimary().adminCommand(
+ {configureFailPoint: "doNotRefreshRecipientAfterCommit", mode: "alwaysOn"}));
+
+ // Shard two collections in the same database, each with 2 chunks, [minKey, 0), [0, maxKey),
+ // with one document each, all on Shard0.
+
+ assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: -5}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: 5}, {writeConcern: {w: "majority"}}));
+
+ assert.commandWorked(st.s.adminCommand({enableSharding: dbName}));
+ st.ensurePrimaryShard(dbName, st.shard0.shardName);
+
+ assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}}));
+ assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}}));
+
+ expectChunks(st, ns, [2, 0, 0]);
+
+ const otherCollName = "bar";
+ const otherNs = dbName + "." + otherCollName;
+
+ assert.writeOK(
+ st.s.getDB(dbName)[otherCollName].insert({_id: -5}, {writeConcern: {w: "majority"}}));
+ assert.writeOK(
+ st.s.getDB(dbName)[otherCollName].insert({_id: 5}, {writeConcern: {w: "majority"}}));
+
+ assert.commandWorked(st.s.adminCommand({shardCollection: otherNs, key: {_id: 1}}));
+ assert.commandWorked(st.s.adminCommand({split: otherNs, middle: {_id: 0}}));
+
+ expectChunks(st, otherNs, [2, 0, 0]);
+
+ const session = st.s.startSession();
+ const sessionDB = session.getDatabase(dbName);
+
+ //
+ // Stale shard version on first overall command should succeed.
+ //
+
+ // Move a chunk in the first collection from Shard0 to Shard1 through the main mongos, so Shard1
+ // is stale but not the router.
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard1.shardName}));
+ expectChunks(st, ns, [1, 1, 0]);
+
+ session.startTransaction();
+
+ // Targets Shard1, which is stale.
+ assert.commandWorked(sessionDB.runCommand({find: collName, filter: {_id: 5}}));
+
+ // TODO SERVER-36304: Change this to commitTransaction once multi shard transactions can be
+ // committed through mongos.
+ session.abortTransaction();
+
+ //
+ // Stale shard version on second command to a shard should fail.
+ //
+
+ expectChunks(st, ns, [1, 1, 0]);
+
+ // Move a chunk in the other collection from Shard0 to Shard1 through the main mongos, so Shard1
+ // is stale for the other collection but not the router.
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: otherNs, find: {_id: 5}, to: st.shard1.shardName}));
+ expectChunks(st, otherNs, [1, 1, 0]);
+
+ session.startTransaction();
+
+ // Targets Shard1 for the first ns, which is not stale.
+ assert.commandWorked(sessionDB.runCommand({find: collName, filter: {_id: 5}}));
+
+ // Targets the other sharded collection on Shard1, which is stale. Because a previous statement
+ // has executed on Shard1, the retry will not restart the transaction, and will fail when it
+ // finds the transaction has aborted because of the stale shard version.
+ let res =
+ assert.commandFailedWithCode(sessionDB.runCommand({find: otherCollName, filter: {_id: 5}}),
+ ErrorCodes.NoSuchTransaction);
+ assert.eq(res.errorLabels, ["TransientTransactionError"]);
+
+ session.abortTransaction();
+
+ //
+ // Stale shard version on first command to a new shard should succeed.
+ //
+
+ expectChunks(st, ns, [1, 1, 0]);
+
+ // Move a chunk for the other collection from Shard1 to Shard0 through the main mongos, so
+ // Shard0 is stale for it and the router is not.
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: otherNs, find: {_id: 5}, to: st.shard0.shardName}));
+ expectChunks(st, otherNs, [2, 0, 0]);
+
+ session.startTransaction();
+
+ // Targets Shard1 for the first ns, which is not stale.
+ assert.commandWorked(sessionDB.runCommand({find: collName, filter: {_id: 5}}));
+
+ // Targets Shard0 for the other ns, which is stale.
+ assert.commandWorked(sessionDB.runCommand({find: otherCollName, filter: {_id: 5}}));
+
+ // TODO SERVER-36304: Change this to commitTransaction.
+ session.abortTransaction();
+
+ //
+ // Stale mongos aborts on old shard.
+ //
+
+ // Move a chunk in the first collection from Shard1 to Shard0 through the other mongos, so
+ // Shard1 and the main mongos are stale for it.
+ const otherMongos = st.s1;
+ assert.commandWorked(
+ otherMongos.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard0.shardName}));
+ expectChunks(st, ns, [2, 0, 0]);
+
+ session.startTransaction();
+
+ // Targets Shard1, which hits a stale version error, then re-targets Shard0, which is also
+ // stale but should succeed.
+ assert.commandWorked(sessionDB.runCommand({find: collName, filter: {_id: 5}}));
+
+ // TODO SERVER-36304: Change this to commitTransaction.
+ session.abortTransaction();
+
+ // Verify there is no in-progress transaction on Shard1.
+ res = assert.commandFailedWithCode(st.rs1.getPrimary().getDB(dbName).runCommand({
+ find: collName,
+ lsid: session.getSessionId(),
+ txnNumber: NumberLong(session.getTxnNumber_forTesting()),
+ autocommit: false,
+ }),
+ ErrorCodes.NoSuchTransaction);
+ assert.eq(res.errorLabels, ["TransientTransactionError"]);
+
+ //
+ // More than one stale shard version error.
+ //
+
+ // Move chunks for the first ns from Shard0 to Shard1 and Shard2 through the main mongos, so
+ // both are stale but not the router.
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard2.shardName}));
+ expectChunks(st, ns, [1, 0, 1]);
+
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: ns, find: {_id: -5}, to: st.shard1.shardName}));
+ expectChunks(st, ns, [0, 1, 1]);
+
+ session.startTransaction();
+
+ // Targets all shards, two of which are stale.
+ assert.commandWorked(sessionDB.runCommand({find: collName}));
+
+ // TODO SERVER-36304: Change this to commitTransaction.
+ session.abortTransaction();
+
+ //
+ // Can retry a stale write on the first statement.
+ //
+
+ // Move a chunk to Shard1 to make it stale.
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard1.shardName}));
+ expectChunks(st, ns, [0, 2, 0]);
+
+ session.startTransaction();
+
+ // Targets Shard1, which is stale.
+ assert.commandWorked(sessionDB.runCommand({insert: collName, documents: [{_id: 6}]}));
+
+ // TODO SERVER-36304: Change this to commitTransaction.
+ session.abortTransaction();
+
+ //
+ // Cannot retry a stale write past the first statement.
+ //
+ // TODO SERVER-37207: Change batch writes to retry only the failed writes in a batch, to allow
+ // retrying writes beyond the first overall statement.
+ //
+
+ // Move a chunk to Shard2 to make it stale.
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard2.shardName}));
+ expectChunks(st, ns, [0, 1, 1]);
+
+ session.startTransaction();
+
+ // Targets Shard1, which is not stale.
+ assert.commandWorked(sessionDB.runCommand({insert: collName, documents: [{_id: -4}]}));
+
+ // Targets Shard2, which is stale.
+ res = assert.commandFailedWithCode(
+ sessionDB.runCommand({insert: collName, documents: [{_id: 7}]}),
+ ErrorCodes.NoSuchTransaction);
+ assert.eq(res.errorLabels, ["TransientTransactionError"]);
+
+ // TODO SERVER-36304: Change this to commitTransaction.
+ session.abortTransaction();
+
+ //
+ // NoSuchTransaction should be returned if the router exhausts its retries.
+ //
+
+ // Move a chunk to Shard0 to make it stale.
+ assert.commandWorked(
+ st.s.adminCommand({moveChunk: ns, find: {_id: -5}, to: st.shard0.shardName}));
+ expectChunks(st, ns, [1, 0, 1]);
+
+ // Disable metadata refreshes on the stale shard so it will indefinitely return a stale version
+ // error.
+ assert.commandWorked(st.rs0.getPrimary().adminCommand(
+ {configureFailPoint: "skipShardFilteringMetadataRefresh", mode: "alwaysOn"}));
+
+ session.startTransaction();
+
+ // Targets Shard0, which is stale and won't refresh its metadata, so mongos should exhaust its
+ // retries and implicitly abort the transaction.
+ assert.commandFailedWithCode(sessionDB.runCommand({find: collName, filter: {_id: -5}}),
+ ErrorCodes.NoSuchTransaction);
+
+ session.abortTransaction();
+
+ assert.commandWorked(st.rs0.getPrimary().adminCommand(
+ {configureFailPoint: "skipShardFilteringMetadataRefresh", mode: "off"}));
+
+ st.stop();
+})();
diff --git a/src/mongo/db/handle_request_response.cpp b/src/mongo/db/handle_request_response.cpp
index c5b27c6a106..cddf7ee8145 100644
--- a/src/mongo/db/handle_request_response.cpp
+++ b/src/mongo/db/handle_request_response.cpp
@@ -40,9 +40,7 @@ BSONObj getErrorLabels(const boost::optional<OperationSessionInfoFromClient>& se
bool isRetryable = ErrorCodes::isNotMasterError(code) || ErrorCodes::isShutdownError(code);
bool isTransientTransactionError = code == ErrorCodes::WriteConflict //
- || code == ErrorCodes::SnapshotTooOld //
|| code == ErrorCodes::SnapshotUnavailable //
- || code == ErrorCodes::StaleChunkHistory //
|| code == ErrorCodes::NoSuchTransaction //
|| code == ErrorCodes::LockTimeout //
|| code == ErrorCodes::PreparedTransactionInProgress //
diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
index 3475891e19d..8b670aeb4bc 100644
--- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
+++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp
@@ -43,10 +43,14 @@
#include "mongo/db/s/sharding_statistics.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/grid.h"
+#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
namespace mongo {
+MONGO_FAIL_POINT_DEFINE(skipDatabaseVersionMetadataRefresh);
+MONGO_FAIL_POINT_DEFINE(skipShardFilteringMetadataRefresh);
+
namespace {
void onShardVersionMismatch(OperationContext* opCtx,
@@ -84,6 +88,10 @@ void onShardVersionMismatch(OperationContext* opCtx,
// smaller than the one we know about. This means that the remote side is behind.
}
+ if (MONGO_FAIL_POINT(skipShardFilteringMetadataRefresh)) {
+ return;
+ }
+
forceShardFilteringMetadataRefresh(opCtx, nss, forceRefreshFromThisThread);
}
@@ -107,6 +115,10 @@ void onDbVersionMismatch(OperationContext* opCtx,
// StaleDatabaseVersion retry attempts while the movePrimary is being committed.
OperationShardingState::get(opCtx).waitForMovePrimaryCriticalSectionSignal(opCtx);
+ if (MONGO_FAIL_POINT(skipDatabaseVersionMetadataRefresh)) {
+ return;
+ }
+
forceDatabaseRefresh(opCtx, dbName);
}
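
The two failpoints defined above, skipDatabaseVersionMetadataRefresh and skipShardFilteringMetadataRefresh, are what the new tests use to keep a shard permanently stale so the router exhausts its retries (kMaxNumStaleVersionRetries in strategy.cpp). A condensed usage sketch, assuming the st, session, sessionDB, and collName fixtures from the tests above:

    // Keep shard0 stale by skipping its filtering metadata refresh.
    assert.commandWorked(st.rs0.getPrimary().adminCommand(
        {configureFailPoint: "skipShardFilteringMetadataRefresh", mode: "alwaysOn"}));

    // With no way to refresh, every retry hits another stale version error, so
    // mongos gives up and reports NoSuchTransaction.
    session.startTransaction();
    assert.commandFailedWithCode(sessionDB.runCommand({find: collName, filter: {_id: -5}}),
                                 ErrorCodes.NoSuchTransaction);
    session.abortTransaction();

    assert.commandWorked(st.rs0.getPrimary().adminCommand(
        {configureFailPoint: "skipShardFilteringMetadataRefresh", mode: "off"}));
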
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 166bd401b1d..0a36636b1fd 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -404,6 +404,19 @@ void runCommand(OperationContext* opCtx,
Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNs);
+ // Update transaction tracking state for a possible retry. Throws if the transaction
+ // cannot continue.
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ txnRouter->onStaleShardOrDbError(commandName);
+ // TODO SERVER-37210: Implicitly abort the transaction if this uassert throws.
+ uassert(ErrorCodes::NoSuchTransaction,
+ str::stream() << "Transaction " << opCtx->getTxnNumber()
+ << " was aborted after "
+ << kMaxNumStaleVersionRetries
+ << " failed retries",
+ canRetry);
+ }
+
if (canRetry) {
continue;
}
@@ -412,25 +425,41 @@ void runCommand(OperationContext* opCtx,
// Mark database entry in cache as stale.
Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(ex->getDb(),
ex->getVersionReceived());
+
+ // Update transaction tracking state for a possible retry. Throws if the transaction
+ // cannot continue.
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ txnRouter->onStaleShardOrDbError(commandName);
+ // TODO SERVER-37210: Implicitly abort the transaction if this uassert throws.
+ uassert(ErrorCodes::NoSuchTransaction,
+ str::stream() << "Transaction " << opCtx->getTxnNumber()
+ << " was aborted after "
+ << kMaxNumStaleVersionRetries
+ << " failed retries",
+ canRetry);
+ }
+
if (canRetry) {
continue;
}
throw;
- } catch (ExceptionForCat<ErrorCategory::SnapshotError>& ex) {
+ } catch (const ExceptionForCat<ErrorCategory::SnapshotError>&) {
// Simple retry on any type of snapshot error.
- if (canRetry) {
- auto txnRouter = TransactionRouter::get(opCtx);
- invariant(txnRouter);
- if (txnRouter->canContinueOnSnapshotError()) {
- txnRouter->onSnapshotError();
- } else {
- // TODO SERVER-36589: Abort the entire transaction.
- ex.addContext(
- "Encountered snapshot error on subsequent transaction statement");
- throw;
- }
+ // Update transaction tracking state for a possible retry. Throws if the transaction
+ // cannot continue.
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ txnRouter->onSnapshotError();
+ // TODO SERVER-37210: Implicitly abort the transaction if this uassert throws.
+ uassert(ErrorCodes::NoSuchTransaction,
+ str::stream() << "Transaction " << opCtx->getTxnNumber()
+ << " was aborted after "
+ << kMaxNumStaleVersionRetries
+ << " failed retries",
+ canRetry);
+ }
+ if (canRetry) {
continue;
}
throw;
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index ac827f7ae13..0f49c9a9f57 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -82,6 +82,8 @@ static const BSONObj kGeoNearDistanceMetaProjection = BSON("$meta"
// more than 8 decimal digits since the response is at most 16MB, and 16 * 1024 * 1024 < 1 * 10^8.
static const int kPerDocumentOverheadBytesUpperBound = 10;
+const char kFindCmdName[] = "find";
+
/**
* Given the QueryRequest 'qr' being executed by mongos, returns a copy of the query which is
* suitable for forwarding to the targeted hosts.
@@ -450,6 +452,12 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx,
<< " on attempt " << retries << " of " << kMaxRetries << ": " << redact(ex);
catalogCache->onStaleShardVersion(std::move(routingInfo));
+
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ // A transaction can always continue on a stale version error during find because
+ // the operation must be idempotent.
+ txnRouter->onStaleShardOrDbError(kFindCmdName);
+ }
}
}
diff --git a/src/mongo/s/transaction/append_at_cluster_time_test.cpp b/src/mongo/s/transaction/append_at_cluster_time_test.cpp
index 442fd513852..9422c3faf46 100644
--- a/src/mongo/s/transaction/append_at_cluster_time_test.cpp
+++ b/src/mongo/s/transaction/append_at_cluster_time_test.cpp
@@ -85,5 +85,28 @@ TEST(ClusterCommands, AddingAtClusterTimeOverwritesExistingAfterClusterTime) {
ASSERT_BSONOBJ_EQ(expectedCommand, newCommand);
}
+// Adding atClusterTime overwrites an existing afterClusterTime and will add level "snapshot" if it
+// is not there.
+TEST(ClusterCommands, AddingAtClusterTimeAddsLevelSnapshotIfNotThere) {
+ const auto existingAfterClusterTime = Timestamp(1, 1);
+ BSONObj command = BSON("aggregate"
+ << "testColl"
+ << "readConcern"
+ << BSON("afterClusterTime" << existingAfterClusterTime));
+
+ const auto computedAtClusterTime = Timestamp(2, 1);
+ BSONObj expectedCommand = BSON("aggregate"
+ << "testColl"
+ << "readConcern"
+ << BSON("level"
+ << "snapshot"
+ << "atClusterTime"
+ << computedAtClusterTime));
+
+ BSONObj newCommand =
+ at_cluster_time_util::appendAtClusterTime(command, LogicalTime(computedAtClusterTime));
+ ASSERT_BSONOBJ_EQ(expectedCommand, newCommand);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/transaction/at_cluster_time_util.cpp b/src/mongo/s/transaction/at_cluster_time_util.cpp
index 1264a785047..63f54d5a054 100644
--- a/src/mongo/s/transaction/at_cluster_time_util.cpp
+++ b/src/mongo/s/transaction/at_cluster_time_util.cpp
@@ -40,6 +40,20 @@ namespace mongo {
namespace at_cluster_time_util {
+namespace {
+const char kReadConcernLevelSnapshotName[] = "snapshot";
+
+LogicalTime _computeAtClusterTime(OperationContext* opCtx,
+ bool mustRunOnAll,
+ const std::set<ShardId>& shardIds,
+ const NamespaceString& nss,
+ const BSONObj query,
+ const BSONObj collation) {
+ // TODO: SERVER-31767
+ return LogicalClock::get(opCtx)->getClusterTime();
+}
+} // namespace
+
BSONObj appendAtClusterTime(BSONObj cmdObj, LogicalTime atClusterTime) {
BSONObjBuilder cmdAtClusterTimeBob;
for (auto el : cmdObj) {
@@ -54,6 +68,13 @@ BSONObj appendAtClusterTime(BSONObj cmdObj, LogicalTime atClusterTime) {
}
}
+ // Transactions will upconvert a read concern with afterClusterTime but no level to have
+ // level snapshot, so a command may have a read concern field with no level.
+ if (!readConcernBob.hasField(repl::ReadConcernArgs::kLevelFieldName)) {
+ readConcernBob.append(repl::ReadConcernArgs::kLevelFieldName,
+ kReadConcernLevelSnapshotName);
+ }
+
readConcernBob.append(repl::ReadConcernArgs::kAtClusterTimeFieldName,
atClusterTime.asTimestamp());
} else {
@@ -64,19 +85,6 @@ BSONObj appendAtClusterTime(BSONObj cmdObj, LogicalTime atClusterTime) {
return cmdAtClusterTimeBob.obj();
}
-namespace {
-
-LogicalTime _computeAtClusterTime(OperationContext* opCtx,
- bool mustRunOnAll,
- const std::set<ShardId>& shardIds,
- const NamespaceString& nss,
- const BSONObj query,
- const BSONObj collation) {
- // TODO: SERVER-31767
- return LogicalClock::get(opCtx)->getClusterTime();
-}
-}
-
boost::optional<LogicalTime> computeAtClusterTime(OperationContext* opCtx,
bool mustRunOnAll,
const std::set<ShardId>& shardIds,
diff --git a/src/mongo/s/transaction/transaction_router.cpp b/src/mongo/s/transaction/transaction_router.cpp
index 5359ffc6542..a9c592a5600 100644
--- a/src/mongo/s/transaction/transaction_router.cpp
+++ b/src/mongo/s/transaction/transaction_router.cpp
@@ -123,7 +123,11 @@ BSONObj appendReadConcernForTxn(BSONObj cmd,
if (cmd.hasField(repl::ReadConcernArgs::kReadConcernFieldName)) {
repl::ReadConcernArgs existingReadConcernArgs;
dassert(existingReadConcernArgs.initialize(cmd));
- dassert(existingReadConcernArgs.getLevel() == readConcernArgs.getLevel());
+ // There may be no read concern level if the user only specified afterClusterTime and the
+ // transaction provided the default level.
+ dassert(existingReadConcernArgs.getLevel() == readConcernArgs.getLevel() ||
+ !existingReadConcernArgs.hasLevel());
+
return atClusterTime
? at_cluster_time_util::appendAtClusterTime(std::move(cmd), *atClusterTime)
: cmd;
@@ -148,11 +152,20 @@ BSONObjBuilder appendFieldsForStartTransaction(BSONObj cmd,
return bob;
}
+// Commands that are idempotent in a transaction context and can be blindly retried in the middle of
+// a transaction. Aggregate with $out is disallowed in a transaction, so aggregates must be read
+// operations.
+const StringMap<int> alwaysRetryableCmds = {
+ {"aggregate", 1}, {"distinct", 1}, {"find", 1}, {"getMore", 1}, {"killCursors", 1}};
+
} // unnamed namespace
TransactionRouter::Participant::Participant(bool isCoordinator,
+ StmtId stmtIdCreatedAt,
SharedTransactionOptions sharedOptions)
- : _isCoordinator(isCoordinator), _sharedOptions(sharedOptions) {}
+ : _isCoordinator(isCoordinator),
+ _stmtIdCreatedAt(stmtIdCreatedAt),
+ _sharedOptions(sharedOptions) {}
BSONObj TransactionRouter::Participant::attachTxnFieldsIfNeeded(BSONObj cmd) {
auto isTxnCmd = isTransactionCommand(cmd); // check first before moving cmd.
@@ -202,6 +215,10 @@ void TransactionRouter::Participant::markAsCommandSent() {
}
}
+StmtId TransactionRouter::Participant::getStmtIdCreatedAt() const {
+ return _stmtIdCreatedAt;
+}
+
TransactionRouter* TransactionRouter::get(OperationContext* opCtx) {
auto& opCtxSession = getRouterSessionRuntimeState(opCtx);
if (!opCtxSession) {
@@ -260,6 +277,7 @@ TransactionRouter::Participant& TransactionRouter::getOrCreateParticipant(const
shard.toString(),
TransactionRouter::Participant(
isFirstParticipant,
+ _latestStmtId,
SharedTransactionOptions{_txnNumber, _readConcernArgs, _atClusterTime}));
return resultPair.first->second;
@@ -269,12 +287,58 @@ const LogicalSessionId& TransactionRouter::getSessionId() const {
return _sessionId;
}
-bool TransactionRouter::canContinueOnSnapshotError() const {
+bool TransactionRouter::_canContinueOnStaleShardOrDbError(StringData cmdName) const {
+ // We can always retry on the first overall statement.
+ if (_latestStmtId == _firstStmtId) {
+ return true;
+ }
+
+ if (alwaysRetryableCmds.count(cmdName)) {
+ return true;
+ }
+
+ return false;
+}
+
+void TransactionRouter::onStaleShardOrDbError(StringData cmdName) {
+ // TODO SERVER-37210: Implicitly abort the entire transaction if this uassert throws.
+ uassert(ErrorCodes::NoSuchTransaction,
+ "Transaction was aborted due to cluster data placement change",
+ _canContinueOnStaleShardOrDbError(cmdName));
+
+ // Remove each participant created at the most recent statement id and add them to the orphaned
+ // list because the retry attempt isn't guaranteed to retarget them. Participants created
+ // earlier are already fixed in the participant list, so they should not be removed.
+ for (auto&& it = _participants.begin(); it != _participants.end();) {
+ auto participant = it++;
+ if (participant->second.getStmtIdCreatedAt() == _latestStmtId) {
+ _orphanedParticipants.try_emplace(participant->first);
+ _participants.erase(participant);
+ }
+ }
+
+ // If there are no more participants, also clear the coordinator id because a new one must be
+ // chosen by the retry.
+ if (_participants.empty()) {
+ _coordinatorId.reset();
+ return;
+ }
+
+ // If this is not the first command, the coordinator must have been chosen and successfully
+ // contacted in an earlier command, and thus must not be in the orphaned list.
+ invariant(_coordinatorId);
+ invariant(_orphanedParticipants.count(*_coordinatorId) == 0);
+}
+
+bool TransactionRouter::_canContinueOnSnapshotError() const {
return _latestStmtId == _firstStmtId;
}
void TransactionRouter::onSnapshotError() {
- invariant(canContinueOnSnapshotError());
+ // TODO SERVER-37210: Implicitly abort the entire transaction if this uassert throws.
+ uassert(ErrorCodes::NoSuchTransaction,
+ "Transaction was aborted due to snapshot error on subsequent transaction statement",
+ _canContinueOnSnapshotError());
// Add each participant to the orphaned list because the retry attempt isn't guaranteed to
// re-target it.
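
The alwaysRetryableCmds list above is what makes reads and writes behave differently mid-transaction: the listed read commands are idempotent and can always be retried on a stale version error, while a write can only be retried when it is the first overall statement. A condensed illustration from the new transactions_stale_shard_version_errors.js test, assuming its chunk moves have already made the targeted shard stale:

    // A stale write on the first overall statement is retried and succeeds.
    session.startTransaction();
    assert.commandWorked(sessionDB.runCommand({insert: collName, documents: [{_id: 6}]}));
    session.abortTransaction();

    // A stale write past the first statement is not retried; the transaction is
    // aborted and the error is labeled transient so the client can retry it.
    session.startTransaction();
    assert.commandWorked(sessionDB.runCommand({insert: collName, documents: [{_id: -4}]}));
    let res = assert.commandFailedWithCode(
        sessionDB.runCommand({insert: collName, documents: [{_id: 7}]}),
        ErrorCodes.NoSuchTransaction);
    assert.eq(res.errorLabels, ["TransientTransactionError"]);
    session.abortTransaction();
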
diff --git a/src/mongo/s/transaction/transaction_router.h b/src/mongo/s/transaction/transaction_router.h
index 1294a82df38..a47a1f92db7 100644
--- a/src/mongo/s/transaction/transaction_router.h
+++ b/src/mongo/s/transaction/transaction_router.h
@@ -71,7 +71,9 @@ public:
*/
class Participant {
public:
- explicit Participant(bool isCoordinator, SharedTransactionOptions sharedOptions);
+ explicit Participant(bool isCoordinator,
+ StmtId stmtIdCreatedAt,
+ SharedTransactionOptions sharedOptions);
enum class State {
// Next transaction should include startTransaction.
@@ -101,9 +103,19 @@ public:
*/
void markAsCommandSent();
+ /**
+ * Returns the highest statement id of the command during which this participant was
+ * created.
+ */
+ StmtId getStmtIdCreatedAt() const;
+
private:
State _state{State::kMustStart};
const bool _isCoordinator{false};
+
+ // The highest statement id of the request during which this participant was created.
+ const StmtId _stmtIdCreatedAt{kUninitializedStmtId};
+
const SharedTransactionOptions _sharedOptions;
};
@@ -124,15 +136,15 @@ public:
void checkOut();
/**
- * Returns true if the current transaction can retry on a snapshot error. This is only true on
- * the first command recevied for a transaction.
+ * Updates the transaction state to allow for a retry of the current command on a stale version
+ * error. Will throw if the transaction cannot be continued.
*/
- bool canContinueOnSnapshotError() const;
+ void onStaleShardOrDbError(StringData cmdName);
/**
* Resets the transaction state to allow for a retry attempt. This includes clearing all
* participants and adding them to the orphaned list, clearing the coordinator, and resetting
- * the global read timestamp.
+ * the global read timestamp. Will throw if the transaction cannot be continued.
*/
void onSnapshotError();
@@ -194,6 +206,28 @@ private:
*/
Shard::CommandResponse _commitMultiShardTransaction(OperationContext* opCtx);
+ /**
+ * Returns true if the current transaction can retry on a stale version error from a contacted
+ * shard. This is always true except for an error received by a write that is not the first
+ * overall statement in the sharded transaction. This is because the entire command will be
+ * retried, and shards that were not stale and are targeted again may incorrectly execute the
+ * command a second time.
+ *
+ * Note: Even if this method returns true, the retry attempt may still fail, e.g. if one of the
+ * shards that returned a stale version error was involved in a previously completed statement
+ * for this transaction.
+ *
+ * TODO SERVER-37207: Change batch writes to retry only the failed writes in a batch, to allow
+ * retrying writes beyond the first overall statement.
+ */
+ bool _canContinueOnStaleShardOrDbError(StringData cmdName) const;
+
+ /**
+ * Returns true if the current transaction can retry on a snapshot error. This is only true on
+ * the first command received for a transaction.
+ */
+ bool _canContinueOnSnapshotError() const;
+
const LogicalSessionId _sessionId;
TxnNumber _txnNumber{kUninitializedTxnNumber};
diff --git a/src/mongo/s/transaction/transaction_router_test.cpp b/src/mongo/s/transaction/transaction_router_test.cpp
index 3f5e7413c71..f74f3ab0935 100644
--- a/src/mongo/s/transaction/transaction_router_test.cpp
+++ b/src/mongo/s/transaction/transaction_router_test.cpp
@@ -52,6 +52,8 @@ protected:
const ShardId shard2 = ShardId("shard2");
const HostAndPort hostAndPort2 = HostAndPort("shard2:1234");
+ const ShardId shard3 = ShardId("shard3");
+
void setUp() {
ShardingTestFixture::setUp();
configTargeter()->setFindHostReturnValue(kTestConfigShardHost);
@@ -833,7 +835,6 @@ TEST_F(TransactionRouterTest, SnapshotErrorsResetAtClusterTime) {
LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(laterTime);
// Simulate a snapshot error.
- ASSERT(txnRouter.canContinueOnSnapshotError());
txnRouter.onSnapshotError();
txnRouter.setAtClusterTimeToLatestTime(operationContext());
@@ -915,7 +916,6 @@ TEST_F(TransactionRouterTest, SnapshotErrorsAddAllParticipantsToOrphanedList) {
// Simulate a snapshot error and an internal retry that only re-targets one of the original two
// shards.
- ASSERT(txnRouter.canContinueOnSnapshotError());
txnRouter.onSnapshotError();
ASSERT_FALSE(txnRouter.getCoordinatorId());
@@ -941,35 +941,220 @@ TEST_F(TransactionRouterTest, SnapshotErrorsAddAllParticipantsToOrphanedList) {
}
}
-TEST_F(TransactionRouterTest, CanOnlyContinueOnSnapshotErrorOnFirstCommand) {
+TEST_F(TransactionRouterTest, OnSnapshotErrorThrowsAfterFirstCommand) {
TxnNumber txnNum{3};
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
- ASSERT(txnRouter.canContinueOnSnapshotError());
+ // Should not throw.
+ txnRouter.onSnapshotError();
+
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
+ txnRouter.beginOrContinueTxn(operationContext(), txnNum, false);
+ ASSERT_THROWS_CODE(
+ txnRouter.onSnapshotError(), AssertionException, ErrorCodes::NoSuchTransaction);
+
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
+ txnRouter.beginOrContinueTxn(operationContext(), txnNum, false);
+ ASSERT_THROWS_CODE(
+ txnRouter.onSnapshotError(), AssertionException, ErrorCodes::NoSuchTransaction);
+}
+
+TEST_F(TransactionRouterTest, ParticipantsRememberStmtIdCreatedAt) {
+ TransactionRouter txnRouter({});
+ txnRouter.checkOut();
+
+ TxnNumber txnNum{3};
+ txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+
+ // Transaction 1 contacts shard1 and shard2 during the first command, then shard3 in the second
+ // command.
+
+ int initialStmtId = 0;
+ ASSERT_EQ(txnRouter.getOrCreateParticipant(shard1).getStmtIdCreatedAt(), initialStmtId);
+ ASSERT_EQ(txnRouter.getOrCreateParticipant(shard2).getStmtIdCreatedAt(), initialStmtId);
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, false);
- ASSERT_FALSE(txnRouter.canContinueOnSnapshotError());
+
+ ShardId shard3("shard3");
+ ASSERT_EQ(txnRouter.getOrCreateParticipant(shard3).getStmtIdCreatedAt(), initialStmtId + 1);
+
+ ASSERT_EQ(txnRouter.getOrCreateParticipant(shard1).getStmtIdCreatedAt(), initialStmtId);
+ ASSERT_EQ(txnRouter.getOrCreateParticipant(shard2).getStmtIdCreatedAt(), initialStmtId);
+
+ // Transaction 2 contacts shard3 and shard2 during the first command, then shard1 in the second
+ // command.
+
+ repl::ReadConcernArgs::get(operationContext()) =
+ repl::ReadConcernArgs(repl::ReadConcernLevel::kSnapshotReadConcern);
+ TxnNumber txnNum2{5};
+ txnRouter.beginOrContinueTxn(operationContext(), txnNum2, true);
+
+ ASSERT_EQ(txnRouter.getOrCreateParticipant(shard3).getStmtIdCreatedAt(), initialStmtId);
+ ASSERT_EQ(txnRouter.getOrCreateParticipant(shard2).getStmtIdCreatedAt(), initialStmtId);
+
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
+ txnRouter.beginOrContinueTxn(operationContext(), txnNum2, false);
+
+ ASSERT_EQ(txnRouter.getOrCreateParticipant(shard1).getStmtIdCreatedAt(), initialStmtId + 1);
+}
+
+TEST_F(TransactionRouterTest, AllParticipantsAndCoordinatorClearedOnStaleErrorOnFirstCommand) {
+ TxnNumber txnNum{3};
+
+ TransactionRouter txnRouter({});
+ txnRouter.checkOut();
+ txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+
+ // Start a transaction on two shards, selecting one as the coordinator, but simulate a
+ // re-targeting error from at least one of them.
+
+ txnRouter.getOrCreateParticipant(shard1).markAsCommandSent();
+ txnRouter.getOrCreateParticipant(shard2).markAsCommandSent();
+
+ ASSERT(txnRouter.getCoordinatorId());
+ ASSERT_EQ(*txnRouter.getCoordinatorId(), shard1);
+
+ ASSERT(txnRouter.getOrphanedParticipants().empty());
+
+ // Simulate stale error and internal retry that only re-targets one of the original shards.
+
+ txnRouter.onStaleShardOrDbError("find");
+
+ ASSERT_FALSE(txnRouter.getCoordinatorId());
+ ASSERT_EQ(txnRouter.getOrphanedParticipants().size(), 2U);
+
+ {
+ auto& participant = txnRouter.getOrCreateParticipant(shard2);
+ ASSERT(participant.mustStartTransaction());
+ participant.markAsCommandSent();
+ ASSERT_FALSE(participant.mustStartTransaction());
+ }
+
+ // There is a new coordinator and shard1 is still in the orphaned list.
+ ASSERT(txnRouter.getCoordinatorId());
+ ASSERT_EQ(*txnRouter.getCoordinatorId(), shard2);
+ ASSERT_EQ(txnRouter.getOrphanedParticipants().size(), 1U);
+ ASSERT_EQ(txnRouter.getOrphanedParticipants().count(shard1), 1U);
+
+ // Shard1 has not started a transaction.
+ ASSERT(txnRouter.getOrCreateParticipant(shard1).mustStartTransaction());
+}
+
+TEST_F(TransactionRouterTest, OnlyNewlyCreatedParticipantsAddedToOrphanedListOnStaleError) {
+ TxnNumber txnNum{3};
+
+ TransactionRouter txnRouter({});
+ txnRouter.checkOut();
+ txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+
+ // First statement successfully targets one shard, selecting it as the coordinator.
+
+ txnRouter.getOrCreateParticipant(shard1).markAsCommandSent();
+
+ ASSERT(txnRouter.getCoordinatorId());
+ ASSERT_EQ(*txnRouter.getCoordinatorId(), shard1);
+
+ ASSERT(txnRouter.getOrphanedParticipants().empty());
+
+ // Start a subsequent statement that targets two new shards and encounters a stale error from at
+ // least one of them.
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, false);
- ASSERT_FALSE(txnRouter.canContinueOnSnapshotError());
+
+ txnRouter.getOrCreateParticipant(shard2).markAsCommandSent();
+ txnRouter.getOrCreateParticipant(shard3).markAsCommandSent();
+
+ txnRouter.onStaleShardOrDbError("find");
+
+ // Only the two new shards are in the orphaned list.
+ ASSERT_EQ(txnRouter.getOrphanedParticipants().size(), 2U);
+ ASSERT_EQ(txnRouter.getOrphanedParticipants().count(shard1), 0U);
+
+ // Shards 2 and 3 must start a transaction, but shard 1 must not.
+ ASSERT_FALSE(txnRouter.getOrCreateParticipant(shard1).mustStartTransaction());
+ ASSERT(txnRouter.getOrCreateParticipant(shard2).mustStartTransaction());
+ ASSERT(txnRouter.getOrCreateParticipant(shard3).mustStartTransaction());
+}
+
+TEST_F(TransactionRouterTest, RetryOnStaleErrorCannotPickNewAtClusterTime) {
+ TxnNumber txnNum{3};
+
+ TransactionRouter txnRouter({});
+ txnRouter.checkOut();
+ txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
+
+ BSONObj expectedReadConcern = BSON("level"
+ << "snapshot"
+ << "atClusterTime"
+ << kInMemoryLogicalTime.asTimestamp());
+
+ {
+ auto& participant = txnRouter.getOrCreateParticipant(shard1);
+ auto newCmd = participant.attachTxnFieldsIfNeeded(BSON("find"
+ << "test"));
+ ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj());
+ participant.markAsCommandSent();
+ }
+
+ // Advance the latest time in the logical clock, simulate a stale config/db error, and verify
+ // the retry attempt cannot pick a new atClusterTime.
+ LogicalTime laterTime(Timestamp(1000, 1));
+ ASSERT_GT(laterTime, kInMemoryLogicalTime);
+ LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(laterTime);
+
+ txnRouter.onStaleShardOrDbError("find");
+
+ txnRouter.setAtClusterTimeToLatestTime(operationContext());
+
+ {
+ auto& participant = txnRouter.getOrCreateParticipant(shard1);
+ auto newCmd = participant.attachTxnFieldsIfNeeded(BSON("find"
+ << "test"));
+ ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj());
+ }
}
-DEATH_TEST_F(TransactionRouterTest, CannotCallOnSnapshotErrorAfterFirstCommand, "invariant") {
+TEST_F(TransactionRouterTest, WritesCanOnlyBeRetriedIfFirstOverallCommand) {
+ auto writeCmds = {"insert", "update", "delete", "findAndModify", "findandmodify"};
+ auto otherCmds = {"find", "distinct", "aggregate", "killCursors", "getMore"};
+
TxnNumber txnNum{3};
TransactionRouter txnRouter({});
txnRouter.checkOut();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, true);
+ txnRouter.getOrCreateParticipant(shard1).markAsCommandSent();
+
+ for (auto writeCmd : writeCmds) {
+ txnRouter.onStaleShardOrDbError(writeCmd); // Should not throw.
+ }
+
+ for (auto cmd : otherCmds) {
+ txnRouter.onStaleShardOrDbError(cmd); // Should not throw.
+ }
+
+ // Advance to the next command.
+
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
txnRouter.beginOrContinueTxn(operationContext(), txnNum, false);
- txnRouter.onSnapshotError();
+ for (auto writeCmd : writeCmds) {
+ ASSERT_THROWS_CODE(txnRouter.onStaleShardOrDbError(writeCmd),
+ AssertionException,
+ ErrorCodes::NoSuchTransaction);
+ }
+
+ for (auto cmd : otherCmds) {
+ txnRouter.onStaleShardOrDbError(cmd); // Should not throw.
+ }
}
} // unnamed namespace