diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-09-28 14:12:16 -0400 |
---|---|---|
committer | Jack Mulrow <jack.mulrow@mongodb.com> | 2018-10-09 09:39:08 -0400 |
commit | d2d7dbadcc008a484218321666aae44b75964787 (patch) | |
tree | 6d5167a57fd3d446144d8dc08f078dfbbfbd52f6 | |
parent | 1e03955cdab995fed6672d75a6a4544a9771a279 (diff) | |
download | mongo-d2d7dbadcc008a484218321666aae44b75964787.tar.gz |
SERVER-37210 Mongos should implicitly abort transactions on unhandled errors
20 files changed, 467 insertions, 73 deletions
diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml index 71d3c2b60f6..30dd78acf48 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards.yml @@ -46,6 +46,7 @@ selector: - jstests/sharding/shard_collection_existing_zones.js - jstests/sharding/snapshot_cursor_commands_mongos.js - jstests/sharding/transactions_error_labels.js + - jstests/sharding/transactions_implicit_abort.js - jstests/sharding/transactions_snapshot_errors_first_statement.js - jstests/sharding/transactions_snapshot_errors_subsequent_statements.js - jstests/sharding/transactions_stale_database_version_errors.js diff --git a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml index 7de29355f8b..9bee9c8a4ed 100644 --- a/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml +++ b/buildscripts/resmokeconfig/suites/sharding_last_stable_mongos_and_mixed_shards_misc.yml @@ -361,6 +361,7 @@ selector: - jstests/sharding/shard_collection_existing_zones.js - jstests/sharding/snapshot_cursor_commands_mongos.js - jstests/sharding/transactions_error_labels.js + - jstests/sharding/transactions_implicit_abort.js - jstests/sharding/transactions_snapshot_errors_first_statement.js - jstests/sharding/transactions_snapshot_errors_subsequent_statements.js - jstests/sharding/transactions_stale_database_version_errors.js diff --git a/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 b/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 index 0b5e09f4249..aaedbeeb4f9 100644 --- a/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 +++ b/buildscripts/templates/generate_resmoke_suites/sharding_last_stable_mongos_and_mixed_shards.yml.j2 @@ -56,6 +56,7 @@ selector: - jstests/sharding/shard_collection_existing_zones.js - jstests/sharding/snapshot_cursor_commands_mongos.js - jstests/sharding/transactions_error_labels.js + - jstests/sharding/transactions_implicit_abort.js - jstests/sharding/transactions_snapshot_errors_first_statement.js - jstests/sharding/transactions_snapshot_errors_subsequent_statements.js - jstests/sharding/transactions_stale_database_version_errors.js diff --git a/jstests/sharding/libs/sharded_transactions_helpers.js b/jstests/sharding/libs/sharded_transactions_helpers.js index 422bdac250d..d1b4a82bce0 100644 --- a/jstests/sharding/libs/sharded_transactions_helpers.js +++ b/jstests/sharding/libs/sharded_transactions_helpers.js @@ -19,3 +19,22 @@ function unsetFailCommandOnEachShard(st, numShards) { shardConn.adminCommand({configureFailPoint: "failCommand", mode: "off"})); } } + +function assertNoSuchTransactionOnAllShards(st, lsid, txnNumber) { + st._rs.forEach(function(rs) { + assertNoSuchTransactionOnConn(rs.test.getPrimary(), lsid, txnNumber); + }); +} + +function assertNoSuchTransactionOnConn(conn, lsid, txnNumber) { + assert.commandFailedWithCode(conn.getDB("foo").runCommand({ + find: "bar", + lsid: lsid, + txnNumber: NumberLong(txnNumber), + autocommit: false, + }), + ErrorCodes.NoSuchTransaction, + "expected there to be no active transaction on shard, lsid: " + + tojson(lsid) + ", txnNumber: " + tojson(txnNumber) + + ", connection: " + tojson(conn)); +} diff --git a/jstests/sharding/transactions_implicit_abort.js b/jstests/sharding/transactions_implicit_abort.js new file mode 100644 index 00000000000..4b9b7f48515 --- /dev/null +++ b/jstests/sharding/transactions_implicit_abort.js @@ -0,0 +1,61 @@ +// Verifies mongos will implicitly abort a transaction on all involved shards on a transaction fatal +// error. +// +// @tags: [requires_sharding, uses_transactions, uses_multi_shard_transaction] +(function() { + "use strict"; + + load("jstests/sharding/libs/sharded_transactions_helpers.js"); + + const dbName = "test"; + const collName = "foo"; + const ns = dbName + '.' + collName; + + const st = new ShardingTest({shards: 2, mongos: 1, config: 1}); + + // Set up a sharded collection with one chunk on each shard. + + assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: -1}, {writeConcern: {w: "majority"}})); + assert.writeOK(st.s.getDB(dbName)[collName].insert({_id: 1}, {writeConcern: {w: "majority"}})); + + assert.commandWorked(st.s.adminCommand({enableSharding: dbName})); + st.ensurePrimaryShard(dbName, st.shard0.shardName); + + assert.commandWorked(st.s.adminCommand({shardCollection: ns, key: {_id: 1}})); + assert.commandWorked(st.s.adminCommand({split: ns, middle: {_id: 0}})); + assert.commandWorked( + st.s.adminCommand({moveChunk: ns, find: {_id: 1}, to: st.shard1.shardName})); + + const session = st.s.startSession(); + const sessionDB = session.getDatabase(dbName); + + // + // An unhandled error during a transaction should try to abort it on all participants. + // + + session.startTransaction(); + + // Targets Shard0 successfully. + assert.commandWorked(sessionDB.runCommand({find: collName, filter: {_id: -1}})); + + assert.commandWorked(st.rs1.getPrimary().adminCommand({ + configureFailPoint: "failCommand", + mode: "alwaysOn", + data: {errorCode: ErrorCodes.InternalError, failCommands: ["find"]} + })); + + // Targets Shard1 and encounters a transaction fatal error. + assert.commandFailedWithCode(sessionDB.runCommand({find: collName, filter: {_id: 1}}), + ErrorCodes.InternalError); + + assert.commandWorked( + st.rs1.getPrimary().adminCommand({configureFailPoint: "failCommand", mode: "off"})); + + // The transaction should have been aborted on both shards. + assertNoSuchTransactionOnAllShards( + st, session.getSessionId(), session.getTxnNumber_forTesting()); + assert.commandFailedWithCode(session.abortTransaction_forTesting(), + ErrorCodes.NoSuchTransaction); + + st.stop(); +})(); diff --git a/jstests/sharding/transactions_snapshot_errors_first_statement.js b/jstests/sharding/transactions_snapshot_errors_first_statement.js index 7d8e3dabeb2..67516b024e6 100644 --- a/jstests/sharding/transactions_snapshot_errors_first_statement.js +++ b/jstests/sharding/transactions_snapshot_errors_first_statement.js @@ -114,9 +114,13 @@ ErrorCodes.NoSuchTransaction); assert.eq(res.errorLabels, ["TransientTransactionError"]); - session.abortTransaction(); - unsetFailCommandOnEachShard(st, numShardsToError); + + assertNoSuchTransactionOnAllShards( + st, session.getSessionId(), session.getTxnNumber_forTesting()); + + assert.commandFailedWithCode(session.abortTransaction_forTesting(), + ErrorCodes.NoSuchTransaction); } } @@ -162,7 +166,7 @@ // Test only one shard throwing the error when more than one are targeted. for (let errorCode of kSnapshotErrors) { - runTest(st, collName, 1, errorCode, 2); + runTest(st, collName, 1, errorCode, true); } st.stop(); diff --git a/jstests/sharding/transactions_snapshot_errors_subsequent_statements.js b/jstests/sharding/transactions_snapshot_errors_subsequent_statements.js index 28a6e861fee..1ff45f75302 100644 --- a/jstests/sharding/transactions_snapshot_errors_subsequent_statements.js +++ b/jstests/sharding/transactions_snapshot_errors_subsequent_statements.js @@ -59,7 +59,10 @@ ErrorCodes.NoSuchTransaction); assert.eq(res.errorLabels, ["TransientTransactionError"]); - session.abortTransaction(); + assertNoSuchTransactionOnAllShards( + st, session.getSessionId(), session.getTxnNumber_forTesting()); + assert.commandFailedWithCode(session.abortTransaction_forTesting(), + ErrorCodes.NoSuchTransaction); } } diff --git a/jstests/sharding/transactions_stale_database_version_errors.js b/jstests/sharding/transactions_stale_database_version_errors.js index 6c3329c5400..a4a2db4d294 100644 --- a/jstests/sharding/transactions_stale_database_version_errors.js +++ b/jstests/sharding/transactions_stale_database_version_errors.js @@ -4,6 +4,8 @@ (function() { "use strict"; + load("jstests/sharding/libs/sharded_transactions_helpers.js"); + const dbName = "test"; const collName = "foo"; @@ -50,7 +52,10 @@ ErrorCodes.NoSuchTransaction); assert.eq(res.errorLabels, ["TransientTransactionError"]); - session.abortTransaction(); + assertNoSuchTransactionOnAllShards( + st, session.getSessionId(), session.getTxnNumber_forTesting()); + assert.commandFailedWithCode(session.abortTransaction_forTesting(), + ErrorCodes.NoSuchTransaction); // // Stale database version on first command to a new shard should succeed. @@ -91,6 +96,7 @@ // st.ensurePrimaryShard(dbName, st.shard0.shardName); + st.ensurePrimaryShard(otherDbName, st.shard1.shardName); // Disable database metadata refreshes on the stale shard so it will indefinitely return a stale // version error. @@ -99,13 +105,20 @@ session.startTransaction(); + // Target Shard1, to verify the transaction on it is implicitly aborted later. + assert.commandWorked(sessionOtherDB.runCommand({find: otherCollName})); + // Target the first database which is on Shard0. The shard is stale and won't refresh its // metadata, so mongos should exhaust its retries and implicitly abort the transaction. assert.commandFailedWithCode( sessionDB.runCommand({distinct: collName, key: "_id", query: {_id: 0}}), ErrorCodes.NoSuchTransaction); - session.abortTransaction(); + // Verify all shards aborted the transaction. + assertNoSuchTransactionOnAllShards( + st, session.getSessionId(), session.getTxnNumber_forTesting()); + assert.commandFailedWithCode(session.abortTransaction_forTesting(), + ErrorCodes.NoSuchTransaction); assert.commandWorked(st.rs0.getPrimary().adminCommand( {configureFailPoint: "skipDatabaseVersionMetadataRefresh", mode: "off"})); diff --git a/jstests/sharding/transactions_stale_shard_version_errors.js b/jstests/sharding/transactions_stale_shard_version_errors.js index 7e796ca80a0..0db7eb517ea 100644 --- a/jstests/sharding/transactions_stale_shard_version_errors.js +++ b/jstests/sharding/transactions_stale_shard_version_errors.js @@ -4,6 +4,8 @@ (function() { "use strict"; + load("jstests/sharding/libs/sharded_transactions_helpers.js"); + function expectChunks(st, ns, chunks) { for (let i = 0; i < chunks.length; i++) { assert.eq(chunks[i], @@ -101,7 +103,10 @@ ErrorCodes.NoSuchTransaction); assert.eq(res.errorLabels, ["TransientTransactionError"]); - session.abortTransaction(); + assertNoSuchTransactionOnAllShards( + st, session.getSessionId(), session.getTxnNumber_forTesting()); + assert.commandFailedWithCode(session.abortTransaction_forTesting(), + ErrorCodes.NoSuchTransaction); // // Stale shard version on first command to a new shard should succeed. @@ -237,12 +242,19 @@ session.startTransaction(); - // Targets Shard0, which is stale and won't refresh its metadata, so mongos should exhaust its - // retries and implicitly abort the transaction. - assert.commandFailedWithCode(sessionDB.runCommand({find: collName, filter: {_id: -5}}), + // Target Shard2, to verify the transaction on it is aborted implicitly later. + assert.commandWorked(sessionDB.runCommand({find: collName, filter: {_id: 5}})); + + // Targets all shards. Shard0 is stale and won't refresh its metadata, so mongos should exhaust + // its retries and implicitly abort the transaction. + assert.commandFailedWithCode(sessionDB.runCommand({find: collName}), ErrorCodes.NoSuchTransaction); - session.abortTransaction(); + // Verify the shards that did not return an error were also aborted. + assertNoSuchTransactionOnAllShards( + st, session.getSessionId(), session.getTxnNumber_forTesting()); + assert.commandFailedWithCode(session.abortTransaction_forTesting(), + ErrorCodes.NoSuchTransaction); assert.commandWorked(st.rs0.getPrimary().adminCommand( {configureFailPoint: "skipShardFilteringMetadataRefresh", mode: "off"})); diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 50b21051fdc..4dafd3c009b 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -825,7 +825,7 @@ env.Library( '$BUILD_DIR/mongo/base', ], LIBDEPS_PRIVATE=[ - 'handle_request_response', + 'shared_request_handling', 'introspect', 'lasterror', 'query_exec', @@ -1431,9 +1431,10 @@ env.Library( ) env.Library( - target='handle_request_response', + target='shared_request_handling', source=[ 'handle_request_response.cpp', + 'transaction_validation.cpp', ], LIBDEPS=[ 'logical_session_cache_impl', diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index eb22b1a129b..0dcb4b89dbe 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -73,6 +73,7 @@ #include "mongo/db/stats/counters.h" #include "mongo/db/stats/top.h" #include "mongo/db/transaction_participant.h" +#include "mongo/db/transaction_validation.h" #include "mongo/rpc/factory.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/rpc/message.h" @@ -462,14 +463,9 @@ bool runCommandImpl(OperationContext* opCtx, } } else { auto wcResult = uassertStatusOK(extractWriteConcern(opCtx, request.body)); - uassert(ErrorCodes::InvalidOptions, - "writeConcern is not allowed within a multi-statement transaction", - wcResult.usedDefault || !txnParticipant || - !txnParticipant->inMultiDocumentTransaction() || - invocation->definition()->getName() == "commitTransaction" || - invocation->definition()->getName() == "abortTransaction" || - invocation->definition()->getName() == "prepareTransaction" || - invocation->definition()->getName() == "doTxn"); + if (txnParticipant && txnParticipant->inMultiDocumentTransaction()) { + validateWriteConcernForTransaction(wcResult, invocation->definition()->getName()); + } auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); diff --git a/src/mongo/db/transaction_validation.cpp b/src/mongo/db/transaction_validation.cpp new file mode 100644 index 00000000000..2f26313d848 --- /dev/null +++ b/src/mongo/db/transaction_validation.cpp @@ -0,0 +1,45 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/transaction_validation.h" + +#include "mongo/db/write_concern_options.h" + +namespace mongo { + +void validateWriteConcernForTransaction(const WriteConcernOptions& wcResult, StringData cmdName) { + uassert(ErrorCodes::InvalidOptions, + "writeConcern is not allowed within a multi-statement transaction", + wcResult.usedDefault || cmdName == "commitTransaction" || + cmdName == "abortTransaction" || cmdName == "prepareTransaction" || + cmdName == "doTxn"); +} + +} // namespace mongo diff --git a/src/mongo/db/transaction_validation.h b/src/mongo/db/transaction_validation.h new file mode 100644 index 00000000000..0eff9b8a156 --- /dev/null +++ b/src/mongo/db/transaction_validation.h @@ -0,0 +1,40 @@ +/** + * Copyright (C) 2018 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include "mongo/db/write_concern_options.h" + +namespace mongo { + +/** + * Throws if the given write concern is not allowed in a transaction. + */ +void validateWriteConcernForTransaction(const WriteConcernOptions& wcResult, StringData cmdName); + +} // namespace mongo diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index 0d72376173e..b1892e0a43e 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -104,7 +104,7 @@ env.Library( '$BUILD_DIR/mongo/db/commands/test_commands_enabled', '$BUILD_DIR/mongo/db/commands/write_commands_common', '$BUILD_DIR/mongo/db/ftdc/ftdc_server', - '$BUILD_DIR/mongo/db/handle_request_response', + '$BUILD_DIR/mongo/db/shared_request_handling', '$BUILD_DIR/mongo/db/logical_session_cache_impl', '$BUILD_DIR/mongo/db/pipeline/aggregation', '$BUILD_DIR/mongo/db/stats/counters', diff --git a/src/mongo/s/commands/cluster_command_test_fixture.cpp b/src/mongo/s/commands/cluster_command_test_fixture.cpp index e9c44a154ef..884bba83ee0 100644 --- a/src/mongo/s/commands/cluster_command_test_fixture.cpp +++ b/src/mongo/s/commands/cluster_command_test_fixture.cpp @@ -158,9 +158,17 @@ void ClusterCommandTestFixture::runCommandInspectRequests(BSONObj cmd, future.timed_get(kFutureTimeout); } -void ClusterCommandTestFixture::runCommandMaxErrors(BSONObj cmd, - ErrorCodes::Error code, - bool isTargeted) { +void ClusterCommandTestFixture::expectAbortTransaction() { + onCommandForPoolExecutor([](const executor::RemoteCommandRequest& request) { + auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); + ASSERT_EQ(cmdName, "abortTransaction"); + return BSON("ok" << 1); + }); +} + +void ClusterCommandTestFixture::runTxnCommandMaxErrors(BSONObj cmd, + ErrorCodes::Error code, + bool isTargeted) { auto future = launchAsync([&] { runCommand(cmd); }); size_t numRetries = @@ -169,6 +177,13 @@ void ClusterCommandTestFixture::runCommandMaxErrors(BSONObj cmd, expectReturnsError(code); } + // In a transaction, each targeted shard is sent abortTransaction when the router exhausts its + // retries. + size_t numTargetedShards = isTargeted ? 1 : 2; + for (size_t i = 0; i < numTargetedShards; i++) { + expectAbortTransaction(); + } + future.timed_get(kFutureTimeout); } @@ -199,13 +214,13 @@ void ClusterCommandTestFixture::testRetryOnSnapshotError(BSONObj targetedCmd, void ClusterCommandTestFixture::testMaxRetriesSnapshotErrors(BSONObj targetedCmd, BSONObj scatterGatherCmd) { // Target one shard. - runCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true); - runCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true); + runTxnCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true); + runTxnCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true); // Target all shards if (!scatterGatherCmd.isEmpty()) { - runCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false); - runCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false); + runTxnCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false); + runTxnCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false); } } diff --git a/src/mongo/s/commands/cluster_command_test_fixture.h b/src/mongo/s/commands/cluster_command_test_fixture.h index 71cc2f2e830..2f40d06128b 100644 --- a/src/mongo/s/commands/cluster_command_test_fixture.h +++ b/src/mongo/s/commands/cluster_command_test_fixture.h @@ -55,6 +55,8 @@ protected: void expectReturnsError(ErrorCodes::Error code); + void expectAbortTransaction(); + DbResponse runCommand(BSONObj cmd); void runCommandSuccessful(BSONObj cmd, bool isTargeted); @@ -63,7 +65,7 @@ protected: void runCommandInspectRequests(BSONObj cmd, InspectionCallback cb, bool isTargeted); - void runCommandMaxErrors(BSONObj cmd, ErrorCodes::Error code, bool isTargeted); + void runTxnCommandMaxErrors(BSONObj cmd, ErrorCodes::Error code, bool isTargeted); /** * Verifies that running the given commands through mongos will succeed. diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 99f0d18a807..55081a9a04b 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -57,6 +57,7 @@ #include "mongo/db/query/getmore_request.h" #include "mongo/db/query/query_request.h" #include "mongo/db/stats/counters.h" +#include "mongo/db/transaction_validation.h" #include "mongo/db/views/resolved_view.h" #include "mongo/rpc/factory.h" #include "mongo/rpc/get_status_from_command_result.h" @@ -150,6 +151,44 @@ void appendRequiredFieldsToResponse(OperationContext* opCtx, BSONObjBuilder* res } } +/** + * Invokes the given command and aborts the transaction on any non-retryable errors. + */ +void invokeInTransactionRouter(OperationContext* opCtx, + CommandInvocation* invocation, + TransactionRouter* txnRouter, + rpc::ReplyBuilderInterface* result) { + try { + invocation->run(opCtx, result); + } catch (const DBException& e) { + if (ErrorCodes::isSnapshotError(e.code()) || + ErrorCodes::isNeedRetargettingError(e.code()) || + e.code() == ErrorCodes::StaleDbVersion) { + // Don't abort on possibly retryable errors. + throw; + } + + txnRouter->implicitlyAbortTransaction(opCtx); + throw; + } +} + +/** + * Throws NoSuchTransaction if canRetry is false. + */ +void handleCanRetryInTransaction(OperationContext* opCtx, + TransactionRouter* txnRouter, + bool canRetry, + const DBException& ex) { + if (!canRetry) { + uasserted(ErrorCodes::NoSuchTransaction, + str::stream() << "Transaction " << opCtx->getTxnNumber() << " was aborted after " + << kMaxNumStaleVersionRetries + << " failed retries. The latest attempt failed with: " + << ex.toStatus()); + } +} + void execCommandClient(OperationContext* opCtx, CommandInvocation* invocation, const OpMsgRequest& request, @@ -200,16 +239,10 @@ void execCommandClient(OperationContext* opCtx, globalOpCounters.gotCommand(); } - StatusWith<WriteConcernOptions> wcResult = - WriteConcernOptions::extractWCFromCommand(request.body); - if (!wcResult.isOK()) { - auto body = result->getBodyBuilder(); - CommandHelpers::appendCommandStatusNoThrow(body, wcResult.getStatus()); - return; - } + auto wcResult = uassertStatusOK(WriteConcernOptions::extractWCFromCommand(request.body)); bool supportsWriteConcern = invocation->supportsWriteConcern(); - if (!supportsWriteConcern && !wcResult.getValue().usedDefault) { + if (!supportsWriteConcern && !wcResult.usedDefault) { // This command doesn't do writes so it should not be passed a writeConcern. // If we did not use the default writeConcern, one was provided when it shouldn't have // been by the user. @@ -219,6 +252,10 @@ void execCommandClient(OperationContext* opCtx, return; } + if (TransactionRouter::get(opCtx)) { + validateWriteConcernForTransaction(wcResult, c->getName()); + } + auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) { @@ -269,20 +306,34 @@ void execCommandClient(OperationContext* opCtx, return; } + auto txnRouter = TransactionRouter::get(opCtx); if (!supportsWriteConcern) { - invocation->run(opCtx, result); + if (txnRouter) { + invokeInTransactionRouter(opCtx, invocation, txnRouter, result); + } else { + invocation->run(opCtx, result); + } } else { // Change the write concern while running the command. const auto oldWC = opCtx->getWriteConcern(); ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); }); - opCtx->setWriteConcern(wcResult.getValue()); + opCtx->setWriteConcern(wcResult); - invocation->run(opCtx, result); + if (txnRouter) { + invokeInTransactionRouter(opCtx, invocation, txnRouter, result); + } else { + invocation->run(opCtx, result); + } } + auto body = result->getBodyBuilder(); bool ok = CommandHelpers::extractOrAppendOk(body); if (!ok) { c->incrementCommandsFailed(); + + if (auto txnRouter = TransactionRouter::get(opCtx)) { + txnRouter->implicitlyAbortTransaction(opCtx); + } } } @@ -406,18 +457,14 @@ void runCommand(OperationContext* opCtx, Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNs); - // Update transaction tracking state for a possible retry. Throws if the transaction - // cannot continue. + // Update transaction tracking state for a possible retry. Throws and aborts the + // transaction if it cannot continue. if (auto txnRouter = TransactionRouter::get(opCtx)) { + auto abortGuard = + MakeGuard([&] { txnRouter->implicitlyAbortTransaction(opCtx); }); + handleCanRetryInTransaction(opCtx, txnRouter, canRetry, ex); txnRouter->onStaleShardOrDbError(commandName); - // TODO SERVER-37210: Implicitly abort the transaction if this uassert throws. - uassert(ErrorCodes::NoSuchTransaction, - str::stream() << "Transaction " << opCtx->getTxnNumber() - << " was aborted after " - << kMaxNumStaleVersionRetries - << " failed retries. The latest attempt failed with: " - << ex.toStatus(), - canRetry); + abortGuard.Dismiss(); } if (canRetry) { @@ -429,18 +476,14 @@ void runCommand(OperationContext* opCtx, Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(ex->getDb(), ex->getVersionReceived()); - // Update transaction tracking state for a possible retry. Throws if the transaction - // cannot continue. + // Update transaction tracking state for a possible retry. Throws and aborts the + // transaction if it cannot continue. if (auto txnRouter = TransactionRouter::get(opCtx)) { + auto abortGuard = + MakeGuard([&] { txnRouter->implicitlyAbortTransaction(opCtx); }); + handleCanRetryInTransaction(opCtx, txnRouter, canRetry, ex); txnRouter->onStaleShardOrDbError(commandName); - // TODO SERVER-37210: Implicitly abort the transaction if this uassert throws. - uassert(ErrorCodes::NoSuchTransaction, - str::stream() << "Transaction " << opCtx->getTxnNumber() - << " was aborted after " - << kMaxNumStaleVersionRetries - << " failed retries. The latest attempt failed with: " - << ex.toStatus(), - canRetry); + abortGuard.Dismiss(); } if (canRetry) { @@ -450,18 +493,14 @@ void runCommand(OperationContext* opCtx, } catch (const ExceptionForCat<ErrorCategory::SnapshotError>& ex) { // Simple retry on any type of snapshot error. - // Update transaction tracking state for a possible retry. Throws if the transaction - // cannot continue. + // Update transaction tracking state for a possible retry. Throws and aborts the + // transaction if it cannot continue. if (auto txnRouter = TransactionRouter::get(opCtx)) { + auto abortGuard = + MakeGuard([&] { txnRouter->implicitlyAbortTransaction(opCtx); }); + handleCanRetryInTransaction(opCtx, txnRouter, canRetry, ex); txnRouter->onSnapshotError(); - // TODO SERVER-37210: Implicitly abort the transaction if this uassert throws. - uassert(ErrorCodes::NoSuchTransaction, - str::stream() << "Transaction " << opCtx->getTxnNumber() - << " was aborted after " - << kMaxNumStaleVersionRetries - << " failed retries. The latest attempt failed with: " - << ex.toStatus(), - canRetry); + abortGuard.Dismiss(); } if (canRetry) { diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index 5b8e1c09b4d..3b44b615eab 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -357,7 +357,6 @@ bool TransactionRouter::_canContinueOnStaleShardOrDbError(StringData cmdName) co } void TransactionRouter::onStaleShardOrDbError(StringData cmdName) { - // TODO SERVER-37210: Implicitly abort the entire transaction if this uassert throws. uassert(ErrorCodes::NoSuchTransaction, "Transaction was aborted due to cluster data placement change", _canContinueOnStaleShardOrDbError(cmdName)); @@ -381,7 +380,6 @@ bool TransactionRouter::_canContinueOnSnapshotError() const { } void TransactionRouter::onSnapshotError() { - // TODO SERVER-37210: Implicitly abort the entire transaction if this uassert throws. uassert(ErrorCodes::NoSuchTransaction, "Transaction was aborted due to snapshot error on subsequent transaction statement", _canContinueOnSnapshotError()); @@ -636,4 +634,16 @@ ScopedRouterSession::~ScopedRouterSession() { RouterSessionCatalog::get(_opCtx)->checkInSessionState(opCtxSession->getSessionId()); } +void TransactionRouter::implicitlyAbortTransaction(OperationContext* opCtx) { + if (_participants.empty()) { + return; + } + + try { + abortTransaction(opCtx); + } catch (...) { + // Ignore any exceptions. + } +} + } // namespace mongo diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h index 770a7cb7c6d..3020db842c3 100644 --- a/src/mongo/s/transaction_router.h +++ b/src/mongo/s/transaction_router.h @@ -186,6 +186,12 @@ public: std::vector<AsyncRequestsSender::Response> abortTransaction(OperationContext* opCtx); /** + * Sends abort to all shards in the current participant list. Will retry on retryable errors, + * but ignores the responses from each shard. + */ + void implicitlyAbortTransaction(OperationContext* opCtx); + + /** * Extract the runtimne state attached to the operation context. Returns nullptr if none is * attached. */ diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index 6dcc8795f4f..b0da4b30af0 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -1241,5 +1241,130 @@ TEST_F(TransactionRouterTest, OnViewResolutionErrorClearsAllNewParticipants) { ASSERT_TRUE(secondShardCmd["startTransaction"].trueValue()); } +TEST_F(TransactionRouterTest, ImplicitAbortIsNoopWithNoParticipants) { + TxnNumber txnNum{3}; + + auto opCtx = operationContext(); + opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); + opCtx->setTxnNumber(txnNum); + ScopedRouterSession scopedSession(opCtx); + + auto txnRouter = TransactionRouter::get(opCtx); + txnRouter->beginOrContinueTxn(opCtx, txnNum, true); + + // Should not throw. + txnRouter->implicitlyAbortTransaction(opCtx); +} + +TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) { + LogicalSessionId lsid(makeLogicalSessionIdForTest()); + TxnNumber txnNum{3}; + + auto opCtx = operationContext(); + opCtx->setLogicalSessionId(lsid); + opCtx->setTxnNumber(txnNum); + + ScopedRouterSession scopedSession(opCtx); + auto txnRouter = TransactionRouter::get(opCtx); + + txnRouter->beginOrContinueTxn(opCtx, txnNum, true); + txnRouter->attachTxnFieldsIfNeeded(shard1, {}); + + auto future = + launchAsync([&] { return txnRouter->implicitlyAbortTransaction(operationContext()); }); + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(hostAndPort1, request.target); + ASSERT_EQ("admin", request.dbname); + + auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); + ASSERT_EQ(cmdName, "abortTransaction"); + + checkSessionDetails(request.cmdObj, lsid, txnNum, true); + + return BSON("ok" << 1); + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(TransactionRouterTest, ImplicitAbortForMultipleParticipants) { + LogicalSessionId lsid(makeLogicalSessionIdForTest()); + TxnNumber txnNum{3}; + + auto opCtx = operationContext(); + opCtx->setLogicalSessionId(lsid); + opCtx->setTxnNumber(txnNum); + + ScopedRouterSession scopedSession(opCtx); + auto txnRouter = TransactionRouter::get(opCtx); + + txnRouter->beginOrContinueTxn(opCtx, txnNum, true); + txnRouter->attachTxnFieldsIfNeeded(shard1, {}); + txnRouter->attachTxnFieldsIfNeeded(shard2, {}); + + auto future = + launchAsync([&] { return txnRouter->implicitlyAbortTransaction(operationContext()); }); + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(hostAndPort1, request.target); + ASSERT_EQ("admin", request.dbname); + + auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); + ASSERT_EQ(cmdName, "abortTransaction"); + + checkSessionDetails(request.cmdObj, lsid, txnNum, true); + + return BSON("ok" << 1); + }); + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(hostAndPort2, request.target); + ASSERT_EQ("admin", request.dbname); + + auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); + ASSERT_EQ(cmdName, "abortTransaction"); + + checkSessionDetails(request.cmdObj, lsid, txnNum, boost::none); + + return BSON("ok" << 1); + }); + + future.timed_get(kFutureTimeout); +} + +TEST_F(TransactionRouterTest, ImplicitAbortIgnoresErrors) { + LogicalSessionId lsid(makeLogicalSessionIdForTest()); + TxnNumber txnNum{3}; + + auto opCtx = operationContext(); + opCtx->setLogicalSessionId(lsid); + opCtx->setTxnNumber(txnNum); + + ScopedRouterSession scopedSession(opCtx); + auto txnRouter = TransactionRouter::get(opCtx); + + txnRouter->beginOrContinueTxn(opCtx, txnNum, true); + txnRouter->attachTxnFieldsIfNeeded(shard1, {}); + + auto future = + launchAsync([&] { return txnRouter->implicitlyAbortTransaction(operationContext()); }); + + onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { + ASSERT_EQ(hostAndPort1, request.target); + ASSERT_EQ("admin", request.dbname); + + auto cmdName = request.cmdObj.firstElement().fieldNameStringData(); + ASSERT_EQ(cmdName, "abortTransaction"); + + checkSessionDetails(request.cmdObj, lsid, txnNum, true); + + return BSON("ok" << 0); + }); + + // Shouldn't throw. + future.timed_get(kFutureTimeout); +} + } // unnamed namespace } // namespace mongo |