diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2022-03-14 14:09:10 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-03-30 22:43:17 +0000 |
commit | 2c77d92ddcd9b1157cd13fb97dd3580b67e205a1 (patch) | |
tree | 3b7d6ff508c28e7f4da46d3447e488d2c74073ce | |
parent | 7ceeed142005460b81efc2f1d534f8fbcf8a1f65 (diff) | |
download | mongo-2c77d92ddcd9b1157cd13fb97dd3580b67e205a1.tar.gz |
SERVER-63495 Support running cluster commands through the transaction API
-rw-r--r-- | jstests/replsets/db_reads_while_recovering_all_commands.js | 1 | ||||
-rw-r--r-- | jstests/sharding/transaction_api_distributed_from_shard.js | 110 | ||||
-rw-r--r-- | src/mongo/db/SConscript | 17 | ||||
-rw-r--r-- | src/mongo/db/cluster_transaction_api.cpp | 73 | ||||
-rw-r--r-- | src/mongo/db/cluster_transaction_api.h | 51 | ||||
-rw-r--r-- | src/mongo/db/commands.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 2 | ||||
-rw-r--r-- | src/mongo/db/transaction_api.cpp | 26 | ||||
-rw-r--r-- | src/mongo/db/transaction_api.h | 47 | ||||
-rw-r--r-- | src/mongo/db/transaction_api_test.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/transaction_validation.cpp | 14 | ||||
-rw-r--r-- | src/mongo/s/commands/SConscript | 8 | ||||
-rw-r--r-- | src/mongo/s/commands/internal_transactions_test_commands.cpp | 61 | ||||
-rw-r--r-- | src/mongo/s/commands/internal_transactions_test_commands.idl | 31 | ||||
-rw-r--r-- | src/mongo/s/service_entry_point_mongos.cpp | 9 | ||||
-rw-r--r-- | src/mongo/s/service_entry_point_mongos.h | 4 |
16 files changed, 448 insertions, 26 deletions
diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js index c10fe0d74d6..5c09d2361b6 100644 --- a/jstests/replsets/db_reads_while_recovering_all_commands.js +++ b/jstests/replsets/db_reads_while_recovering_all_commands.js @@ -348,6 +348,7 @@ const allCommands = { stopRecordingTraffic: {skip: isNotAUserDataRead}, testDeprecation: {skip: isNotAUserDataRead}, testDeprecationInVersion2: {skip: isNotAUserDataRead}, + testInternalTransactions: {skip: isNotAUserDataRead}, testRemoval: {skip: isNotAUserDataRead}, testReshardCloneCollection: {skip: isNotAUserDataRead}, testVersions1And2: {skip: isNotAUserDataRead}, diff --git a/jstests/sharding/transaction_api_distributed_from_shard.js b/jstests/sharding/transaction_api_distributed_from_shard.js new file mode 100644 index 00000000000..e44fca1850e --- /dev/null +++ b/jstests/sharding/transaction_api_distributed_from_shard.js @@ -0,0 +1,110 @@ +/** + * Tests that the transaction API can be used for distributed transactions initiated from a shard. + * + * @tags: [requires_fcv_53, featureFlagInternalTransactions] + */ +(function() { +"use strict"; + +// The test command is meant to test the "no session" transaction API case. +TestData.disableImplicitSessions = true; + +const st = new ShardingTest({shards: 2, config: 1}); +const shard0Primary = st.rs0.getPrimary(); + +const kDbName = "foo"; +const kCollName = "bar"; +const kNs = kDbName + "." + kCollName; + +function runTestSuccess() { + const commands = [ + {dbName: kDbName, command: {find: kCollName, singleBatch: true}}, + {dbName: kDbName, command: {insert: kCollName, documents: [{_id: 2}, {_id: 3}]}}, + { + dbName: kDbName, + command: {update: kCollName, updates: [{q: {_id: 2}, u: {$set: {updated: true}}}]} + }, + {dbName: kDbName, command: {delete: kCollName, deletes: [{q: {_id: 3}, limit: 1}]}}, + {dbName: kDbName, command: {find: kCollName, singleBatch: true}}, + ]; + + // Insert initial data. + assert.commandWorked(st.s.getCollection(kNs).insert([{_id: 1}])); + + const res = assert.commandWorked( + shard0Primary.adminCommand({testInternalTransactions: 1, commandInfos: commands})); + res.responses.forEach((innerRes) => { + assert.commandWorked(innerRes, tojson(res)); + }); + + assert.eq(res.responses.length, commands.length, tojson(res)); + assert.sameMembers(res.responses[0].cursor.firstBatch, [{_id: 1}], tojson(res)); + assert.eq(res.responses[1], {n: 2, ok: 1}, tojson(res)); + assert.eq(res.responses[2], {nModified: 1, n: 1, ok: 1}, tojson(res)); + assert.eq(res.responses[3], {n: 1, ok: 1}, tojson(res)); + assert.sameMembers( + res.responses[4].cursor.firstBatch, [{_id: 1}, {_id: 2, updated: true}], tojson(res)); + + // The written documents should be visible outside the transaction. + assert.sameMembers(st.s.getCollection(kNs).find().toArray(), + [{_id: 1}, {_id: 2, updated: true}]); + + // Clean up. + assert.commandWorked(st.s.getCollection(kNs).remove({}, false /* justOne */)); +} + +function runTestFailure() { + const commands = [ + {dbName: kDbName, command: {insert: kCollName, documents: [{_id: 2}, {_id: 3}]}}, + {dbName: kDbName, command: {find: kCollName, singleBatch: true}}, + // clusterCount does not exist, so the API will reject this command without running it. This + // will still abort the transaction. + {dbName: kDbName, command: {count: kCollName}}, + ]; + + // Insert initial data. + assert.commandWorked(st.s.getCollection(kNs).insert([{_id: 1}])); + + const res = assert.commandWorked( + shard0Primary.adminCommand({testInternalTransactions: 1, commandInfos: commands})); + // The clusterCount is rejected without being run, so expect one fewer response. + assert.eq(res.responses.length, commands.length - 1, tojson(res)); + + assert.commandWorked(res.responses[0], tojson(res)); + assert.eq(res.responses[0], {n: 2, ok: 1}, tojson(res)); + + assert.commandWorked(res.responses[1], tojson(res)); + assert.sameMembers( + res.responses[1].cursor.firstBatch, [{_id: 1}, {_id: 2}, {_id: 3}], tojson(res)); + + // Verify the API didn't insert any documents. + assert.sameMembers(st.s.getCollection(kNs).find().toArray(), [{_id: 1}]); + + // Clean up. + assert.commandWorked(st.s.getCollection(kNs).remove({}, false /* justOne */)); +} + +// +// Unsharded collection case. +// + +runTestSuccess(); +runTestFailure(); + +// +// Sharded collection case. +// + +assert.commandWorked(st.s.adminCommand({enableSharding: kDbName})); +st.ensurePrimaryShard(kDbName, st.shard0.shardName); +assert.commandWorked(st.s.getCollection(kNs).createIndex({x: 1})); +assert.commandWorked(st.s.adminCommand({shardCollection: kNs, key: {x: 1}})); + +assert.commandWorked(st.s.adminCommand({split: kNs, middle: {x: 0}})); +assert.commandWorked(st.s.adminCommand({moveChunk: kNs, find: {x: 0}, to: st.shard1.shardName})); + +runTestSuccess(); +runTestFailure(); + +st.stop(); +})(); diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 8ce61596ac0..7b6a56bfe8e 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -902,6 +902,23 @@ env.Library( ) env.Library( + target='cluster_transaction_api', + source=[ + 'cluster_transaction_api.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/rpc/rpc', + '$BUILD_DIR/mongo/s/startup_initialization', + '$BUILD_DIR/mongo/transport/service_entry_point', + 'logical_session_id', + 'logical_session_id_helpers', + 'service_context', + 'shared_request_handling', + 'transaction_api', + ], +) + +env.Library( target='dbdirectclient', source=[ 'dbdirectclient.cpp', diff --git a/src/mongo/db/cluster_transaction_api.cpp b/src/mongo/db/cluster_transaction_api.cpp new file mode 100644 index 00000000000..770d5bad816 --- /dev/null +++ b/src/mongo/db/cluster_transaction_api.cpp @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/db/cluster_transaction_api.h" + +#include <fmt/format.h> + +#include "mongo/executor/task_executor.h" +#include "mongo/rpc/factory.h" +#include "mongo/rpc/op_msg_rpc_impls.h" +#include "mongo/rpc/reply_interface.h" +#include "mongo/stdx/future.h" + +namespace mongo::txn_api::details { + +namespace { + +StringMap<std::string> clusterCommandTranslations = { + {"abortTransaction", "clusterAbortTransaction"}, + {"commitTransaction", "clusterCommitTransaction"}, + {"delete", "clusterDelete"}, + {"insert", "clusterInsert"}, + {"update", "clusterUpdate"}, + {"find", "clusterFind"}}; + +BSONObj replaceCommandNameWithClusterCommandName(BSONObj cmdObj) { + auto cmdName = cmdObj.firstElement().fieldNameStringData(); + auto newNameIt = clusterCommandTranslations.find(cmdName); + uassert(6349501, + "Cannot use unsupported command {} with cluster transaction API"_format(cmdName), + newNameIt != clusterCommandTranslations.end()); + + return cmdObj.replaceFieldNames(BSON(newNameIt->second << 1)); +} + +} // namespace + +BSONObj ClusterSEPTransactionClientBehaviors::maybeModifyCommand(BSONObj cmdObj) const { + return replaceCommandNameWithClusterCommandName(cmdObj); +} + +Future<DbResponse> ClusterSEPTransactionClientBehaviors::handleRequest( + OperationContext* opCtx, const Message& request) const { + return ServiceEntryPointMongos::handleRequestImpl(opCtx, request); +} + +} // namespace mongo::txn_api::details diff --git a/src/mongo/db/cluster_transaction_api.h b/src/mongo/db/cluster_transaction_api.h new file mode 100644 index 00000000000..45135cde65d --- /dev/null +++ b/src/mongo/db/cluster_transaction_api.h @@ -0,0 +1,51 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/operation_context.h" +#include "mongo/db/transaction_api.h" +#include "mongo/s/service_entry_point_mongos.h" + +namespace mongo::txn_api::details { + +/** + * Behaviors for running cluster commands from a non-router process, ie mongod. + */ +class ClusterSEPTransactionClientBehaviors : public SEPTransactionClientBehaviors { +public: + ClusterSEPTransactionClientBehaviors(ServiceContext* service) {} + + BSONObj maybeModifyCommand(BSONObj cmdObj) const override; + + Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& request) const override; +}; + +} // namespace mongo::txn_api::details diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp index d404795f413..d1df81deb92 100644 --- a/src/mongo/db/commands.cpp +++ b/src/mongo/db/commands.cpp @@ -116,9 +116,16 @@ bool checkAuthorizationImplPreParse(OperationContext* opCtx, return false; } +// TODO SERVER-65101: Replace this with a property on each command. // The command names that are allowed in a multi-document transaction. const StringMap<int> txnCmdAllowlist = {{"abortTransaction", 1}, {"aggregate", 1}, + {"clusterAbortTransaction", 1}, + {"clusterCommitTransaction", 1}, + {"clusterDelete", 1}, + {"clusterFind", 1}, + {"clusterInsert", 1}, + {"clusterUpdate", 1}, {"commitTransaction", 1}, {"coordinateCommitTransaction", 1}, {"create", 1}, diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index d77627eb367..1ef790b7756 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -429,6 +429,7 @@ env.Library( '$BUILD_DIR/mongo/db/commands/test_commands_enabled', '$BUILD_DIR/mongo/db/commands/txn_cmd_request', '$BUILD_DIR/mongo/db/fle_crud', + '$BUILD_DIR/mongo/db/internal_transactions_feature_flag', '$BUILD_DIR/mongo/db/multitenancy', '$BUILD_DIR/mongo/db/repl/primary_only_service', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', @@ -438,6 +439,7 @@ env.Library( '$BUILD_DIR/mongo/db/timeseries/timeseries_collmod', '$BUILD_DIR/mongo/db/timeseries/timeseries_conversion_util', '$BUILD_DIR/mongo/db/timeseries/timeseries_options', + '$BUILD_DIR/mongo/db/transaction_api', '$BUILD_DIR/mongo/idl/cluster_server_parameter', '$BUILD_DIR/mongo/s/commands/cluster_commands_common', '$BUILD_DIR/mongo/s/commands/sharded_cluster_sharding_commands', diff --git a/src/mongo/db/transaction_api.cpp b/src/mongo/db/transaction_api.cpp index 41b96703688..cbb2a879d64 100644 --- a/src/mongo/db/transaction_api.cpp +++ b/src/mongo/db/transaction_api.cpp @@ -249,10 +249,16 @@ void primeInternalClientAndOpCtx(Client* client, OperationContext* opCtx) { } } -SemiFuture<BSONObj> SEPTransactionClient::runCommand(StringData dbName, BSONObj cmdObj) const { - BSONObjBuilder cmdBuilder(std::move(cmdObj)); +Future<DbResponse> DefaultSEPTransactionClientBehaviors::handleRequest( + OperationContext* opCtx, const Message& request) const { + auto serviceEntryPoint = opCtx->getServiceContext()->getServiceEntryPoint(); + return serviceEntryPoint->handleRequest(opCtx, request); +} +SemiFuture<BSONObj> SEPTransactionClient::runCommand(StringData dbName, BSONObj cmdObj) const { invariant(_hooks, "Transaction metadata hooks must be injected before a command can be run"); + + BSONObjBuilder cmdBuilder(_behaviors->maybeModifyCommand(std::move(cmdObj))); _hooks->runRequestHook(&cmdBuilder); invariant(!haveClient()); @@ -261,10 +267,9 @@ SemiFuture<BSONObj> SEPTransactionClient::runCommand(StringData dbName, BSONObj auto cancellableOpCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); primeInternalClientAndOpCtx(&cc(), cancellableOpCtx.get()); - auto sep = cc().getServiceContext()->getServiceEntryPoint(); auto opMsgRequest = OpMsgRequest::fromDBAndBody(dbName, cmdBuilder.obj()); auto requestMessage = opMsgRequest.serialize(); - return sep->handleRequest(cancellableOpCtx.get(), requestMessage) + return _behaviors->handleRequest(cancellableOpCtx.get(), requestMessage) .then([this](DbResponse dbResponse) { auto reply = rpc::makeReply(&dbResponse.response)->getCommandReply().getOwned(); _hooks->runReplyHook(reply); @@ -485,16 +490,15 @@ void Transaction::prepareRequest(BSONObjBuilder* cmdBuilder) { // (aka -1), which indicates retry history should not be saved. If statement ids are not // explicitly sent, implicit ids may be inferred, which could lead to bugs if different // commands have the same ids inferred. - uassert( - 6410500, - str::stream() - << "In a retryable write transaction every retryable write command should have an " - "explicit statement id, command: " - << redact(cmdBuilder->asTempObj()), + dassert( !isRetryableWriteCommand( cmdBuilder->asTempObj().firstElement().fieldNameStringData()) || (cmdBuilder->hasField(write_ops::WriteCommandRequestBase::kStmtIdsFieldName) || - cmdBuilder->hasField(write_ops::WriteCommandRequestBase::kStmtIdFieldName))); + cmdBuilder->hasField(write_ops::WriteCommandRequestBase::kStmtIdFieldName)), + str::stream() + << "In a retryable write transaction every retryable write command should have an " + "explicit statement id, command: " + << redact(cmdBuilder->asTempObj())); } stdx::lock_guard<Latch> lg(_mutex); diff --git a/src/mongo/db/transaction_api.h b/src/mongo/db/transaction_api.h index e7adde45f04..542f020655a 100644 --- a/src/mongo/db/transaction_api.h +++ b/src/mongo/db/transaction_api.h @@ -199,13 +199,52 @@ private: namespace details { /** + * Customization point for behaviors different in the default SEPTransactionClient and the one for + * running distributed transactions. + */ +class SEPTransactionClientBehaviors { +public: + virtual ~SEPTransactionClientBehaviors() {} + + /** + * Makes any necessary modifications to the given command, e.g. changing the name to the + * "cluster" version for the cluster behaviors. + */ + virtual BSONObj maybeModifyCommand(BSONObj cmdObj) const = 0; + + /** + * Returns a future with the result of running the given request. + */ + virtual Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& request) const = 0; +}; + +/** + * Default behaviors that does not modify commands and runs them against the local process service + * entry point. + */ +class DefaultSEPTransactionClientBehaviors : public SEPTransactionClientBehaviors { +public: + BSONObj maybeModifyCommand(BSONObj cmdObj) const override { + return cmdObj; + } + + Future<DbResponse> handleRequest(OperationContext* opCtx, + const Message& request) const override; +}; + +/** * Default transaction client that runs given commands through the local process service entry * point. */ class SEPTransactionClient : public TransactionClient { public: - SEPTransactionClient(OperationContext* opCtx, ExecutorPtr executor) - : _serviceContext(opCtx->getServiceContext()), _executor(executor) { + SEPTransactionClient(OperationContext* opCtx, + ExecutorPtr executor, + std::unique_ptr<SEPTransactionClientBehaviors> behaviors) + : _serviceContext(opCtx->getServiceContext()), + _executor(executor), + _behaviors(std::move(behaviors)) { _cancelableOpCtxFactory = std::make_unique<CancelableOperationContextFactory>( opCtx->getCancellationToken(), executor); } @@ -233,6 +272,7 @@ public: private: ServiceContext* const _serviceContext; ExecutorPtr _executor; + std::unique_ptr<SEPTransactionClientBehaviors> _behaviors; std::unique_ptr<details::TxnMetadataHooks> _hooks; std::unique_ptr<CancelableOperationContextFactory> _cancelableOpCtxFactory; }; @@ -275,7 +315,8 @@ public: */ Transaction(OperationContext* opCtx, ExecutorPtr executor) : _executor(executor), - _txnClient(std::make_unique<SEPTransactionClient>(opCtx, executor)), + _txnClient(std::make_unique<SEPTransactionClient>( + opCtx, executor, std::make_unique<DefaultSEPTransactionClientBehaviors>())), _service(opCtx->getServiceContext()) { _primeTransaction(opCtx); _txnClient->injectHooks(_makeTxnMetadataHooks()); diff --git a/src/mongo/db/transaction_api_test.cpp b/src/mongo/db/transaction_api_test.cpp index 955c84b348e..ad1e0325d10 100644 --- a/src/mongo/db/transaction_api_test.cpp +++ b/src/mongo/db/transaction_api_test.cpp @@ -29,6 +29,7 @@ #include "mongo/platform/basic.h" +#include "mongo/config.h" #include "mongo/db/error_labels.h" #include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/operation_context.h" @@ -277,8 +278,8 @@ protected: _threadPool = std::make_shared<ThreadPool>(std::move(options)); _threadPool->startup(); - auto mockClient = - std::make_unique<txn_api::details::MockTransactionClient>(opCtx(), _threadPool); + auto mockClient = std::make_unique<txn_api::details::MockTransactionClient>( + opCtx(), _threadPool, nullptr); _mockClient = mockClient.get(); _txnWithRetries = std::make_unique<txn_api::TransactionWithRetries>( opCtx(), _threadPool, std::move(mockClient), nullptr /* resourceYielder */); @@ -312,7 +313,7 @@ protected: void resetTxnWithRetries(std::unique_ptr<MockResourceYielder> resourceYielder = nullptr) { auto mockClient = std::make_unique<txn_api::details::MockTransactionClient>( - opCtx(), InlineQueuedCountingExecutor::make()); + opCtx(), _threadPool, nullptr); _mockClient = mockClient.get(); if (resourceYielder) { _resourceYielder = resourceYielder.get(); @@ -1286,7 +1287,10 @@ TEST_F(TxnAPITest, ClientRetryableWrite_UsesRetryableInternalSession) { ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "commitTransaction"_sd); } -TEST_F(TxnAPITest, ClientRetryableWrite_RetryableWriteWithoutStmtIdFails) { +#ifdef MONGO_CONFIG_DEBUG_BUILD +DEATH_TEST_F(TxnAPITest, + ClientRetryableWrite_RetryableWriteWithoutStmtIdCrashesOnDebug, + "In a retryable write transaction every retryable write command should") { opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest()); opCtx()->setTxnNumber(5); resetTxnWithRetries(); @@ -1305,6 +1309,7 @@ TEST_F(TxnAPITest, ClientRetryableWrite_RetryableWriteWithoutStmtIdFails) { }); ASSERT_EQ(swResult.getStatus(), ErrorCodes::duplicateCodeForTest(6410500)); } +#endif TEST_F(TxnAPITest, ClientTransaction_UsesClientTransactionOptionsAndDoesNotCommitOnSuccess) { opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest()); diff --git a/src/mongo/db/transaction_validation.cpp b/src/mongo/db/transaction_validation.cpp index 5ae750ec2f5..04bc909208e 100644 --- a/src/mongo/db/transaction_validation.cpp +++ b/src/mongo/db/transaction_validation.cpp @@ -45,7 +45,11 @@ using namespace fmt::literals; namespace { -const StringMap<int> retryableWriteCommands = {{"delete", 1}, +// TODO SERVER-65101: Replace this with a property on each command. +const StringMap<int> retryableWriteCommands = {{"clusterDelete", 1}, + {"clusterInsert", 1}, + {"clusterUpdate", 1}, + {"delete", 1}, {"findandmodify", 1}, {"findAndModify", 1}, {"insert", 1}, @@ -63,13 +67,17 @@ const StringMap<int> retryableWriteCommands = {{"delete", 1}, {"_shardsvrCollModParticipant", 1}, {"_shardsvrSetUserWriteBlockMode", 1}}; +// TODO SERVER-65101: Replace this with a property on each command. // Commands that can be sent with session info but should not check out a session. const StringMap<int> skipSessionCheckoutList = { {"coordinateCommitTransaction", 1}, {"_recvChunkStart", 1}, {"replSetStepDown", 1}}; -const StringMap<int> transactionCommands = {{"commitTransaction", 1}, +// TODO SERVER-65101: Replace this with a property on each command. +const StringMap<int> transactionCommands = {{"abortTransaction", 1}, + {"clusterAbortTransaction", 1}, + {"clusterCommitTransaction", 1}, + {"commitTransaction", 1}, {"coordinateCommitTransaction", 1}, - {"abortTransaction", 1}, {"prepareTransaction", 1}}; } // namespace diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript index adbc0a3c03c..df97cd30619 100644 --- a/src/mongo/s/commands/SConscript +++ b/src/mongo/s/commands/SConscript @@ -12,10 +12,16 @@ env.Library( source=[ 'flush_router_config_cmd.cpp', 'get_shard_map_cmd.cpp', + 'internal_transactions_test_commands.cpp', + 'internal_transactions_test_commands.idl', ], LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/cluster_transaction_api', '$BUILD_DIR/mongo/db/commands', + '$BUILD_DIR/mongo/db/transaction_api', '$BUILD_DIR/mongo/s/grid', + '$BUILD_DIR/mongo/s/sharding_router_api', + '$BUILD_DIR/mongo/s/startup_initialization', ] ) @@ -90,11 +96,9 @@ env.Library( 'cluster_validate_db_metadata_cmd.cpp', 'cluster_whats_my_uri_cmd.cpp', 'cluster_write_cmd_s.cpp', - 'internal_transactions_test_commands.cpp', 'kill_sessions_remote.cpp', 's_read_write_concern_defaults_server_status.cpp', 'cluster_commands.idl', - 'internal_transactions_test_commands.idl', 'shard_collection.idl', ], LIBDEPS_PRIVATE=[ diff --git a/src/mongo/s/commands/internal_transactions_test_commands.cpp b/src/mongo/s/commands/internal_transactions_test_commands.cpp index 81ac508f06e..9e827f8d35c 100644 --- a/src/mongo/s/commands/internal_transactions_test_commands.cpp +++ b/src/mongo/s/commands/internal_transactions_test_commands.cpp @@ -26,9 +26,17 @@ * exception statement from all source files in the program, then also delete * it in the license file. */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand + #include "mongo/db/auth/authorization_session.h" +#include "mongo/db/cluster_transaction_api.h" #include "mongo/db/commands.h" +#include "mongo/db/transaction_api.h" +#include "mongo/logv2/log.h" #include "mongo/s/commands/internal_transactions_test_commands_gen.h" +#include "mongo/s/grid.h" +#include "mongo/s/transaction_router_resource_yielder.h" namespace mongo { namespace { @@ -41,7 +49,56 @@ public: public: using InvocationBase::InvocationBase; - void typedRun(OperationContext* opCtx){}; + TestInternalTransactionsReply typedRun(OperationContext* opCtx) { + Grid::get(opCtx)->assertShardingIsInitialized(); + + auto fixedExec = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + auto txn = txn_api::TransactionWithRetries( + opCtx, + fixedExec, + std::make_unique<txn_api::details::SEPTransactionClient>( + opCtx, + fixedExec, + std::make_unique<txn_api::details::ClusterSEPTransactionClientBehaviors>( + opCtx->getServiceContext())), + TransactionRouterResourceYielder::makeForLocalHandoff()); + + struct SharedBlock { + SharedBlock(std::vector<TestInternalTransactionsCommandInfo> commandInfos_) + : commandInfos(commandInfos_) {} + + std::vector<TestInternalTransactionsCommandInfo> commandInfos; + std::vector<BSONObj> responses; + }; + auto sharedBlock = std::make_shared<SharedBlock>(request().getCommandInfos()); + + // Swallow errors and let clients inspect the responses array to determine success / + // failure. + (void)txn.runSyncNoThrow( + opCtx, + [sharedBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + for (const auto& commandInfo : sharedBlock->commandInfos) { + const auto& dbName = commandInfo.getDbName(); + const auto& command = commandInfo.getCommand(); + auto assertSucceeds = commandInfo.getAssertSucceeds(); + + auto res = txnClient.runCommand(dbName, command).get(); + sharedBlock->responses.emplace_back( + CommandHelpers::filterCommandReplyForPassthrough( + res.removeField("recoveryToken"))); + + if (assertSucceeds) { + // Note this only inspects the top level ok field for non-write + // commands. + uassertStatusOK(getStatusFromWriteCommandReply(res)); + } + } + + return SemiFuture<void>::makeReady(); + }); + + return TestInternalTransactionsReply(std::move(sharedBlock->responses)); + }; NamespaceString ns() const override { return NamespaceString(request().getDbName(), ""); @@ -64,6 +121,8 @@ public: return "Internal command for testing internal transactions"; } + // This command can use the transaction API to run commands on different databases, so a single + // user database doesn't apply and we restrict this to only the admin database. bool adminOnly() const override { return true; } diff --git a/src/mongo/s/commands/internal_transactions_test_commands.idl b/src/mongo/s/commands/internal_transactions_test_commands.idl index 846351967d3..e34d4adbbb9 100644 --- a/src/mongo/s/commands/internal_transactions_test_commands.idl +++ b/src/mongo/s/commands/internal_transactions_test_commands.idl @@ -32,9 +32,40 @@ global: imports: - "mongo/idl/basic_types.idl" +structs: + TestInternalTransactionsReply: + description: "Response for testInternalTransactions command" + strict: false + fields: + responses: + type: array<object> + + TestInternalTransactionsCommandInfo: + description: "A command, its database name, and other test options" + strict: false + fields: + dbName: + type: string + command: + type: object + assertSucceeds: + type: bool + default: true + commands: testInternalTransactions: command_name: testInternalTransactions description: "The 'testInternalTransactions' command." namespace: ignored api_version: "" + fields: + useClusterClient: + description: "Whether the transaction API client used should opt into running the + 'cluster' versions of commands that enables a non-router node to run + the router versions of commands. Only meaningful on mongod because a + mongos will always run 'cluster' commands." + type: bool + default: false + commandInfos: + type: array<TestInternalTransactionsCommandInfo> + reply_type: TestInternalTransactionsReply diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp index 89e71ea5e05..43e2d5213f0 100644 --- a/src/mongo/s/service_entry_point_mongos.cpp +++ b/src/mongo/s/service_entry_point_mongos.cpp @@ -202,12 +202,17 @@ Future<DbResponse> HandleRequest::run() { return future; } -Future<DbResponse> ServiceEntryPointMongos::handleRequest(OperationContext* opCtx, - const Message& message) noexcept { +Future<DbResponse> ServiceEntryPointMongos::handleRequestImpl(OperationContext* opCtx, + const Message& message) noexcept { auto hr = std::make_shared<HandleRequest>(opCtx, message); return hr->run(); } +Future<DbResponse> ServiceEntryPointMongos::handleRequest(OperationContext* opCtx, + const Message& message) noexcept { + return handleRequestImpl(opCtx, message); +} + void ServiceEntryPointMongos::onClientConnect(Client* client) { if (load_balancer_support::isFromLoadBalancer(client)) { _loadBalancedConnections.increment(); diff --git a/src/mongo/s/service_entry_point_mongos.h b/src/mongo/s/service_entry_point_mongos.h index c821ab184ef..c5c6530d2a9 100644 --- a/src/mongo/s/service_entry_point_mongos.h +++ b/src/mongo/s/service_entry_point_mongos.h @@ -44,6 +44,10 @@ class ServiceEntryPointMongos final : public ServiceEntryPointImpl { public: using ServiceEntryPointImpl::ServiceEntryPointImpl; + + static Future<DbResponse> handleRequestImpl(OperationContext* opCtx, + const Message& request) noexcept; + Future<DbResponse> handleRequest(OperationContext* opCtx, const Message& request) noexcept override; |