summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2022-03-14 14:09:10 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-30 22:43:17 +0000
commit2c77d92ddcd9b1157cd13fb97dd3580b67e205a1 (patch)
tree3b7d6ff508c28e7f4da46d3447e488d2c74073ce
parent7ceeed142005460b81efc2f1d534f8fbcf8a1f65 (diff)
downloadmongo-2c77d92ddcd9b1157cd13fb97dd3580b67e205a1.tar.gz
SERVER-63495 Support running cluster commands through the transaction API
-rw-r--r--jstests/replsets/db_reads_while_recovering_all_commands.js1
-rw-r--r--jstests/sharding/transaction_api_distributed_from_shard.js110
-rw-r--r--src/mongo/db/SConscript17
-rw-r--r--src/mongo/db/cluster_transaction_api.cpp73
-rw-r--r--src/mongo/db/cluster_transaction_api.h51
-rw-r--r--src/mongo/db/commands.cpp7
-rw-r--r--src/mongo/db/s/SConscript2
-rw-r--r--src/mongo/db/transaction_api.cpp26
-rw-r--r--src/mongo/db/transaction_api.h47
-rw-r--r--src/mongo/db/transaction_api_test.cpp13
-rw-r--r--src/mongo/db/transaction_validation.cpp14
-rw-r--r--src/mongo/s/commands/SConscript8
-rw-r--r--src/mongo/s/commands/internal_transactions_test_commands.cpp61
-rw-r--r--src/mongo/s/commands/internal_transactions_test_commands.idl31
-rw-r--r--src/mongo/s/service_entry_point_mongos.cpp9
-rw-r--r--src/mongo/s/service_entry_point_mongos.h4
16 files changed, 448 insertions, 26 deletions
diff --git a/jstests/replsets/db_reads_while_recovering_all_commands.js b/jstests/replsets/db_reads_while_recovering_all_commands.js
index c10fe0d74d6..5c09d2361b6 100644
--- a/jstests/replsets/db_reads_while_recovering_all_commands.js
+++ b/jstests/replsets/db_reads_while_recovering_all_commands.js
@@ -348,6 +348,7 @@ const allCommands = {
stopRecordingTraffic: {skip: isNotAUserDataRead},
testDeprecation: {skip: isNotAUserDataRead},
testDeprecationInVersion2: {skip: isNotAUserDataRead},
+ testInternalTransactions: {skip: isNotAUserDataRead},
testRemoval: {skip: isNotAUserDataRead},
testReshardCloneCollection: {skip: isNotAUserDataRead},
testVersions1And2: {skip: isNotAUserDataRead},
diff --git a/jstests/sharding/transaction_api_distributed_from_shard.js b/jstests/sharding/transaction_api_distributed_from_shard.js
new file mode 100644
index 00000000000..e44fca1850e
--- /dev/null
+++ b/jstests/sharding/transaction_api_distributed_from_shard.js
@@ -0,0 +1,110 @@
+/**
+ * Tests that the transaction API can be used for distributed transactions initiated from a shard.
+ *
+ * @tags: [requires_fcv_53, featureFlagInternalTransactions]
+ */
+(function() {
+"use strict";
+
+// The test command is meant to test the "no session" transaction API case.
+TestData.disableImplicitSessions = true;
+
+const st = new ShardingTest({shards: 2, config: 1});
+const shard0Primary = st.rs0.getPrimary();
+
+const kDbName = "foo";
+const kCollName = "bar";
+const kNs = kDbName + "." + kCollName;
+
+function runTestSuccess() {
+ const commands = [
+ {dbName: kDbName, command: {find: kCollName, singleBatch: true}},
+ {dbName: kDbName, command: {insert: kCollName, documents: [{_id: 2}, {_id: 3}]}},
+ {
+ dbName: kDbName,
+ command: {update: kCollName, updates: [{q: {_id: 2}, u: {$set: {updated: true}}}]}
+ },
+ {dbName: kDbName, command: {delete: kCollName, deletes: [{q: {_id: 3}, limit: 1}]}},
+ {dbName: kDbName, command: {find: kCollName, singleBatch: true}},
+ ];
+
+ // Insert initial data.
+ assert.commandWorked(st.s.getCollection(kNs).insert([{_id: 1}]));
+
+ const res = assert.commandWorked(
+ shard0Primary.adminCommand({testInternalTransactions: 1, commandInfos: commands}));
+ res.responses.forEach((innerRes) => {
+ assert.commandWorked(innerRes, tojson(res));
+ });
+
+ assert.eq(res.responses.length, commands.length, tojson(res));
+ assert.sameMembers(res.responses[0].cursor.firstBatch, [{_id: 1}], tojson(res));
+ assert.eq(res.responses[1], {n: 2, ok: 1}, tojson(res));
+ assert.eq(res.responses[2], {nModified: 1, n: 1, ok: 1}, tojson(res));
+ assert.eq(res.responses[3], {n: 1, ok: 1}, tojson(res));
+ assert.sameMembers(
+ res.responses[4].cursor.firstBatch, [{_id: 1}, {_id: 2, updated: true}], tojson(res));
+
+ // The written documents should be visible outside the transaction.
+ assert.sameMembers(st.s.getCollection(kNs).find().toArray(),
+ [{_id: 1}, {_id: 2, updated: true}]);
+
+ // Clean up.
+ assert.commandWorked(st.s.getCollection(kNs).remove({}, false /* justOne */));
+}
+
+function runTestFailure() {
+ const commands = [
+ {dbName: kDbName, command: {insert: kCollName, documents: [{_id: 2}, {_id: 3}]}},
+ {dbName: kDbName, command: {find: kCollName, singleBatch: true}},
+ // clusterCount does not exist, so the API will reject this command without running it. This
+ // will still abort the transaction.
+ {dbName: kDbName, command: {count: kCollName}},
+ ];
+
+ // Insert initial data.
+ assert.commandWorked(st.s.getCollection(kNs).insert([{_id: 1}]));
+
+ const res = assert.commandWorked(
+ shard0Primary.adminCommand({testInternalTransactions: 1, commandInfos: commands}));
+ // The clusterCount is rejected without being run, so expect one fewer response.
+ assert.eq(res.responses.length, commands.length - 1, tojson(res));
+
+ assert.commandWorked(res.responses[0], tojson(res));
+ assert.eq(res.responses[0], {n: 2, ok: 1}, tojson(res));
+
+ assert.commandWorked(res.responses[1], tojson(res));
+ assert.sameMembers(
+ res.responses[1].cursor.firstBatch, [{_id: 1}, {_id: 2}, {_id: 3}], tojson(res));
+
+ // Verify the API didn't insert any documents.
+ assert.sameMembers(st.s.getCollection(kNs).find().toArray(), [{_id: 1}]);
+
+ // Clean up.
+ assert.commandWorked(st.s.getCollection(kNs).remove({}, false /* justOne */));
+}
+
+//
+// Unsharded collection case.
+//
+
+runTestSuccess();
+runTestFailure();
+
+//
+// Sharded collection case.
+//
+
+assert.commandWorked(st.s.adminCommand({enableSharding: kDbName}));
+st.ensurePrimaryShard(kDbName, st.shard0.shardName);
+assert.commandWorked(st.s.getCollection(kNs).createIndex({x: 1}));
+assert.commandWorked(st.s.adminCommand({shardCollection: kNs, key: {x: 1}}));
+
+assert.commandWorked(st.s.adminCommand({split: kNs, middle: {x: 0}}));
+assert.commandWorked(st.s.adminCommand({moveChunk: kNs, find: {x: 0}, to: st.shard1.shardName}));
+
+runTestSuccess();
+runTestFailure();
+
+st.stop();
+})();
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 8ce61596ac0..7b6a56bfe8e 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -902,6 +902,23 @@ env.Library(
)
env.Library(
+ target='cluster_transaction_api',
+ source=[
+ 'cluster_transaction_api.cpp',
+ ],
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/rpc/rpc',
+ '$BUILD_DIR/mongo/s/startup_initialization',
+ '$BUILD_DIR/mongo/transport/service_entry_point',
+ 'logical_session_id',
+ 'logical_session_id_helpers',
+ 'service_context',
+ 'shared_request_handling',
+ 'transaction_api',
+ ],
+)
+
+env.Library(
target='dbdirectclient',
source=[
'dbdirectclient.cpp',
diff --git a/src/mongo/db/cluster_transaction_api.cpp b/src/mongo/db/cluster_transaction_api.cpp
new file mode 100644
index 00000000000..770d5bad816
--- /dev/null
+++ b/src/mongo/db/cluster_transaction_api.cpp
@@ -0,0 +1,73 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/cluster_transaction_api.h"
+
+#include <fmt/format.h>
+
+#include "mongo/executor/task_executor.h"
+#include "mongo/rpc/factory.h"
+#include "mongo/rpc/op_msg_rpc_impls.h"
+#include "mongo/rpc/reply_interface.h"
+#include "mongo/stdx/future.h"
+
+namespace mongo::txn_api::details {
+
+namespace {
+
+StringMap<std::string> clusterCommandTranslations = {
+ {"abortTransaction", "clusterAbortTransaction"},
+ {"commitTransaction", "clusterCommitTransaction"},
+ {"delete", "clusterDelete"},
+ {"insert", "clusterInsert"},
+ {"update", "clusterUpdate"},
+ {"find", "clusterFind"}};
+
+BSONObj replaceCommandNameWithClusterCommandName(BSONObj cmdObj) {
+ auto cmdName = cmdObj.firstElement().fieldNameStringData();
+ auto newNameIt = clusterCommandTranslations.find(cmdName);
+ uassert(6349501,
+ "Cannot use unsupported command {} with cluster transaction API"_format(cmdName),
+ newNameIt != clusterCommandTranslations.end());
+
+ return cmdObj.replaceFieldNames(BSON(newNameIt->second << 1));
+}
+
+} // namespace
+
+BSONObj ClusterSEPTransactionClientBehaviors::maybeModifyCommand(BSONObj cmdObj) const {
+ return replaceCommandNameWithClusterCommandName(cmdObj);
+}
+
+Future<DbResponse> ClusterSEPTransactionClientBehaviors::handleRequest(
+ OperationContext* opCtx, const Message& request) const {
+ return ServiceEntryPointMongos::handleRequestImpl(opCtx, request);
+}
+
+} // namespace mongo::txn_api::details
diff --git a/src/mongo/db/cluster_transaction_api.h b/src/mongo/db/cluster_transaction_api.h
new file mode 100644
index 00000000000..45135cde65d
--- /dev/null
+++ b/src/mongo/db/cluster_transaction_api.h
@@ -0,0 +1,51 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/transaction_api.h"
+#include "mongo/s/service_entry_point_mongos.h"
+
+namespace mongo::txn_api::details {
+
+/**
+ * Behaviors for running cluster commands from a non-router process, ie mongod.
+ */
+class ClusterSEPTransactionClientBehaviors : public SEPTransactionClientBehaviors {
+public:
+ ClusterSEPTransactionClientBehaviors(ServiceContext* service) {}
+
+ BSONObj maybeModifyCommand(BSONObj cmdObj) const override;
+
+ Future<DbResponse> handleRequest(OperationContext* opCtx,
+ const Message& request) const override;
+};
+
+} // namespace mongo::txn_api::details
diff --git a/src/mongo/db/commands.cpp b/src/mongo/db/commands.cpp
index d404795f413..d1df81deb92 100644
--- a/src/mongo/db/commands.cpp
+++ b/src/mongo/db/commands.cpp
@@ -116,9 +116,16 @@ bool checkAuthorizationImplPreParse(OperationContext* opCtx,
return false;
}
+// TODO SERVER-65101: Replace this with a property on each command.
// The command names that are allowed in a multi-document transaction.
const StringMap<int> txnCmdAllowlist = {{"abortTransaction", 1},
{"aggregate", 1},
+ {"clusterAbortTransaction", 1},
+ {"clusterCommitTransaction", 1},
+ {"clusterDelete", 1},
+ {"clusterFind", 1},
+ {"clusterInsert", 1},
+ {"clusterUpdate", 1},
{"commitTransaction", 1},
{"coordinateCommitTransaction", 1},
{"create", 1},
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index d77627eb367..1ef790b7756 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -429,6 +429,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/commands/txn_cmd_request',
'$BUILD_DIR/mongo/db/fle_crud',
+ '$BUILD_DIR/mongo/db/internal_transactions_feature_flag',
'$BUILD_DIR/mongo/db/multitenancy',
'$BUILD_DIR/mongo/db/repl/primary_only_service',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
@@ -438,6 +439,7 @@ env.Library(
'$BUILD_DIR/mongo/db/timeseries/timeseries_collmod',
'$BUILD_DIR/mongo/db/timeseries/timeseries_conversion_util',
'$BUILD_DIR/mongo/db/timeseries/timeseries_options',
+ '$BUILD_DIR/mongo/db/transaction_api',
'$BUILD_DIR/mongo/idl/cluster_server_parameter',
'$BUILD_DIR/mongo/s/commands/cluster_commands_common',
'$BUILD_DIR/mongo/s/commands/sharded_cluster_sharding_commands',
diff --git a/src/mongo/db/transaction_api.cpp b/src/mongo/db/transaction_api.cpp
index 41b96703688..cbb2a879d64 100644
--- a/src/mongo/db/transaction_api.cpp
+++ b/src/mongo/db/transaction_api.cpp
@@ -249,10 +249,16 @@ void primeInternalClientAndOpCtx(Client* client, OperationContext* opCtx) {
}
}
-SemiFuture<BSONObj> SEPTransactionClient::runCommand(StringData dbName, BSONObj cmdObj) const {
- BSONObjBuilder cmdBuilder(std::move(cmdObj));
+Future<DbResponse> DefaultSEPTransactionClientBehaviors::handleRequest(
+ OperationContext* opCtx, const Message& request) const {
+ auto serviceEntryPoint = opCtx->getServiceContext()->getServiceEntryPoint();
+ return serviceEntryPoint->handleRequest(opCtx, request);
+}
+SemiFuture<BSONObj> SEPTransactionClient::runCommand(StringData dbName, BSONObj cmdObj) const {
invariant(_hooks, "Transaction metadata hooks must be injected before a command can be run");
+
+ BSONObjBuilder cmdBuilder(_behaviors->maybeModifyCommand(std::move(cmdObj)));
_hooks->runRequestHook(&cmdBuilder);
invariant(!haveClient());
@@ -261,10 +267,9 @@ SemiFuture<BSONObj> SEPTransactionClient::runCommand(StringData dbName, BSONObj
auto cancellableOpCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
primeInternalClientAndOpCtx(&cc(), cancellableOpCtx.get());
- auto sep = cc().getServiceContext()->getServiceEntryPoint();
auto opMsgRequest = OpMsgRequest::fromDBAndBody(dbName, cmdBuilder.obj());
auto requestMessage = opMsgRequest.serialize();
- return sep->handleRequest(cancellableOpCtx.get(), requestMessage)
+ return _behaviors->handleRequest(cancellableOpCtx.get(), requestMessage)
.then([this](DbResponse dbResponse) {
auto reply = rpc::makeReply(&dbResponse.response)->getCommandReply().getOwned();
_hooks->runReplyHook(reply);
@@ -485,16 +490,15 @@ void Transaction::prepareRequest(BSONObjBuilder* cmdBuilder) {
// (aka -1), which indicates retry history should not be saved. If statement ids are not
// explicitly sent, implicit ids may be inferred, which could lead to bugs if different
// commands have the same ids inferred.
- uassert(
- 6410500,
- str::stream()
- << "In a retryable write transaction every retryable write command should have an "
- "explicit statement id, command: "
- << redact(cmdBuilder->asTempObj()),
+ dassert(
!isRetryableWriteCommand(
cmdBuilder->asTempObj().firstElement().fieldNameStringData()) ||
(cmdBuilder->hasField(write_ops::WriteCommandRequestBase::kStmtIdsFieldName) ||
- cmdBuilder->hasField(write_ops::WriteCommandRequestBase::kStmtIdFieldName)));
+ cmdBuilder->hasField(write_ops::WriteCommandRequestBase::kStmtIdFieldName)),
+ str::stream()
+ << "In a retryable write transaction every retryable write command should have an "
+ "explicit statement id, command: "
+ << redact(cmdBuilder->asTempObj()));
}
stdx::lock_guard<Latch> lg(_mutex);
diff --git a/src/mongo/db/transaction_api.h b/src/mongo/db/transaction_api.h
index e7adde45f04..542f020655a 100644
--- a/src/mongo/db/transaction_api.h
+++ b/src/mongo/db/transaction_api.h
@@ -199,13 +199,52 @@ private:
namespace details {
/**
+ * Customization point for behaviors different in the default SEPTransactionClient and the one for
+ * running distributed transactions.
+ */
+class SEPTransactionClientBehaviors {
+public:
+ virtual ~SEPTransactionClientBehaviors() {}
+
+ /**
+ * Makes any necessary modifications to the given command, e.g. changing the name to the
+ * "cluster" version for the cluster behaviors.
+ */
+ virtual BSONObj maybeModifyCommand(BSONObj cmdObj) const = 0;
+
+ /**
+ * Returns a future with the result of running the given request.
+ */
+ virtual Future<DbResponse> handleRequest(OperationContext* opCtx,
+ const Message& request) const = 0;
+};
+
+/**
+ * Default behaviors that does not modify commands and runs them against the local process service
+ * entry point.
+ */
+class DefaultSEPTransactionClientBehaviors : public SEPTransactionClientBehaviors {
+public:
+ BSONObj maybeModifyCommand(BSONObj cmdObj) const override {
+ return cmdObj;
+ }
+
+ Future<DbResponse> handleRequest(OperationContext* opCtx,
+ const Message& request) const override;
+};
+
+/**
* Default transaction client that runs given commands through the local process service entry
* point.
*/
class SEPTransactionClient : public TransactionClient {
public:
- SEPTransactionClient(OperationContext* opCtx, ExecutorPtr executor)
- : _serviceContext(opCtx->getServiceContext()), _executor(executor) {
+ SEPTransactionClient(OperationContext* opCtx,
+ ExecutorPtr executor,
+ std::unique_ptr<SEPTransactionClientBehaviors> behaviors)
+ : _serviceContext(opCtx->getServiceContext()),
+ _executor(executor),
+ _behaviors(std::move(behaviors)) {
_cancelableOpCtxFactory = std::make_unique<CancelableOperationContextFactory>(
opCtx->getCancellationToken(), executor);
}
@@ -233,6 +272,7 @@ public:
private:
ServiceContext* const _serviceContext;
ExecutorPtr _executor;
+ std::unique_ptr<SEPTransactionClientBehaviors> _behaviors;
std::unique_ptr<details::TxnMetadataHooks> _hooks;
std::unique_ptr<CancelableOperationContextFactory> _cancelableOpCtxFactory;
};
@@ -275,7 +315,8 @@ public:
*/
Transaction(OperationContext* opCtx, ExecutorPtr executor)
: _executor(executor),
- _txnClient(std::make_unique<SEPTransactionClient>(opCtx, executor)),
+ _txnClient(std::make_unique<SEPTransactionClient>(
+ opCtx, executor, std::make_unique<DefaultSEPTransactionClientBehaviors>())),
_service(opCtx->getServiceContext()) {
_primeTransaction(opCtx);
_txnClient->injectHooks(_makeTxnMetadataHooks());
diff --git a/src/mongo/db/transaction_api_test.cpp b/src/mongo/db/transaction_api_test.cpp
index 955c84b348e..ad1e0325d10 100644
--- a/src/mongo/db/transaction_api_test.cpp
+++ b/src/mongo/db/transaction_api_test.cpp
@@ -29,6 +29,7 @@
#include "mongo/platform/basic.h"
+#include "mongo/config.h"
#include "mongo/db/error_labels.h"
#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/operation_context.h"
@@ -277,8 +278,8 @@ protected:
_threadPool = std::make_shared<ThreadPool>(std::move(options));
_threadPool->startup();
- auto mockClient =
- std::make_unique<txn_api::details::MockTransactionClient>(opCtx(), _threadPool);
+ auto mockClient = std::make_unique<txn_api::details::MockTransactionClient>(
+ opCtx(), _threadPool, nullptr);
_mockClient = mockClient.get();
_txnWithRetries = std::make_unique<txn_api::TransactionWithRetries>(
opCtx(), _threadPool, std::move(mockClient), nullptr /* resourceYielder */);
@@ -312,7 +313,7 @@ protected:
void resetTxnWithRetries(std::unique_ptr<MockResourceYielder> resourceYielder = nullptr) {
auto mockClient = std::make_unique<txn_api::details::MockTransactionClient>(
- opCtx(), InlineQueuedCountingExecutor::make());
+ opCtx(), _threadPool, nullptr);
_mockClient = mockClient.get();
if (resourceYielder) {
_resourceYielder = resourceYielder.get();
@@ -1286,7 +1287,10 @@ TEST_F(TxnAPITest, ClientRetryableWrite_UsesRetryableInternalSession) {
ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "commitTransaction"_sd);
}
-TEST_F(TxnAPITest, ClientRetryableWrite_RetryableWriteWithoutStmtIdFails) {
+#ifdef MONGO_CONFIG_DEBUG_BUILD
+DEATH_TEST_F(TxnAPITest,
+ ClientRetryableWrite_RetryableWriteWithoutStmtIdCrashesOnDebug,
+ "In a retryable write transaction every retryable write command should") {
opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest());
opCtx()->setTxnNumber(5);
resetTxnWithRetries();
@@ -1305,6 +1309,7 @@ TEST_F(TxnAPITest, ClientRetryableWrite_RetryableWriteWithoutStmtIdFails) {
});
ASSERT_EQ(swResult.getStatus(), ErrorCodes::duplicateCodeForTest(6410500));
}
+#endif
TEST_F(TxnAPITest, ClientTransaction_UsesClientTransactionOptionsAndDoesNotCommitOnSuccess) {
opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest());
diff --git a/src/mongo/db/transaction_validation.cpp b/src/mongo/db/transaction_validation.cpp
index 5ae750ec2f5..04bc909208e 100644
--- a/src/mongo/db/transaction_validation.cpp
+++ b/src/mongo/db/transaction_validation.cpp
@@ -45,7 +45,11 @@ using namespace fmt::literals;
namespace {
-const StringMap<int> retryableWriteCommands = {{"delete", 1},
+// TODO SERVER-65101: Replace this with a property on each command.
+const StringMap<int> retryableWriteCommands = {{"clusterDelete", 1},
+ {"clusterInsert", 1},
+ {"clusterUpdate", 1},
+ {"delete", 1},
{"findandmodify", 1},
{"findAndModify", 1},
{"insert", 1},
@@ -63,13 +67,17 @@ const StringMap<int> retryableWriteCommands = {{"delete", 1},
{"_shardsvrCollModParticipant", 1},
{"_shardsvrSetUserWriteBlockMode", 1}};
+// TODO SERVER-65101: Replace this with a property on each command.
// Commands that can be sent with session info but should not check out a session.
const StringMap<int> skipSessionCheckoutList = {
{"coordinateCommitTransaction", 1}, {"_recvChunkStart", 1}, {"replSetStepDown", 1}};
-const StringMap<int> transactionCommands = {{"commitTransaction", 1},
+// TODO SERVER-65101: Replace this with a property on each command.
+const StringMap<int> transactionCommands = {{"abortTransaction", 1},
+ {"clusterAbortTransaction", 1},
+ {"clusterCommitTransaction", 1},
+ {"commitTransaction", 1},
{"coordinateCommitTransaction", 1},
- {"abortTransaction", 1},
{"prepareTransaction", 1}};
} // namespace
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index adbc0a3c03c..df97cd30619 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -12,10 +12,16 @@ env.Library(
source=[
'flush_router_config_cmd.cpp',
'get_shard_map_cmd.cpp',
+ 'internal_transactions_test_commands.cpp',
+ 'internal_transactions_test_commands.idl',
],
LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/db/cluster_transaction_api',
'$BUILD_DIR/mongo/db/commands',
+ '$BUILD_DIR/mongo/db/transaction_api',
'$BUILD_DIR/mongo/s/grid',
+ '$BUILD_DIR/mongo/s/sharding_router_api',
+ '$BUILD_DIR/mongo/s/startup_initialization',
]
)
@@ -90,11 +96,9 @@ env.Library(
'cluster_validate_db_metadata_cmd.cpp',
'cluster_whats_my_uri_cmd.cpp',
'cluster_write_cmd_s.cpp',
- 'internal_transactions_test_commands.cpp',
'kill_sessions_remote.cpp',
's_read_write_concern_defaults_server_status.cpp',
'cluster_commands.idl',
- 'internal_transactions_test_commands.idl',
'shard_collection.idl',
],
LIBDEPS_PRIVATE=[
diff --git a/src/mongo/s/commands/internal_transactions_test_commands.cpp b/src/mongo/s/commands/internal_transactions_test_commands.cpp
index 81ac508f06e..9e827f8d35c 100644
--- a/src/mongo/s/commands/internal_transactions_test_commands.cpp
+++ b/src/mongo/s/commands/internal_transactions_test_commands.cpp
@@ -26,9 +26,17 @@
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kCommand
+
#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/cluster_transaction_api.h"
#include "mongo/db/commands.h"
+#include "mongo/db/transaction_api.h"
+#include "mongo/logv2/log.h"
#include "mongo/s/commands/internal_transactions_test_commands_gen.h"
+#include "mongo/s/grid.h"
+#include "mongo/s/transaction_router_resource_yielder.h"
namespace mongo {
namespace {
@@ -41,7 +49,56 @@ public:
public:
using InvocationBase::InvocationBase;
- void typedRun(OperationContext* opCtx){};
+ TestInternalTransactionsReply typedRun(OperationContext* opCtx) {
+ Grid::get(opCtx)->assertShardingIsInitialized();
+
+ auto fixedExec = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+ auto txn = txn_api::TransactionWithRetries(
+ opCtx,
+ fixedExec,
+ std::make_unique<txn_api::details::SEPTransactionClient>(
+ opCtx,
+ fixedExec,
+ std::make_unique<txn_api::details::ClusterSEPTransactionClientBehaviors>(
+ opCtx->getServiceContext())),
+ TransactionRouterResourceYielder::makeForLocalHandoff());
+
+ struct SharedBlock {
+ SharedBlock(std::vector<TestInternalTransactionsCommandInfo> commandInfos_)
+ : commandInfos(commandInfos_) {}
+
+ std::vector<TestInternalTransactionsCommandInfo> commandInfos;
+ std::vector<BSONObj> responses;
+ };
+ auto sharedBlock = std::make_shared<SharedBlock>(request().getCommandInfos());
+
+ // Swallow errors and let clients inspect the responses array to determine success /
+ // failure.
+ (void)txn.runSyncNoThrow(
+ opCtx,
+ [sharedBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
+ for (const auto& commandInfo : sharedBlock->commandInfos) {
+ const auto& dbName = commandInfo.getDbName();
+ const auto& command = commandInfo.getCommand();
+ auto assertSucceeds = commandInfo.getAssertSucceeds();
+
+ auto res = txnClient.runCommand(dbName, command).get();
+ sharedBlock->responses.emplace_back(
+ CommandHelpers::filterCommandReplyForPassthrough(
+ res.removeField("recoveryToken")));
+
+ if (assertSucceeds) {
+ // Note this only inspects the top level ok field for non-write
+ // commands.
+ uassertStatusOK(getStatusFromWriteCommandReply(res));
+ }
+ }
+
+ return SemiFuture<void>::makeReady();
+ });
+
+ return TestInternalTransactionsReply(std::move(sharedBlock->responses));
+ };
NamespaceString ns() const override {
return NamespaceString(request().getDbName(), "");
@@ -64,6 +121,8 @@ public:
return "Internal command for testing internal transactions";
}
+ // This command can use the transaction API to run commands on different databases, so a single
+ // user database doesn't apply and we restrict this to only the admin database.
bool adminOnly() const override {
return true;
}
diff --git a/src/mongo/s/commands/internal_transactions_test_commands.idl b/src/mongo/s/commands/internal_transactions_test_commands.idl
index 846351967d3..e34d4adbbb9 100644
--- a/src/mongo/s/commands/internal_transactions_test_commands.idl
+++ b/src/mongo/s/commands/internal_transactions_test_commands.idl
@@ -32,9 +32,40 @@ global:
imports:
- "mongo/idl/basic_types.idl"
+structs:
+ TestInternalTransactionsReply:
+ description: "Response for testInternalTransactions command"
+ strict: false
+ fields:
+ responses:
+ type: array<object>
+
+ TestInternalTransactionsCommandInfo:
+ description: "A command, its database name, and other test options"
+ strict: false
+ fields:
+ dbName:
+ type: string
+ command:
+ type: object
+ assertSucceeds:
+ type: bool
+ default: true
+
commands:
testInternalTransactions:
command_name: testInternalTransactions
description: "The 'testInternalTransactions' command."
namespace: ignored
api_version: ""
+ fields:
+ useClusterClient:
+ description: "Whether the transaction API client used should opt into running the
+ 'cluster' versions of commands that enables a non-router node to run
+ the router versions of commands. Only meaningful on mongod because a
+ mongos will always run 'cluster' commands."
+ type: bool
+ default: false
+ commandInfos:
+ type: array<TestInternalTransactionsCommandInfo>
+ reply_type: TestInternalTransactionsReply
diff --git a/src/mongo/s/service_entry_point_mongos.cpp b/src/mongo/s/service_entry_point_mongos.cpp
index 89e71ea5e05..43e2d5213f0 100644
--- a/src/mongo/s/service_entry_point_mongos.cpp
+++ b/src/mongo/s/service_entry_point_mongos.cpp
@@ -202,12 +202,17 @@ Future<DbResponse> HandleRequest::run() {
return future;
}
-Future<DbResponse> ServiceEntryPointMongos::handleRequest(OperationContext* opCtx,
- const Message& message) noexcept {
+Future<DbResponse> ServiceEntryPointMongos::handleRequestImpl(OperationContext* opCtx,
+ const Message& message) noexcept {
auto hr = std::make_shared<HandleRequest>(opCtx, message);
return hr->run();
}
+Future<DbResponse> ServiceEntryPointMongos::handleRequest(OperationContext* opCtx,
+ const Message& message) noexcept {
+ return handleRequestImpl(opCtx, message);
+}
+
void ServiceEntryPointMongos::onClientConnect(Client* client) {
if (load_balancer_support::isFromLoadBalancer(client)) {
_loadBalancedConnections.increment();
diff --git a/src/mongo/s/service_entry_point_mongos.h b/src/mongo/s/service_entry_point_mongos.h
index c821ab184ef..c5c6530d2a9 100644
--- a/src/mongo/s/service_entry_point_mongos.h
+++ b/src/mongo/s/service_entry_point_mongos.h
@@ -44,6 +44,10 @@ class ServiceEntryPointMongos final : public ServiceEntryPointImpl {
public:
using ServiceEntryPointImpl::ServiceEntryPointImpl;
+
+ static Future<DbResponse> handleRequestImpl(OperationContext* opCtx,
+ const Message& request) noexcept;
+
Future<DbResponse> handleRequest(OperationContext* opCtx,
const Message& request) noexcept override;