author    | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-09-05 19:35:51 -0400
committer | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-09-19 13:20:05 -0400
commit    | 3bab189695c705ff163721652add910b32c2659e (patch)
tree      | 0c8a116ff0744c943ff5d7844395717fa7e81270
parent    | 5f61edb2c73e569cb4b265ca8f453536e7b7b016 (diff)
download  | mongo-3bab189695c705ff163721652add910b32c2659e.tar.gz
SERVER-35707 Allow mongos to retry on re-targeting errors in a transaction
23 files changed, 859 insertions, 148 deletions
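For orientation before the diff: the retry rule this commit adds to the mongos TransactionRouter can be summarized in a short sketch. This is a simplified illustration distilled from the changes below, not code from the tree; the names `RetryState` and `canContinueOnStaleShardOrDbError` are invented for the example.

```cpp
// Minimal sketch (assumed names, not the actual mongo source) of the retry
// decision this commit introduces: a stale shard/database version error may be
// retried if it occurred on the first overall statement of the transaction, or
// if the failed command is an idempotent read.
#include <set>
#include <string>

struct RetryState {
    int firstStmtId = 0;   // statement id of the first command in the transaction
    int latestStmtId = 0;  // statement id of the command that just failed
};

// Commands the router treats as safe to blindly re-send mid-transaction.
static const std::set<std::string> kAlwaysRetryableCmds = {
    "aggregate", "distinct", "find", "getMore", "killCursors"};

bool canContinueOnStaleShardOrDbError(const RetryState& state, const std::string& cmdName) {
    // Any command may be retried while still on the first overall statement.
    if (state.latestStmtId == state.firstStmtId) {
        return true;
    }
    // Past the first statement, only idempotent read commands are retried.
    return kAlwaysRetryableCmds.count(cmdName) > 0;
}
```

The asymmetry is deliberate: the router re-sends the entire failed command on retry, so writes are only retried while the transaction is still on its first overall statement, where no shard can have already applied them; idempotent reads are safe to re-send at any point.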
diff --git a/buildscripts/resmokeconfig/suites/sharded_core_txns.yml b/buildscripts/resmokeconfig/suites/sharded_core_txns.yml index d0a027b2e0a..87f14bfbbe0 100644 --- a/buildscripts/resmokeconfig/suites/sharded_core_txns.yml +++ b/buildscripts/resmokeconfig/suites/sharded_core_txns.yml @@ -62,10 +62,8 @@ selector: # TODO SERVER-35825: Mongos should only allow readConcern on writes that start transactions. - jstests/core/txns/no_read_concern_snapshot_outside_txn.js - # TODO SERVER-35707: Figure out transaction abort state on re-targeting exceptions. + # TODO SERVER-37209: Allow mongos to retry on view errors. - jstests/core/txns/view_reads_in_transaction.js - - jstests/core/txns/list_collections_not_blocked_by_txn.js - - jstests/core/txns/statement_ids_accepted.js # Uses hangAfterCollectionInserts failpoint not available on mongos. - jstests/core/txns/speculative_snapshot_includes_all_writes.js diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 154680cef50..8bb47be31a5 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -44,6 +44,8 @@ selector: - jstests/sharding/snapshot_cursor_commands_mongos.js - jstests/sharding/transactions_snapshot_errors_first_statement.js - jstests/sharding/transactions_snapshot_errors_subsequent_statements.js + - jstests/sharding/transactions_stale_database_version_errors.js + - jstests/sharding/transactions_stale_shard_version_errors.js - jstests/sharding/txn_basic_two_phase_commit.js - jstests/sharding/txn_coordinator_commands_basic_requirements.js - jstests/sharding/update_sharded.js diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml index fc814445595..9a02d418dc4 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml @@ -359,6 +359,8 @@ selector: - jstests/sharding/snapshot_cursor_commands_mongos.js - jstests/sharding/transactions_snapshot_errors_first_statement.js - jstests/sharding/transactions_snapshot_errors_subsequent_statements.js + - jstests/sharding/transactions_stale_database_version_errors.js + - jstests/sharding/transactions_stale_shard_version_errors.js - jstests/sharding/txn_basic_two_phase_commit.js - jstests/sharding/txn_coordinator_commands_basic_requirements.js - jstests/sharding/update_sharded.js diff --git a/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 b/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 index 1d34a2a3a89..6eb6ffdfb60 100644 --- a/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 +++ b/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 @@ -57,6 +57,8 @@ selector: - jstests/sharding/snapshot_cursor_commands_mongos.js - jstests/sharding/transactions_snapshot_errors_first_statement.js - jstests/sharding/transactions_snapshot_errors_subsequent_statements.js + - jstests/sharding/transactions_stale_database_version_errors.js + - jstests/sharding/transactions_stale_shard_version_errors.js - 
jstests/sharding/txn_basic_two_phase_commit.js - jstests/sharding/txn_coordinator_commands_basic_requirements.js - jstests/sharding/update_sharded.js diff --git a/jstests/core/txns/concurrent_drops_and_creates.js b/jstests/core/txns/concurrent_drops_and_creates.js index 5ddf7ca260b..8db02f1ae98 100644 --- a/jstests/core/txns/concurrent_drops_and_creates.js +++ b/jstests/core/txns/concurrent_drops_and_creates.js @@ -64,7 +64,9 @@ // since the transaction's read timestamp. Since our implementation of the in-memory collection // catalog always has the most recent collection metadata, we do not allow you to read from a // collection at a time prior to its most recent catalog changes. - assert.commandFailedWithCode(sessionCollB.insert({}), ErrorCodes.SnapshotUnavailable); + const isMongos = assert.commandWorked(db.runCommand("ismaster")).msg === "isdbgrid"; + const expectedCode = isMongos ? ErrorCodes.NoSuchTransaction : ErrorCodes.SnapshotUnavailable; + assert.commandFailedWithCode(sessionCollB.insert({}), expectedCode); assert.commandFailedWithCode(session.abortTransaction_forTesting(), ErrorCodes.NoSuchTransaction); diff --git a/jstests/core/txns/list_collections_not_blocked_by_txn.js b/jstests/core/txns/list_collections_not_blocked_by_txn.js index fc4637abd5d..33a6d9eb766 100644 --- a/jstests/core/txns/list_collections_not_blocked_by_txn.js +++ b/jstests/core/txns/list_collections_not_blocked_by_txn.js @@ -12,6 +12,14 @@ assert.commandWorked(mydb.createCollection("foo", {writeConcern: {w: "majority"}})); session.startTransaction({readConcern: {level: "snapshot"}}); + + const isMongos = assert.commandWorked(db.runCommand("ismaster")).msg === "isdbgrid"; + if (isMongos) { + // Force the shard to refresh its database version, because this requires a database + // exclusive lock, which will block behind the transaction. + sessionDb.foo.distinct("x"); + } + assert.commandWorked(sessionDb.foo.insert({x: 1})); for (let nameOnly of[false, true]) { diff --git a/jstests/core/txns/statement_ids_accepted.js b/jstests/core/txns/statement_ids_accepted.js index 360d42bc3f5..51795d984d9 100644 --- a/jstests/core/txns/statement_ids_accepted.js +++ b/jstests/core/txns/statement_ids_accepted.js @@ -162,38 +162,6 @@ autocommit: false })); - jsTestLog("Check that geoSearch accepts a statement ID"); - assert.writeOK(testColl.insert({geo: {type: "Point", coordinates: [0, 0]}, a: 0}), - {writeConcern: {w: "majority"}}); - assert.writeOK(testColl.insert({geoh: {lat: 0, long: 0}, b: 0}), - {writeConcern: {w: "majority"}}); - assert.commandWorked(sessionDb.runCommand({ - createIndexes: collName, - indexes: [ - {name: "geo", key: {geo: "2dsphere"}}, - {name: "geoh", key: {geoh: "geoHaystack", b: 1}, bucketSize: 1} - ], - writeConcern: {w: "majority"} - })); - // Ensure the snapshot is available following the index creation. 
- assert.soonNoExcept(function() { - testColl.find({}, {readConcern: {level: "snapshot"}}); - return true; - }); - - jsTestLog("Check that geoSearch accepts a statement ID"); - assert.commandWorked(sessionDb.runCommand({ - geoSearch: collName, - search: {b: 0}, - near: [0, 0], - maxDistance: 1, - readConcern: {level: "snapshot"}, - txnNumber: NumberLong(txnNumber++), - stmtId: NumberInt(0), - startTransaction: true, - autocommit: false - })); - jsTestLog("Check that insert accepts a statement ID"); assert.commandWorked(sessionDb.runCommand({ insert: collName, @@ -219,36 +187,73 @@ stmtId: NumberInt(0) })); - jsTestLog("Check that prepareTransaction accepts a statement ID"); - assert.commandWorked(sessionDb.runCommand({ - insert: collName, - documents: [{_id: "doc2"}], - readConcern: {level: "snapshot"}, - txnNumber: NumberLong(txnNumber), - stmtId: NumberInt(0), - startTransaction: true, - autocommit: false - })); - // prepareTransaction can only be run on the admin database. - assert.commandWorked(sessionDb.adminCommand({ - prepareTransaction: 1, - txnNumber: NumberLong(txnNumber), - stmtId: NumberInt(1), - autocommit: false - })); - assert.commandWorked(sessionDb.adminCommand({ - abortTransaction: 1, - txnNumber: NumberLong(txnNumber++), - stmtId: NumberInt(2), - autocommit: false - })); - assert.commandFailedWithCode(sessionDb.runCommand({ - prepareTransaction: 1, - txnNumber: NumberLong(txnNumber++), - stmtId: NumberInt(0), - autocommit: false - }), - ErrorCodes.Unauthorized); + const isMongos = assert.commandWorked(db.runCommand("ismaster")).msg === "isdbgrid"; + if (!isMongos) { + // Skip commands that do not exist on mongos. + + jsTestLog("Check that geoSearch accepts a statement ID"); + assert.writeOK(testColl.insert({geo: {type: "Point", coordinates: [0, 0]}, a: 0}), + {writeConcern: {w: "majority"}}); + assert.writeOK(testColl.insert({geoh: {lat: 0, long: 0}, b: 0}), + {writeConcern: {w: "majority"}}); + assert.commandWorked(sessionDb.runCommand({ + createIndexes: collName, + indexes: [ + {name: "geo", key: {geo: "2dsphere"}}, + {name: "geoh", key: {geoh: "geoHaystack", b: 1}, bucketSize: 1} + ], + writeConcern: {w: "majority"} + })); + // Ensure the snapshot is available following the index creation. + assert.soonNoExcept(function() { + testColl.find({}, {readConcern: {level: "snapshot"}}); + return true; + }); + + jsTestLog("Check that geoSearch accepts a statement ID"); + assert.commandWorked(sessionDb.runCommand({ + geoSearch: collName, + search: {b: 0}, + near: [0, 0], + maxDistance: 1, + readConcern: {level: "snapshot"}, + txnNumber: NumberLong(txnNumber++), + stmtId: NumberInt(0), + startTransaction: true, + autocommit: false + })); + + jsTestLog("Check that prepareTransaction accepts a statement ID"); + assert.commandWorked(sessionDb.runCommand({ + insert: collName, + documents: [{_id: "doc2"}], + readConcern: {level: "snapshot"}, + txnNumber: NumberLong(txnNumber), + stmtId: NumberInt(0), + startTransaction: true, + autocommit: false + })); + // prepareTransaction can only be run on the admin database. 
+ assert.commandWorked(sessionDb.adminCommand({ + prepareTransaction: 1, + txnNumber: NumberLong(txnNumber), + stmtId: NumberInt(1), + autocommit: false + })); + assert.commandWorked(sessionDb.adminCommand({ + abortTransaction: 1, + txnNumber: NumberLong(txnNumber++), + stmtId: NumberInt(2), + autocommit: false + })); + assert.commandFailedWithCode(sessionDb.runCommand({ + prepareTransaction: 1, + txnNumber: NumberLong(txnNumber++), + stmtId: NumberInt(0), + autocommit: false + }), + ErrorCodes.Unauthorized); + } // refreshLogicalSessionCacheNow is intentionally omitted. diff --git a/jstests/noPassthrough/readConcern_snapshot_mongos.js b/jstests/noPassthrough/readConcern_snapshot_mongos.js index 2fb7e4387df..db11ca7d3ef 100644 --- a/jstests/noPassthrough/readConcern_snapshot_mongos.js +++ b/jstests/noPassthrough/readConcern_snapshot_mongos.js @@ -29,10 +29,6 @@ // Insert data to create the collection. assert.writeOK(testDB[collName].insert({x: 1})); - // TODO SERVER-35707: Re-targeting errors abort transactions. Run distinct on the unsharded - // collection to force the shard and mongos to refresh their DB versions. - assert.commandWorked(testDB.runCommand({distinct: collName, key: "x"})); - // noPassthrough tests // readConcern 'snapshot' is not allowed outside session context. diff --git a/jstests/sharding/libs/sharded_transactions_helpers.js b/jstests/sharding/libs/sharded_transactions_helpers.js index 7876e2934b9..422bdac250d 100644 --- a/jstests/sharding/libs/sharded_transactions_helpers.js +++ b/jstests/sharding/libs/sharded_transactions_helpers.js @@ -19,13 +19,3 @@ function unsetFailCommandOnEachShard(st, numShards) { shardConn.adminCommand({configureFailPoint: "failCommand", mode: "off"})); } } - -// TODO SERVER-35707: Routing table cache updates are necessary until mongos is able to retry on -// stale shard and database version errors. -function flushShardRoutingTableUpdates(st, dbName, ns, numShards) { - for (let i = 0; i < numShards; i++) { - const shardConn = st["rs" + i].getPrimary(); - assert.commandWorked(shardConn.adminCommand({_flushDatabaseCacheUpdates: dbName})); - assert.commandWorked(shardConn.adminCommand({_flushRoutingTableCacheUpdates: ns})); - } -} diff --git a/jstests/sharding/snapshot_cursor_commands_mongos.js b/jstests/sharding/snapshot_cursor_commands_mongos.js index 3a65e9c7f44..30cee7728d1 100644 --- a/jstests/sharding/snapshot_cursor_commands_mongos.js +++ b/jstests/sharding/snapshot_cursor_commands_mongos.js @@ -100,15 +100,6 @@ assert.eq( 1, mongos.getDB('config').chunks.count({ns: ns, shard: st.shard2.shardName})); - // Routing table cache updates are necessary until mongos retargeting is fixed. - // TODO: SERVER-35707 - assert.commandWorked( - st.shard0.getDB('admin').runCommand({_flushRoutingTableCacheUpdates: ns})); - assert.commandWorked( - st.shard1.getDB('admin').runCommand({_flushRoutingTableCacheUpdates: ns})); - assert.commandWorked( - st.shard2.getDB('admin').runCommand({_flushRoutingTableCacheUpdates: ns})); - return st; } }, @@ -144,13 +135,6 @@ assert.eq( 1, mongos.getDB('config').chunks.count({ns: ns, shard: st.shard2.shardName})); - // Routing table cache updates are necessary until mongos retargeting is fixed. 
- // TODO: SERVER-35707 - assert.commandWorked( - st.shard1.getDB('admin').runCommand({_flushRoutingTableCacheUpdates: ns})); - assert.commandWorked( - st.shard2.getDB('admin').runCommand({_flushRoutingTableCacheUpdates: ns})); - return st; } }, diff --git a/jstests/sharding/transactions_snapshot_errors_first_statement.js b/jstests/sharding/transactions_snapshot_errors_first_statement.js index 220426f475b..7e31480d520 100644 --- a/jstests/sharding/transactions_snapshot_errors_first_statement.js +++ b/jstests/sharding/transactions_snapshot_errors_first_statement.js @@ -110,7 +110,8 @@ setFailCommandOnShards(st, "alwaysOn", [commandName], errorCode, numShardsToError); session.startTransaction({readConcern: {level: "snapshot"}}); - const res = assert.commandFailedWithCode(sessionDB.runCommand(commandBody), errorCode); + const res = assert.commandFailedWithCode(sessionDB.runCommand(commandBody), + ErrorCodes.NoSuchTransaction); assert.eq(res.errorLabels, ["TransientTransactionError"]); session.abortTransaction(); @@ -125,7 +126,6 @@ assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: 5}, {writeConcern: {w: "majority"}})); st.ensurePrimaryShard(dbName, st.shard0.shardName); - flushShardRoutingTableUpdates(st, dbName, ns, 2); for (let errorCode of kSnapshotErrors) { runTest(st, collName, 1, errorCode, false); @@ -144,7 +144,6 @@ assert.eq(2, st.s.getDB('config').chunks.count({ns: ns, shard: st.shard0.shardName})); assert.eq(0, st.s.getDB('config').chunks.count({ns: ns, shard: st.shard1.shardName})); - flushShardRoutingTableUpdates(st, dbName, ns, 2); for (let errorCode of kSnapshotErrors) { runTest(st, collName, 1, errorCode, false); @@ -156,7 +155,6 @@ st.s.adminCommand({moveChunk: ns, find: {_id: 15}, to: st.shard1.shardName})); assert.eq(1, st.s.getDB('config').chunks.count({ns: ns, shard: st.shard0.shardName})); assert.eq(1, st.s.getDB('config').chunks.count({ns: ns, shard: st.shard0.shardName})); - flushShardRoutingTableUpdates(st, dbName, ns, 2); for (let errorCode of kSnapshotErrors) { runTest(st, collName, 2, errorCode, true); diff --git a/jstests/sharding/transactions_snapshot_errors_subsequent_statements.js b/jstests/sharding/transactions_snapshot_errors_subsequent_statements.js index 96b74bc735a..f346a8eea48 100644 --- a/jstests/sharding/transactions_snapshot_errors_subsequent_statements.js +++ b/jstests/sharding/transactions_snapshot_errors_subsequent_statements.js @@ -55,7 +55,8 @@ // Verify the command must fail on a snapshot error from a subsequent statement. setFailCommandOnShards(st, {times: 1}, [commandName], errorCode, 1); - const res = assert.commandFailedWithCode(sessionDB.runCommand(commandBody), errorCode); + const res = assert.commandFailedWithCode(sessionDB.runCommand(commandBody), + ErrorCodes.NoSuchTransaction); assert.eq(res.errorLabels, ["TransientTransactionError"]); session.abortTransaction(); @@ -68,7 +69,6 @@ assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: 5}, {writeConcern: {w: "majority"}})); st.ensurePrimaryShard(dbName, st.shard0.shardName); - flushShardRoutingTableUpdates(st, dbName, ns, 2); // Single shard case simulates the storage engine discarding an in-use snapshot. 
for (let errorCode of kSnapshotErrors) { @@ -88,7 +88,6 @@ assert.eq(2, st.s.getDB('config').chunks.count({ns: ns, shard: st.shard0.shardName})); assert.eq(0, st.s.getDB('config').chunks.count({ns: ns, shard: st.shard1.shardName})); - flushShardRoutingTableUpdates(st, dbName, ns, 2); for (let errorCode of kSnapshotErrors) { runTest(st, collName, errorCode); @@ -100,7 +99,6 @@ st.s.adminCommand({moveChunk: ns, find: {_id: 15}, to: st.shard1.shardName})); assert.eq(1, st.s.getDB('config').chunks.count({ns: ns, shard: st.shard0.shardName})); assert.eq(1, st.s.getDB('config').chunks.count({ns: ns, shard: st.shard1.shardName})); - flushShardRoutingTableUpdates(st, dbName, ns, 2); // Multi shard case simulates adding a new participant that can no longer support the already // chosen read timestamp. diff --git a/jstests/sharding/transactions_stale_database_version_errors.js b/jstests/sharding/transactions_stale_database_version_errors.js new file mode 100644 index 00000000000..02f1d37c113 --- /dev/null +++ b/jstests/sharding/transactions_stale_database_version_errors.js @@ -0,0 +1,114 @@ +// Tests mongos behavior on stale database version errors received in a transaction. +// +// @tags: [requires_sharding, uses_transactions] +(function() { + "use strict"; + + const dbName = "test"; + const collName = "foo"; + + const st = new ShardingTest({shards: 2, mongos: 1, config: 1}); + + // Set up two unsharded collections in different databases with shard0 as their primary. + + assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: 0}, {writeConcern: {w: "majority"}})); + assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); + st.ensurePrimaryShard(dbName, st.shard0.shardName); + + const session = st.s.startSession(); + const sessionDB = session.getDatabase(dbName); + + // + // Stale database version on first overall command should succeed. + // + + session.startTransaction(); + + // No database versioned requests have been sent to Shard0, so it is stale. + assert.commandWorked(sessionDB.runCommand({distinct: collName, key: "_id", query: {_id: 0}})); + + // TODO SERVER-36304: Change this to commitTransaction once multi shard transactions can be + // committed through mongos. + session.abortTransaction(); + + // + // Stale database version on second command to a shard should fail. + // + + st.ensurePrimaryShard(dbName, st.shard1.shardName); + + session.startTransaction(); + + // Find is not database versioned so it will not trigger SDV or a refresh on Shard0. + assert.commandWorked(sessionDB.runCommand({find: collName, filter: {_id: 0}})); + + // Distinct is database versioned, so it will trigger SDV. The router will retry and the retry + // will discover the transaction was aborted, because a previous statement had completed on + // Shard0. + let res = assert.commandFailedWithCode( + sessionDB.runCommand({distinct: collName, key: "_id", query: {_id: 0}}), + ErrorCodes.NoSuchTransaction); + assert.eq(res.errorLabels, ["TransientTransactionError"]); + + session.abortTransaction(); + + // + // Stale database version on first command to a new shard should succeed. + // + + // Create a new database on Shard0. 
+ const otherDbName = "other_test"; + const otherCollName = "bar"; + + assert.writeOK( + st.s.getDB(otherDbName)[otherCollName].insert({_id: 0}, {writeConcern: {w: "majority"}})); + assert.commandWorked(st.s.adminCommand({enableSharding: otherDbName})); + st.ensurePrimaryShard(otherDbName, st.shard0.shardName); + + const sessionOtherDB = session.getDatabase(otherDbName); + + // Advance the router's cached last committed opTime for Shard0, so it chooses a read timestamp + // after the collection is created on shard1, to avoid SnapshotUnavailable. + assert.commandWorked( + sessionOtherDB.runCommand({find: otherCollName})); // Not database versioned. + assert.writeOK(sessionDB[collName].insert({_id: 1}, {writeConcern: {w: "majority"}})); + + session.startTransaction(); + + // Target the first database which is on Shard1. + assert.commandWorked(sessionDB.runCommand({distinct: collName, key: "_id", query: {_id: 0}})); + + // Targets the new database on Shard0 which is stale, so a database versioned request should + // trigger SDV. + assert.commandWorked( + sessionOtherDB.runCommand({distinct: otherCollName, key: "_id", query: {_id: 0}})); + + // TODO SERVER-36304: Change this to commitTransaction. + session.abortTransaction(); + + // + // NoSuchTransaction should be returned if the router exhausts its retries. + // + + st.ensurePrimaryShard(dbName, st.shard0.shardName); + + // Disable database metadata refreshes on the stale shard so it will indefinitely return a stale + // version error. + assert.commandWorked(st.rs0.getPrimary().adminCommand( + {configureFailPoint: "skipDatabaseVersionMetadataRefresh", mode: "alwaysOn"})); + + session.startTransaction(); + + // Target the first database which is on Shard0. The shard is stale and won't refresh its + // metadata, so mongos should exhaust its retries and implicitly abort the transaction. + assert.commandFailedWithCode( + sessionDB.runCommand({distinct: collName, key: "_id", query: {_id: 0}}), + ErrorCodes.NoSuchTransaction); + + session.abortTransaction(); + + assert.commandWorked(st.rs0.getPrimary().adminCommand( + {configureFailPoint: "skipDatabaseVersionMetadataRefresh", mode: "off"})); + + st.stop(); +})(); diff --git a/jstests/sharding/transactions_stale_shard_version_errors.js b/jstests/sharding/transactions_stale_shard_version_errors.js new file mode 100644 index 00000000000..4d9b7f7edf3 --- /dev/null +++ b/jstests/sharding/transactions_stale_shard_version_errors.js @@ -0,0 +1,251 @@ +// Tests mongos behavior on stale shard version errors received in a transaction. +// +// @tags: [requires_sharding, uses_transactions] +(function() { + "use strict"; + + function expectChunks(st, ns, chunks) { + for (let i = 0; i < chunks.length; i++) { + assert.eq(chunks[i], + st.s.getDB("config").chunks.count({ns: ns, shard: st["shard" + i].shardName}), + "unexpected number of chunks on shard " + i); + } + } + + const dbName = "test"; + const collName = "foo"; + const ns = dbName + '.' + collName; + + const st = new ShardingTest({shards: 3, mongos: 2, config: 1}); + + // Disable the best-effort recipient metadata refresh after migrations to simplify simulating + // stale shard version errors. 
+ assert.commandWorked(st.rs0.getPrimary().adminCommand( + {configureFailPoint: "doNotRefreshRecipientAfterCommit", mode: "alwaysOn"})); + assert.commandWorked(st.rs1.getPrimary().adminCommand( + {configureFailPoint: "doNotRefreshRecipientAfterCommit", mode: "alwaysOn"})); + assert.commandWorked(st.rs2.getPrimary().adminCommand( + {configureFailPoint: "doNotRefreshRecipientAfterCommit", mode: "alwaysOn"})); + + // Shard two collections in the same database, each with 2 chunks, [minKey, 0), [0, maxKey), + // with one document each, all on Shard0. + + assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: -5}, {writeConcern: {w: "majority"}})); + assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: 5}, {writeConcern: {w: "majority"}})); + + assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); + st.ensurePrimaryShard(dbName, st.shard0.shardName); + + assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}})); + assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}})); + + expectChunks(st, ns, [2, 0, 0]); + + const otherCollName = "bar"; + const otherNs = dbName + "." + otherCollName; + + assert.writeOK( + st.s.getDB(dbName)[otherCollName].insert({_id: -5}, {writeConcern: {w: "majority"}})); + assert.writeOK( + st.s.getDB(dbName)[otherCollName].insert({_id: 5}, {writeConcern: {w: "majority"}})); + + assert.commandWorked(st.s.adminCommand({shardCollection: otherNs, key: {_id: 1}})); + assert.commandWorked(st.s.adminCommand({split: otherNs, middle: {_id: 0}})); + + expectChunks(st, otherNs, [2, 0, 0]); + + const session = st.s.startSession(); + const sessionDB = session.getDatabase(dbName); + + // + // Stale shard version on first overall command should succeed. + // + + // Move a chunk in the first collection from Shard0 to Shard1 through the main mongos, so Shard1 + // is stale but not the router. + assert.commandWorked( + st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard1.shardName})); + expectChunks(st, ns, [1, 1, 0]); + + session.startTransaction(); + + // Targets Shard1, which is stale. + assert.commandWorked(sessionDB.runCommand({find: collName, filter: {_id: 5}})); + + // TODO SERVER-36304: Change this to commitTransaction once multi shard transactions can be + // committed through mongos. + session.abortTransaction(); + + // + // Stale shard version on second command to a shard should fail. + // + + expectChunks(st, ns, [1, 1, 0]); + + // Move a chunk in the other collection from Shard0 to Shard1 through the main mongos, so Shard1 + // is stale for the other collection but not the router. + assert.commandWorked( + st.s.adminCommand({moveChunk: otherNs, find: {_id: 5}, to: st.shard1.shardName})); + expectChunks(st, otherNs, [1, 1, 0]); + + session.startTransaction(); + + // Targets Shard1 for the first ns, which is not stale. + assert.commandWorked(sessionDB.runCommand({find: collName, filter: {_id: 5}})); + + // Targets the other sharded collection on Shard1, which is stale. Because a previous statement + // has executed on Shard1, the retry will not restart the transaction, and will fail when it + // finds the transaction has aborted because of the stale shard version. + let res = + assert.commandFailedWithCode(sessionDB.runCommand({find: otherCollName, filter: {_id: 5}}), + ErrorCodes.NoSuchTransaction); + assert.eq(res.errorLabels, ["TransientTransactionError"]); + + session.abortTransaction(); + + // + // Stale shard version on first command to a new shard should succeed. 
+ // + + expectChunks(st, ns, [1, 1, 0]); + + // Move a chunk for the other collection from Shard1 to Shard0 through the main mongos, so + // Shard0 is stale for it and the router is not. + assert.commandWorked( + st.s.adminCommand({moveChunk: otherNs, find: {_id: 5}, to: st.shard0.shardName})); + expectChunks(st, otherNs, [2, 0, 0]); + + session.startTransaction(); + + // Targets Shard1 for the first ns, which is not stale. + assert.commandWorked(sessionDB.runCommand({find: collName, filter: {_id: 5}})); + + // Targets Shard0 for the other ns, which is stale. + assert.commandWorked(sessionDB.runCommand({find: otherCollName, filter: {_id: 5}})); + + // TODO SERVER-36304: Change this to commitTransaction. + session.abortTransaction(); + + // + // Stale mongos aborts on old shard. + // + + // Move a chunk in the first collection from Shard1 to Shard0 through the other mongos, so + // Shard1 and the main mongos are stale for it. + const otherMongos = st.s1; + assert.commandWorked( + otherMongos.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard0.shardName})); + expectChunks(st, ns, [2, 0, 0]); + + session.startTransaction(); + + // Targets Shard1, which hits a stale version error, then re-targets Shard0, which is also + // stale but should succeed. + assert.commandWorked(sessionDB.runCommand({find: collName, filter: {_id: 5}})); + + // TODO SERVER-36304: Change this to commitTransaction. + session.abortTransaction(); + + // Verify there is no in-progress transaction on Shard1. + res = assert.commandFailedWithCode(st.rs1.getPrimary().getDB(dbName).runCommand({ + find: collName, + lsid: session.getSessionId(), + txnNumber: NumberLong(session.getTxnNumber_forTesting()), + autocommit: false, + }), + ErrorCodes.NoSuchTransaction); + assert.eq(res.errorLabels, ["TransientTransactionError"]); + + // + // More than one stale shard version error. + // + + // Move chunks for the first ns from Shard0 to Shard1 and Shard2 through the main mongos, so + // both are stale but not the router. + assert.commandWorked( + st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard2.shardName})); + expectChunks(st, ns, [1, 0, 1]); + + assert.commandWorked( + st.s.adminCommand({moveChunk: ns, find: {_id: -5}, to: st.shard1.shardName})); + expectChunks(st, ns, [0, 1, 1]); + + session.startTransaction(); + + // Targets all shards, two of which are stale. + assert.commandWorked(sessionDB.runCommand({find: collName})); + + // TODO SERVER-36304: Change this to commitTransaction. + session.abortTransaction(); + + // + // Can retry a stale write on the first statement. + // + + // Move a chunk to Shard1 to make it stale. + assert.commandWorked( + st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard1.shardName})); + expectChunks(st, ns, [0, 2, 0]); + + session.startTransaction(); + + // Targets Shard1, which is stale. + assert.commandWorked(sessionDB.runCommand({insert: collName, documents: [{_id: 6}]})); + + // TODO SERVER-36304: Change this to commitTransaction. + session.abortTransaction(); + + // + // Cannot retry a stale write past the first statement. + // + // TODO SERVER-37207: Change batch writes to retry only the failed writes in a batch, to allow + // retrying writes beyond the first overall statement. + // + + // Move a chunk to Shard2 to make it stale. + assert.commandWorked( + st.s.adminCommand({moveChunk: ns, find: {_id: 5}, to: st.shard2.shardName})); + expectChunks(st, ns, [0, 1, 1]); + + session.startTransaction(); + + // Targets Shard1, which is not stale. 
+ assert.commandWorked(sessionDB.runCommand({insert: collName, documents: [{_id: -4}]})); + + // Targets Shard2, which is stale. + res = assert.commandFailedWithCode( + sessionDB.runCommand({insert: collName, documents: [{_id: 7}]}), + ErrorCodes.NoSuchTransaction); + assert.eq(res.errorLabels, ["TransientTransactionError"]); + + // TODO SERVER-36304: Change this to commitTransaction. + session.abortTransaction(); + + // + // NoSuchTransaction should be returned if the router exhausts its retries. + // + + // Move a chunk to Shard0 to make it stale. + assert.commandWorked( + st.s.adminCommand({moveChunk: ns, find: {_id: -5}, to: st.shard0.shardName})); + expectChunks(st, ns, [1, 0, 1]); + + // Disable metadata refreshes on the stale shard so it will indefinitely return a stale version + // error. + assert.commandWorked(st.rs0.getPrimary().adminCommand( + {configureFailPoint: "skipShardFilteringMetadataRefresh", mode: "alwaysOn"})); + + session.startTransaction(); + + // Targets Shard0, which is stale and won't refresh its metadata, so mongos should exhaust its + // retries and implicitly abort the transaction. + assert.commandFailedWithCode(sessionDB.runCommand({find: collName, filter: {_id: -5}}), + ErrorCodes.NoSuchTransaction); + + session.abortTransaction(); + + assert.commandWorked(st.rs0.getPrimary().adminCommand( + {configureFailPoint: "skipShardFilteringMetadataRefresh", mode: "off"})); + + st.stop(); +})(); diff --git a/src/mongo/db/handle_request_response.cpp b/src/mongo/db/handle_request_response.cpp index c5b27c6a106..cddf7ee8145 100644 --- a/src/mongo/db/handle_request_response.cpp +++ b/src/mongo/db/handle_request_response.cpp @@ -40,9 +40,7 @@ BSONObj getErrorLabels(const boost::optional<OperationSessionInfoFromClient>& se bool isRetryable = ErrorCodes::isNotMasterError(code) || ErrorCodes::isShutdownError(code); bool isTransientTransactionError = code == ErrorCodes::WriteConflict // - || code == ErrorCodes::SnapshotTooOld // || code == ErrorCodes::SnapshotUnavailable // - || code == ErrorCodes::StaleChunkHistory // || code == ErrorCodes::NoSuchTransaction // || code == ErrorCodes::LockTimeout // || code == ErrorCodes::PreparedTransactionInProgress // diff --git a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp index 3475891e19d..8b670aeb4bc 100644 --- a/src/mongo/db/s/shard_filtering_metadata_refresh.cpp +++ b/src/mongo/db/s/shard_filtering_metadata_refresh.cpp @@ -43,10 +43,14 @@ #include "mongo/db/s/sharding_statistics.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/grid.h" +#include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" namespace mongo { +MONGO_FAIL_POINT_DEFINE(skipDatabaseVersionMetadataRefresh); +MONGO_FAIL_POINT_DEFINE(skipShardFilteringMetadataRefresh); + namespace { void onShardVersionMismatch(OperationContext* opCtx, @@ -84,6 +88,10 @@ void onShardVersionMismatch(OperationContext* opCtx, // smaller than the one we know about. This means that the remote side is behind. } + if (MONGO_FAIL_POINT(skipShardFilteringMetadataRefresh)) { + return; + } + forceShardFilteringMetadataRefresh(opCtx, nss, forceRefreshFromThisThread); } @@ -107,6 +115,10 @@ void onDbVersionMismatch(OperationContext* opCtx, // StaleDatabaseVersion retry attempts while the movePrimary is being committed. 
OperationShardingState::get(opCtx).waitForMovePrimaryCriticalSectionSignal(opCtx); + if (MONGO_FAIL_POINT(skipDatabaseVersionMetadataRefresh)) { + return; + } + forceDatabaseRefresh(opCtx, dbName); } diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 166bd401b1d..0a36636b1fd 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -404,6 +404,19 @@ void runCommand(OperationContext* opCtx, Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNs); + // Update transaction tracking state for a possible retry. Throws if the transaction + // cannot continue. + if (auto txnRouter = TransactionRouter::get(opCtx)) { + txnRouter->onStaleShardOrDbError(commandName); + // TODO SERVER-37210: Implicitly abort the transaction if this uassert throws. + uassert(ErrorCodes::NoSuchTransaction, + str::stream() << "Transaction " << opCtx->getTxnNumber() + << " was aborted after " + << kMaxNumStaleVersionRetries + << " failed retries", + canRetry); + } + if (canRetry) { continue; } @@ -412,25 +425,41 @@ void runCommand(OperationContext* opCtx, // Mark database entry in cache as stale. Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(ex->getDb(), ex->getVersionReceived()); + + // Update transaction tracking state for a possible retry. Throws if the transaction + // cannot continue. + if (auto txnRouter = TransactionRouter::get(opCtx)) { + txnRouter->onStaleShardOrDbError(commandName); + // TODO SERVER-37210: Implicitly abort the transaction if this uassert throws. + uassert(ErrorCodes::NoSuchTransaction, + str::stream() << "Transaction " << opCtx->getTxnNumber() + << " was aborted after " + << kMaxNumStaleVersionRetries + << " failed retries", + canRetry); + } + if (canRetry) { continue; } throw; - } catch (ExceptionForCat<ErrorCategory::SnapshotError>& ex) { + } catch (const ExceptionForCat<ErrorCategory::SnapshotError>&) { // Simple retry on any type of snapshot error. - if (canRetry) { - auto txnRouter = TransactionRouter::get(opCtx); - invariant(txnRouter); - if (txnRouter->canContinueOnSnapshotError()) { - txnRouter->onSnapshotError(); - } else { - // TODO SERVER-36589: Abort the entire transaction. - ex.addContext( - "Encountered snapshot error on subsequent transaction statement"); - throw; - } + // Update transaction tracking state for a possible retry. Throws if the transaction + // cannot continue. + if (auto txnRouter = TransactionRouter::get(opCtx)) { + txnRouter->onSnapshotError(); + // TODO SERVER-37210: Implicitly abort the transaction if this uassert throws. + uassert(ErrorCodes::NoSuchTransaction, + str::stream() << "Transaction " << opCtx->getTxnNumber() + << " was aborted after " + << kMaxNumStaleVersionRetries + << " failed retries", + canRetry); + } + if (canRetry) { continue; } throw; diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index ac827f7ae13..0f49c9a9f57 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -82,6 +82,8 @@ static const BSONObj kGeoNearDistanceMetaProjection = BSON("$meta" // more than 8 decimal digits since the response is at most 16MB, and 16 * 1024 * 1024 < 1 * 10^8. static const int kPerDocumentOverheadBytesUpperBound = 10; +const char kFindCmdName[] = "find"; + /** * Given the QueryRequest 'qr' being executed by mongos, returns a copy of the query which is * suitable for forwarding to the targeted hosts. 
@@ -450,6 +452,12 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx, << " on attempt " << retries << " of " << kMaxRetries << ": " << redact(ex); catalogCache->onStaleShardVersion(std::move(routingInfo)); + + if (auto txnRouter = TransactionRouter::get(opCtx)) { + // A transaction can always continue on a stale version error during find because + // the operation must be idempotent. + txnRouter->onStaleShardOrDbError(kFindCmdName); + } } } diff --git a/src/mongo/s/transaction/append_at_cluster_time_test.cpp b/src/mongo/s/transaction/append_at_cluster_time_test.cpp index 442fd513852..9422c3faf46 100644 --- a/src/mongo/s/transaction/append_at_cluster_time_test.cpp +++ b/src/mongo/s/transaction/append_at_cluster_time_test.cpp @@ -85,5 +85,28 @@ TEST(ClusterCommands, AddingAtClusterTimeOverwritesExistingAfterClusterTime) { ASSERT_BSONOBJ_EQ(expectedCommand, newCommand); } +// Adding atClusterTime overwrites an existing afterClusterTime and will add level "snapshot" if it +// is not there. +TEST(ClusterCommands, AddingAtClusterTimeAddsLevelSnapshotIfNotThere) { + const auto existingAfterClusterTime = Timestamp(1, 1); + BSONObj command = BSON("aggregate" + << "testColl" + << "readConcern" + << BSON("afterClusterTime" << existingAfterClusterTime)); + + const auto computedAtClusterTime = Timestamp(2, 1); + BSONObj expectedCommand = BSON("aggregate" + << "testColl" + << "readConcern" + << BSON("level" + << "snapshot" + << "atClusterTime" + << computedAtClusterTime)); + + BSONObj newCommand = + at_cluster_time_util::appendAtClusterTime(command, LogicalTime(computedAtClusterTime)); + ASSERT_BSONOBJ_EQ(expectedCommand, newCommand); +} + } // namespace } // namespace mongo diff --git a/src/mongo/s/transaction/at_cluster_time_util.cpp b/src/mongo/s/transaction/at_cluster_time_util.cpp index 1264a785047..63f54d5a054 100644 --- a/src/mongo/s/transaction/at_cluster_time_util.cpp +++ b/src/mongo/s/transaction/at_cluster_time_util.cpp @@ -40,6 +40,20 @@ namespace mongo { namespace at_cluster_time_util { +namespace { +const char kReadConcernLevelSnapshotName[] = "snapshot"; + +LogicalTime _computeAtClusterTime(OperationContext* opCtx, + bool mustRunOnAll, + const std::set<ShardId>& shardIds, + const NamespaceString& nss, + const BSONObj query, + const BSONObj collation) { + // TODO: SERVER-31767 + return LogicalClock::get(opCtx)->getClusterTime(); +} +} // namespace + BSONObj appendAtClusterTime(BSONObj cmdObj, LogicalTime atClusterTime) { BSONObjBuilder cmdAtClusterTimeBob; for (auto el : cmdObj) { @@ -54,6 +68,13 @@ BSONObj appendAtClusterTime(BSONObj cmdObj, LogicalTime atClusterTime) { } } + // Transactions will upconvert a read concern with afterClusterTime but no level to have + // level snapshot, so a command may have a read concern field with no level. 
+ if (!readConcernBob.hasField(repl::ReadConcernArgs::kLevelFieldName)) { + readConcernBob.append(repl::ReadConcernArgs::kLevelFieldName, + kReadConcernLevelSnapshotName); + } + readConcernBob.append(repl::ReadConcernArgs::kAtClusterTimeFieldName, atClusterTime.asTimestamp()); } else { @@ -64,19 +85,6 @@ BSONObj appendAtClusterTime(BSONObj cmdObj, LogicalTime atClusterTime) { return cmdAtClusterTimeBob.obj(); } -namespace { - -LogicalTime _computeAtClusterTime(OperationContext* opCtx, - bool mustRunOnAll, - const std::set<ShardId>& shardIds, - const NamespaceString& nss, - const BSONObj query, - const BSONObj collation) { - // TODO: SERVER-31767 - return LogicalClock::get(opCtx)->getClusterTime(); -} -} - boost::optional<LogicalTime> computeAtClusterTime(OperationContext* opCtx, bool mustRunOnAll, const std::set<ShardId>& shardIds, diff --git a/src/mongo/s/transaction/transaction_router.cpp b/src/mongo/s/transaction/transaction_router.cpp index 5359ffc6542..a9c592a5600 100644 --- a/src/mongo/s/transaction/transaction_router.cpp +++ b/src/mongo/s/transaction/transaction_router.cpp @@ -123,7 +123,11 @@ BSONObj appendReadConcernForTxn(BSONObj cmd, if (cmd.hasField(repl::ReadConcernArgs::kReadConcernFieldName)) { repl::ReadConcernArgs existingReadConcernArgs; dassert(existingReadConcernArgs.initialize(cmd)); - dassert(existingReadConcernArgs.getLevel() == readConcernArgs.getLevel()); + // There may be no read concern level if the user only specified afterClusterTime and the + // transaction provided the default level. + dassert(existingReadConcernArgs.getLevel() == readConcernArgs.getLevel() || + !existingReadConcernArgs.hasLevel()); + return atClusterTime ? at_cluster_time_util::appendAtClusterTime(std::move(cmd), *atClusterTime) : cmd; @@ -148,11 +152,20 @@ BSONObjBuilder appendFieldsForStartTransaction(BSONObj cmd, return bob; } +// Commands that are idempotent in a transaction context and can be blindly retried in the middle of +// a transaction. Aggregate with $out is disallowed in a transaction, so aggregates must be read +// operations. +const StringMap<int> alwaysRetryableCmds = { + {"aggregate", 1}, {"distinct", 1}, {"find", 1}, {"getMore", 1}, {"killCursors", 1}}; + } // unnamed namespace TransactionRouter::Participant::Participant(bool isCoordinator, + StmtId stmtIdCreatedAt, SharedTransactionOptions sharedOptions) - : _isCoordinator(isCoordinator), _sharedOptions(sharedOptions) {} + : _isCoordinator(isCoordinator), + _stmtIdCreatedAt(stmtIdCreatedAt), + _sharedOptions(sharedOptions) {} BSONObj TransactionRouter::Participant::attachTxnFieldsIfNeeded(BSONObj cmd) { auto isTxnCmd = isTransactionCommand(cmd); // check first before moving cmd. 
@@ -202,6 +215,10 @@ void TransactionRouter::Participant::markAsCommandSent() { } } +StmtId TransactionRouter::Participant::getStmtIdCreatedAt() const { + return _stmtIdCreatedAt; +} + TransactionRouter* TransactionRouter::get(OperationContext* opCtx) { auto& opCtxSession = getRouterSessionRuntimeState(opCtx); if (!opCtxSession) { @@ -260,6 +277,7 @@ TransactionRouter::Participant& TransactionRouter::getOrCreateParticipant(const shard.toString(), TransactionRouter::Participant( isFirstParticipant, + _latestStmtId, SharedTransactionOptions{_txnNumber, _readConcernArgs, _atClusterTime})); return resultPair.first->second; @@ -269,12 +287,58 @@ const LogicalSessionId& TransactionRouter::getSessionId() const { return _sessionId; } -bool TransactionRouter::canContinueOnSnapshotError() const { +bool TransactionRouter::_canContinueOnStaleShardOrDbError(StringData cmdName) const { + // We can always retry on the first overall statement. + if (_latestStmtId == _firstStmtId) { + return true; + } + + if (alwaysRetryableCmds.count(cmdName)) { + return true; + } + + return false; +} + +void TransactionRouter::onStaleShardOrDbError(StringData cmdName) { + // TODO SERVER-37210: Implicitly abort the entire transaction if this uassert throws. + uassert(ErrorCodes::NoSuchTransaction, + "Transaction was aborted due to cluster data placement change", + _canContinueOnStaleShardOrDbError(cmdName)); + + // Remove each participant created at the most recent statement id and add them to the orphaned + // list because the retry attempt isn't guaranteed to retarget them. Participants created + // earlier are already fixed in the participant list, so they should not be removed. + for (auto&& it = _participants.begin(); it != _participants.end();) { + auto participant = it++; + if (participant->second.getStmtIdCreatedAt() == _latestStmtId) { + _orphanedParticipants.try_emplace(participant->first); + _participants.erase(participant); + } + } + + // If there are no more participants, also clear the coordinator id because a new one must be + // chosen by the retry. + if (_participants.empty()) { + _coordinatorId.reset(); + return; + } + + // If this is not the first command, the coordinator must have been chosen and successfully + // contacted in an earlier command, and thus must not be in the orphaned list. + invariant(_coordinatorId); + invariant(_orphanedParticipants.count(*_coordinatorId) == 0); +} + +bool TransactionRouter::_canContinueOnSnapshotError() const { return _latestStmtId == _firstStmtId; } void TransactionRouter::onSnapshotError() { - invariant(canContinueOnSnapshotError()); + // TODO SERVER-37210: Implicitly abort the entire transaction if this uassert throws. + uassert(ErrorCodes::NoSuchTransaction, + "Transaction was aborted due to snapshot error on subsequent transaction statement", + _canContinueOnSnapshotError()); // Add each participant to the orphaned list because the retry attempt isn't guaranteed to // re-target it. diff --git a/src/mongo/s/transaction/transaction_router.h b/src/mongo/s/transaction/transaction_router.h index 1294a82df38..a47a1f92db7 100644 --- a/src/mongo/s/transaction/transaction_router.h +++ b/src/mongo/s/transaction/transaction_router.h @@ -71,7 +71,9 @@ public: */ class Participant { public: - explicit Participant(bool isCoordinator, SharedTransactionOptions sharedOptions); + explicit Participant(bool isCoordinator, + StmtId stmtIdCreatedAt, + SharedTransactionOptions sharedOptions); enum class State { // Next transaction should include startTransaction. 
@@ -101,9 +103,19 @@ public: */ void markAsCommandSent(); + /** + * Returns the highest statement id of the command during which this participant was + * created. + */ + StmtId getStmtIdCreatedAt() const; + private: State _state{State::kMustStart}; const bool _isCoordinator{false}; + + // The highest statement id of the request during which this participant was created. + const StmtId _stmtIdCreatedAt{kUninitializedStmtId}; + const SharedTransactionOptions _sharedOptions; }; @@ -124,15 +136,15 @@ public: void checkOut(); /** - * Returns true if the current transaction can retry on a snapshot error. This is only true on - * the first command recevied for a transaction. + * Updates the transaction state to allow for a retry of the current command on a stale version + * error. Will throw if the transaction cannot be continued. */ - bool canContinueOnSnapshotError() const; + void onStaleShardOrDbError(StringData cmdName); /** * Resets the transaction state to allow for a retry attempt. This includes clearing all * participants and adding them to the orphaned list, clearing the coordinator, and resetting - * the global read timestamp. + * the global read timestamp. Will throw if the transaction cannot be continued. */ void onSnapshotError(); @@ -194,6 +206,28 @@ private: */ Shard::CommandResponse _commitMultiShardTransaction(OperationContext* opCtx); + /** + * Returns true if the current transaction can retry on a stale version error from a contacted + * shard. This is always true except for an error received by a write that is not the first + * overall statement in the sharded transaction. This is because the entire command will be + * retried, and shards that were not stale and are targeted again may incorrectly execute the + * command a second time. + * + * Note: Even if this method returns true, the retry attempt may still fail, e.g. if one of the + * shards that returned a stale version error was involved in a previously completed a statement + * for this transaction. + * + * TODO SERVER-37207: Change batch writes to retry only the failed writes in a batch, to allow + * retrying writes beyond the first overall statement. + */ + bool _canContinueOnStaleShardOrDbError(StringData cmdName) const; + + /** + * Returns true if the current transaction can retry on a snapshot error. This is only true on + * the first command recevied for a transaction. + */ + bool _canContinueOnSnapshotError() const; + const LogicalSessionId _sessionId; TxnNumber _txnNumber{kUninitializedTxnNumber}; diff --git a/src/mongo/s/transaction/transaction_router_test.cpp b/src/mongo/s/transaction/transaction_router_test.cpp index 3f5e7413c71..f74f3ab0935 100644 --- a/src/mongo/s/transaction/transaction_router_test.cpp +++ b/src/mongo/s/transaction/transaction_router_test.cpp @@ -52,6 +52,8 @@ protected: const ShardId shard2 = ShardId("shard2"); const HostAndPort hostAndPort2 = HostAndPort("shard2:1234"); + const ShardId shard3 = ShardId("shard3"); + void setUp() { ShardingTestFixture::setUp(); configTargeter()->setFindHostReturnValue(kTestConfigShardHost); @@ -833,7 +835,6 @@ TEST_F(TransactionRouterTest, SnapshotErrorsResetAtClusterTime) { LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(laterTime); // Simulate a snapshot error. 
- ASSERT(txnRouter.canContinueOnSnapshotError()); txnRouter.onSnapshotError(); txnRouter.setAtClusterTimeToLatestTime(operationContext()); @@ -915,7 +916,6 @@ TEST_F(TransactionRouterTest, SnapshotErrorsAddAllParticipantsToOrphanedList) { // Simulate a snapshot error and an internal retry that only re-targets one of the original two // shards. - ASSERT(txnRouter.canContinueOnSnapshotError()); txnRouter.onSnapshotError(); ASSERT_FALSE(txnRouter.getCoordinatorId()); @@ -941,35 +941,220 @@ TEST_F(TransactionRouterTest, SnapshotErrorsAddAllParticipantsToOrphanedList) { } } -TEST_F(TransactionRouterTest, CanOnlyContinueOnSnapshotErrorOnFirstCommand) { +TEST_F(TransactionRouterTest, OnSnapshotErrorThrowsAfterFirstCommand) { TxnNumber txnNum{3}; TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); - ASSERT(txnRouter.canContinueOnSnapshotError()); + // Should not throw. + txnRouter.onSnapshotError(); + + repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); + txnRouter.beginOrContinueTxn(operationContext(), txnNum, false); + ASSERT_THROWS_CODE( + txnRouter.onSnapshotError(), AssertionException, ErrorCodes::NoSuchTransaction); + + repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); + txnRouter.beginOrContinueTxn(operationContext(), txnNum, false); + ASSERT_THROWS_CODE( + txnRouter.onSnapshotError(), AssertionException, ErrorCodes::NoSuchTransaction); +} + +TEST_F(TransactionRouterTest, ParticipantsRememberStmtIdCreatedAt) { + TransactionRouter txnRouter({}); + txnRouter.checkOut(); + + TxnNumber txnNum{3}; + txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + + // Transaction 1 contacts shard1 and shard2 during the first command, then shard3 in the second + // command. + + int initialStmtId = 0; + ASSERT_EQ(txnRouter.getOrCreateParticipant(shard1).getStmtIdCreatedAt(), initialStmtId); + ASSERT_EQ(txnRouter.getOrCreateParticipant(shard2).getStmtIdCreatedAt(), initialStmtId); repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, false); - ASSERT_FALSE(txnRouter.canContinueOnSnapshotError()); + + ShardId shard3("shard3"); + ASSERT_EQ(txnRouter.getOrCreateParticipant(shard3).getStmtIdCreatedAt(), initialStmtId + 1); + + ASSERT_EQ(txnRouter.getOrCreateParticipant(shard1).getStmtIdCreatedAt(), initialStmtId); + ASSERT_EQ(txnRouter.getOrCreateParticipant(shard2).getStmtIdCreatedAt(), initialStmtId); + + // Transaction 2 contacts shard3 and shard2 during the first command, then shard1 in the second + // command. 
+ + repl::ReadConcernArgs::get(operationContext()) = + repl::ReadConcernArgs(repl::ReadConcernLevel::kSnapshotReadConcern); + TxnNumber txnNum2{5}; + txnRouter.beginOrContinueTxn(operationContext(), txnNum2, true); + + ASSERT_EQ(txnRouter.getOrCreateParticipant(shard3).getStmtIdCreatedAt(), initialStmtId); + ASSERT_EQ(txnRouter.getOrCreateParticipant(shard2).getStmtIdCreatedAt(), initialStmtId); + + repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); + txnRouter.beginOrContinueTxn(operationContext(), txnNum2, false); + + ASSERT_EQ(txnRouter.getOrCreateParticipant(shard1).getStmtIdCreatedAt(), initialStmtId + 1); +} + +TEST_F(TransactionRouterTest, AllParticipantsAndCoordinatorClearedOnStaleErrorOnFirstCommand) { + TxnNumber txnNum{3}; + + TransactionRouter txnRouter({}); + txnRouter.checkOut(); + txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + + // Start a transaction on two shards, selecting one as the coordinator, but simulate a + // re-targeting error from at least one of them. + + txnRouter.getOrCreateParticipant(shard1).markAsCommandSent(); + txnRouter.getOrCreateParticipant(shard2).markAsCommandSent(); + + ASSERT(txnRouter.getCoordinatorId()); + ASSERT_EQ(*txnRouter.getCoordinatorId(), shard1); + + ASSERT(txnRouter.getOrphanedParticipants().empty()); + + // Simulate stale error and internal retry that only re-targets one of the original shards. + + txnRouter.onStaleShardOrDbError("find"); + + ASSERT_FALSE(txnRouter.getCoordinatorId()); + ASSERT_EQ(txnRouter.getOrphanedParticipants().size(), 2U); + + { + auto& participant = txnRouter.getOrCreateParticipant(shard2); + ASSERT(participant.mustStartTransaction()); + participant.markAsCommandSent(); + ASSERT_FALSE(participant.mustStartTransaction()); + } + + // There is a new coordinator and shard1 is still in the orphaned list. + ASSERT(txnRouter.getCoordinatorId()); + ASSERT_EQ(*txnRouter.getCoordinatorId(), shard2); + ASSERT_EQ(txnRouter.getOrphanedParticipants().size(), 1U); + ASSERT_EQ(txnRouter.getOrphanedParticipants().count(shard1), 1U); + + // Shard1 has not started a transaction. + ASSERT(txnRouter.getOrCreateParticipant(shard1).mustStartTransaction()); +} + +TEST_F(TransactionRouterTest, OnlyNewlyCreatedParticipantsAddedToOrphanedListOnStaleError) { + TxnNumber txnNum{3}; + + TransactionRouter txnRouter({}); + txnRouter.checkOut(); + txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + + // First statement successfully targets one shard, selecing it as the coordinator. + + txnRouter.getOrCreateParticipant(shard1).markAsCommandSent(); + + ASSERT(txnRouter.getCoordinatorId()); + ASSERT_EQ(*txnRouter.getCoordinatorId(), shard1); + + ASSERT(txnRouter.getOrphanedParticipants().empty()); + + // Start a subsequent statement that targets two new shards and encounters a stale error from at + // least one of them. repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, false); - ASSERT_FALSE(txnRouter.canContinueOnSnapshotError()); + + txnRouter.getOrCreateParticipant(shard2).markAsCommandSent(); + txnRouter.getOrCreateParticipant(shard3).markAsCommandSent(); + + txnRouter.onStaleShardOrDbError("find"); + + // Only the two new shards are in the orphaned list. + ASSERT_EQ(txnRouter.getOrphanedParticipants().size(), 2U); + ASSERT_EQ(txnRouter.getOrphanedParticipants().count(shard1), 0U); + + // Shards 2 and 3 must start a transaction, but shard 1 must not. 
+ ASSERT_FALSE(txnRouter.getOrCreateParticipant(shard1).mustStartTransaction()); + ASSERT(txnRouter.getOrCreateParticipant(shard2).mustStartTransaction()); + ASSERT(txnRouter.getOrCreateParticipant(shard3).mustStartTransaction()); +} + +TEST_F(TransactionRouterTest, RetryOnStaleErrorCannotPickNewAtClusterTime) { + TxnNumber txnNum{3}; + + TransactionRouter txnRouter({}); + txnRouter.checkOut(); + txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + + txnRouter.setAtClusterTimeToLatestTime(operationContext()); + + BSONObj expectedReadConcern = BSON("level" + << "snapshot" + << "atClusterTime" + << kInMemoryLogicalTime.asTimestamp()); + + { + auto& participant = txnRouter.getOrCreateParticipant(shard1); + auto newCmd = participant.attachTxnFieldsIfNeeded(BSON("find" + << "test")); + ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj()); + participant.markAsCommandSent(); + } + + // Advance the latest time in the logical clock, simulate a stale config/db error, and verify + // the retry attempt cannot pick a new atClusterTime. + LogicalTime laterTime(Timestamp(1000, 1)); + ASSERT_GT(laterTime, kInMemoryLogicalTime); + LogicalClock::get(operationContext())->setClusterTimeFromTrustedSource(laterTime); + + txnRouter.onStaleShardOrDbError("find"); + + txnRouter.setAtClusterTimeToLatestTime(operationContext()); + + { + auto& participant = txnRouter.getOrCreateParticipant(shard1); + auto newCmd = participant.attachTxnFieldsIfNeeded(BSON("find" + << "test")); + ASSERT_BSONOBJ_EQ(expectedReadConcern, newCmd["readConcern"].Obj()); + } } -DEATH_TEST_F(TransactionRouterTest, CannotCallOnSnapshotErrorAfterFirstCommand, "invariant") { +TEST_F(TransactionRouterTest, WritesCanOnlyBeRetriedIfFirstOverallCommand) { + auto writeCmds = {"insert", "update", "delete", "findAndModify", "findandmodify"}; + auto otherCmds = {"find", "distinct", "aggregate", "killCursors", "getMore"}; + TxnNumber txnNum{3}; TransactionRouter txnRouter({}); txnRouter.checkOut(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, true); + txnRouter.getOrCreateParticipant(shard1).markAsCommandSent(); + + for (auto writeCmd : writeCmds) { + txnRouter.onStaleShardOrDbError(writeCmd); // Should not throw. + } + + for (auto cmd : otherCmds) { + txnRouter.onStaleShardOrDbError(cmd); // Should not throw. + } + + // Advance to the next command. + repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); txnRouter.beginOrContinueTxn(operationContext(), txnNum, false); - txnRouter.onSnapshotError(); + for (auto writeCmd : writeCmds) { + ASSERT_THROWS_CODE(txnRouter.onStaleShardOrDbError(writeCmd), + AssertionException, + ErrorCodes::NoSuchTransaction); + } + + for (auto cmd : otherCmds) { + txnRouter.onStaleShardOrDbError(cmd); // Should not throw. + } } } // unnamed namespace |