summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2018-09-28 14:12:16 -0400
committerJack Mulrow <jack.mulrow@mongodb.com>2018-10-09 09:39:08 -0400
commitd2d7dbadcc008a484218321666aae44b75964787 (patch)
tree6d5167a57fd3d446144d8dc08f078dfbbfbd52f6 /src
parent1e03955cdab995fed6672d75a6a4544a9771a279 (diff)
downloadmongo-d2d7dbadcc008a484218321666aae44b75964787.tar.gz
SERVER-37210 Mongos should implicitly abort transactions on unhandled errors
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/SConscript5
-rw-r--r--src/mongo/db/service_entry_point_common.cpp12
-rw-r--r--src/mongo/db/transaction_validation.cpp45
-rw-r--r--src/mongo/db/transaction_validation.h40
-rw-r--r--src/mongo/s/commands/SConscript2
-rw-r--r--src/mongo/s/commands/cluster_command_test_fixture.cpp29
-rw-r--r--src/mongo/s/commands/cluster_command_test_fixture.h4
-rw-r--r--src/mongo/s/commands/strategy.cpp121
-rw-r--r--src/mongo/s/transaction_router.cpp14
-rw-r--r--src/mongo/s/transaction_router.h6
-rw-r--r--src/mongo/s/transaction_router_test.cpp125
11 files changed, 341 insertions, 62 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 50b21051fdc..4dafd3c009b 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -825,7 +825,7 @@ env.Library(
'$BUILD_DIR/mongo/base',
],
LIBDEPS_PRIVATE=[
- 'handle_request_response',
+ 'shared_request_handling',
'introspect',
'lasterror',
'query_exec',
@@ -1431,9 +1431,10 @@ env.Library(
)
env.Library(
- target='handle_request_response',
+ target='shared_request_handling',
source=[
'handle_request_response.cpp',
+ 'transaction_validation.cpp',
],
LIBDEPS=[
'logical_session_cache_impl',
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index eb22b1a129b..0dcb4b89dbe 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -73,6 +73,7 @@
#include "mongo/db/stats/counters.h"
#include "mongo/db/stats/top.h"
#include "mongo/db/transaction_participant.h"
+#include "mongo/db/transaction_validation.h"
#include "mongo/rpc/factory.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/message.h"
@@ -462,14 +463,9 @@ bool runCommandImpl(OperationContext* opCtx,
}
} else {
auto wcResult = uassertStatusOK(extractWriteConcern(opCtx, request.body));
- uassert(ErrorCodes::InvalidOptions,
- "writeConcern is not allowed within a multi-statement transaction",
- wcResult.usedDefault || !txnParticipant ||
- !txnParticipant->inMultiDocumentTransaction() ||
- invocation->definition()->getName() == "commitTransaction" ||
- invocation->definition()->getName() == "abortTransaction" ||
- invocation->definition()->getName() == "prepareTransaction" ||
- invocation->definition()->getName() == "doTxn");
+ if (txnParticipant && txnParticipant->inMultiDocumentTransaction()) {
+ validateWriteConcernForTransaction(wcResult, invocation->definition()->getName());
+ }
auto lastOpBeforeRun = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
diff --git a/src/mongo/db/transaction_validation.cpp b/src/mongo/db/transaction_validation.cpp
new file mode 100644
index 00000000000..2f26313d848
--- /dev/null
+++ b/src/mongo/db/transaction_validation.cpp
@@ -0,0 +1,45 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/transaction_validation.h"
+
+#include "mongo/db/write_concern_options.h"
+
+namespace mongo {
+
+void validateWriteConcernForTransaction(const WriteConcernOptions& wcResult, StringData cmdName) {
+ uassert(ErrorCodes::InvalidOptions,
+ "writeConcern is not allowed within a multi-statement transaction",
+ wcResult.usedDefault || cmdName == "commitTransaction" ||
+ cmdName == "abortTransaction" || cmdName == "prepareTransaction" ||
+ cmdName == "doTxn");
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/transaction_validation.h b/src/mongo/db/transaction_validation.h
new file mode 100644
index 00000000000..0eff9b8a156
--- /dev/null
+++ b/src/mongo/db/transaction_validation.h
@@ -0,0 +1,40 @@
+/**
+ * Copyright (C) 2018 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/write_concern_options.h"
+
+namespace mongo {
+
+/**
+ * Throws if the given write concern is not allowed in a transaction.
+ */
+void validateWriteConcernForTransaction(const WriteConcernOptions& wcResult, StringData cmdName);
+
+} // namespace mongo
diff --git a/src/mongo/s/commands/SConscript b/src/mongo/s/commands/SConscript
index 0d72376173e..b1892e0a43e 100644
--- a/src/mongo/s/commands/SConscript
+++ b/src/mongo/s/commands/SConscript
@@ -104,7 +104,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/commands/write_commands_common',
'$BUILD_DIR/mongo/db/ftdc/ftdc_server',
- '$BUILD_DIR/mongo/db/handle_request_response',
+ '$BUILD_DIR/mongo/db/shared_request_handling',
'$BUILD_DIR/mongo/db/logical_session_cache_impl',
'$BUILD_DIR/mongo/db/pipeline/aggregation',
'$BUILD_DIR/mongo/db/stats/counters',
diff --git a/src/mongo/s/commands/cluster_command_test_fixture.cpp b/src/mongo/s/commands/cluster_command_test_fixture.cpp
index e9c44a154ef..884bba83ee0 100644
--- a/src/mongo/s/commands/cluster_command_test_fixture.cpp
+++ b/src/mongo/s/commands/cluster_command_test_fixture.cpp
@@ -158,9 +158,17 @@ void ClusterCommandTestFixture::runCommandInspectRequests(BSONObj cmd,
future.timed_get(kFutureTimeout);
}
-void ClusterCommandTestFixture::runCommandMaxErrors(BSONObj cmd,
- ErrorCodes::Error code,
- bool isTargeted) {
+void ClusterCommandTestFixture::expectAbortTransaction() {
+ onCommandForPoolExecutor([](const executor::RemoteCommandRequest& request) {
+ auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
+ ASSERT_EQ(cmdName, "abortTransaction");
+ return BSON("ok" << 1);
+ });
+}
+
+void ClusterCommandTestFixture::runTxnCommandMaxErrors(BSONObj cmd,
+ ErrorCodes::Error code,
+ bool isTargeted) {
auto future = launchAsync([&] { runCommand(cmd); });
size_t numRetries =
@@ -169,6 +177,13 @@ void ClusterCommandTestFixture::runCommandMaxErrors(BSONObj cmd,
expectReturnsError(code);
}
+ // In a transaction, each targeted shard is sent abortTransaction when the router exhausts its
+ // retries.
+ size_t numTargetedShards = isTargeted ? 1 : 2;
+ for (size_t i = 0; i < numTargetedShards; i++) {
+ expectAbortTransaction();
+ }
+
future.timed_get(kFutureTimeout);
}
@@ -199,13 +214,13 @@ void ClusterCommandTestFixture::testRetryOnSnapshotError(BSONObj targetedCmd,
void ClusterCommandTestFixture::testMaxRetriesSnapshotErrors(BSONObj targetedCmd,
BSONObj scatterGatherCmd) {
// Target one shard.
- runCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true);
- runCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true);
+ runTxnCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotUnavailable, true);
+ runTxnCommandMaxErrors(_makeCmd(targetedCmd), ErrorCodes::SnapshotTooOld, true);
// Target all shards
if (!scatterGatherCmd.isEmpty()) {
- runCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false);
- runCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false);
+ runTxnCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotUnavailable, false);
+ runTxnCommandMaxErrors(_makeCmd(scatterGatherCmd), ErrorCodes::SnapshotTooOld, false);
}
}
diff --git a/src/mongo/s/commands/cluster_command_test_fixture.h b/src/mongo/s/commands/cluster_command_test_fixture.h
index 71cc2f2e830..2f40d06128b 100644
--- a/src/mongo/s/commands/cluster_command_test_fixture.h
+++ b/src/mongo/s/commands/cluster_command_test_fixture.h
@@ -55,6 +55,8 @@ protected:
void expectReturnsError(ErrorCodes::Error code);
+ void expectAbortTransaction();
+
DbResponse runCommand(BSONObj cmd);
void runCommandSuccessful(BSONObj cmd, bool isTargeted);
@@ -63,7 +65,7 @@ protected:
void runCommandInspectRequests(BSONObj cmd, InspectionCallback cb, bool isTargeted);
- void runCommandMaxErrors(BSONObj cmd, ErrorCodes::Error code, bool isTargeted);
+ void runTxnCommandMaxErrors(BSONObj cmd, ErrorCodes::Error code, bool isTargeted);
/**
* Verifies that running the given commands through mongos will succeed.
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 99f0d18a807..55081a9a04b 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -57,6 +57,7 @@
#include "mongo/db/query/getmore_request.h"
#include "mongo/db/query/query_request.h"
#include "mongo/db/stats/counters.h"
+#include "mongo/db/transaction_validation.h"
#include "mongo/db/views/resolved_view.h"
#include "mongo/rpc/factory.h"
#include "mongo/rpc/get_status_from_command_result.h"
@@ -150,6 +151,44 @@ void appendRequiredFieldsToResponse(OperationContext* opCtx, BSONObjBuilder* res
}
}
+/**
+ * Invokes the given command and aborts the transaction on any non-retryable errors.
+ */
+void invokeInTransactionRouter(OperationContext* opCtx,
+ CommandInvocation* invocation,
+ TransactionRouter* txnRouter,
+ rpc::ReplyBuilderInterface* result) {
+ try {
+ invocation->run(opCtx, result);
+ } catch (const DBException& e) {
+ if (ErrorCodes::isSnapshotError(e.code()) ||
+ ErrorCodes::isNeedRetargettingError(e.code()) ||
+ e.code() == ErrorCodes::StaleDbVersion) {
+ // Don't abort on possibly retryable errors.
+ throw;
+ }
+
+ txnRouter->implicitlyAbortTransaction(opCtx);
+ throw;
+ }
+}
+
+/**
+ * Throws NoSuchTransaction if canRetry is false.
+ */
+void handleCanRetryInTransaction(OperationContext* opCtx,
+ TransactionRouter* txnRouter,
+ bool canRetry,
+ const DBException& ex) {
+ if (!canRetry) {
+ uasserted(ErrorCodes::NoSuchTransaction,
+ str::stream() << "Transaction " << opCtx->getTxnNumber() << " was aborted after "
+ << kMaxNumStaleVersionRetries
+ << " failed retries. The latest attempt failed with: "
+ << ex.toStatus());
+ }
+}
+
void execCommandClient(OperationContext* opCtx,
CommandInvocation* invocation,
const OpMsgRequest& request,
@@ -200,16 +239,10 @@ void execCommandClient(OperationContext* opCtx,
globalOpCounters.gotCommand();
}
- StatusWith<WriteConcernOptions> wcResult =
- WriteConcernOptions::extractWCFromCommand(request.body);
- if (!wcResult.isOK()) {
- auto body = result->getBodyBuilder();
- CommandHelpers::appendCommandStatusNoThrow(body, wcResult.getStatus());
- return;
- }
+ auto wcResult = uassertStatusOK(WriteConcernOptions::extractWCFromCommand(request.body));
bool supportsWriteConcern = invocation->supportsWriteConcern();
- if (!supportsWriteConcern && !wcResult.getValue().usedDefault) {
+ if (!supportsWriteConcern && !wcResult.usedDefault) {
// This command doesn't do writes so it should not be passed a writeConcern.
// If we did not use the default writeConcern, one was provided when it shouldn't have
// been by the user.
@@ -219,6 +252,10 @@ void execCommandClient(OperationContext* opCtx,
return;
}
+ if (TransactionRouter::get(opCtx)) {
+ validateWriteConcernForTransaction(wcResult, c->getName());
+ }
+
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
if (readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
@@ -269,20 +306,34 @@ void execCommandClient(OperationContext* opCtx,
return;
}
+ auto txnRouter = TransactionRouter::get(opCtx);
if (!supportsWriteConcern) {
- invocation->run(opCtx, result);
+ if (txnRouter) {
+ invokeInTransactionRouter(opCtx, invocation, txnRouter, result);
+ } else {
+ invocation->run(opCtx, result);
+ }
} else {
// Change the write concern while running the command.
const auto oldWC = opCtx->getWriteConcern();
ON_BLOCK_EXIT([&] { opCtx->setWriteConcern(oldWC); });
- opCtx->setWriteConcern(wcResult.getValue());
+ opCtx->setWriteConcern(wcResult);
- invocation->run(opCtx, result);
+ if (txnRouter) {
+ invokeInTransactionRouter(opCtx, invocation, txnRouter, result);
+ } else {
+ invocation->run(opCtx, result);
+ }
}
+
auto body = result->getBodyBuilder();
bool ok = CommandHelpers::extractOrAppendOk(body);
if (!ok) {
c->incrementCommandsFailed();
+
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ txnRouter->implicitlyAbortTransaction(opCtx);
+ }
}
}
@@ -406,18 +457,14 @@ void runCommand(OperationContext* opCtx,
Grid::get(opCtx)->catalogCache()->invalidateShardedCollection(staleNs);
- // Update transaction tracking state for a possible retry. Throws if the transaction
- // cannot continue.
+ // Update transaction tracking state for a possible retry. Throws and aborts the
+ // transaction if it cannot continue.
if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ auto abortGuard =
+ MakeGuard([&] { txnRouter->implicitlyAbortTransaction(opCtx); });
+ handleCanRetryInTransaction(opCtx, txnRouter, canRetry, ex);
txnRouter->onStaleShardOrDbError(commandName);
- // TODO SERVER-37210: Implicitly abort the transaction if this uassert throws.
- uassert(ErrorCodes::NoSuchTransaction,
- str::stream() << "Transaction " << opCtx->getTxnNumber()
- << " was aborted after "
- << kMaxNumStaleVersionRetries
- << " failed retries. The latest attempt failed with: "
- << ex.toStatus(),
- canRetry);
+ abortGuard.Dismiss();
}
if (canRetry) {
@@ -429,18 +476,14 @@ void runCommand(OperationContext* opCtx,
Grid::get(opCtx)->catalogCache()->onStaleDatabaseVersion(ex->getDb(),
ex->getVersionReceived());
- // Update transaction tracking state for a possible retry. Throws if the transaction
- // cannot continue.
+ // Update transaction tracking state for a possible retry. Throws and aborts the
+ // transaction if it cannot continue.
if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ auto abortGuard =
+ MakeGuard([&] { txnRouter->implicitlyAbortTransaction(opCtx); });
+ handleCanRetryInTransaction(opCtx, txnRouter, canRetry, ex);
txnRouter->onStaleShardOrDbError(commandName);
- // TODO SERVER-37210: Implicitly abort the transaction if this uassert throws.
- uassert(ErrorCodes::NoSuchTransaction,
- str::stream() << "Transaction " << opCtx->getTxnNumber()
- << " was aborted after "
- << kMaxNumStaleVersionRetries
- << " failed retries. The latest attempt failed with: "
- << ex.toStatus(),
- canRetry);
+ abortGuard.Dismiss();
}
if (canRetry) {
@@ -450,18 +493,14 @@ void runCommand(OperationContext* opCtx,
} catch (const ExceptionForCat<ErrorCategory::SnapshotError>& ex) {
// Simple retry on any type of snapshot error.
- // Update transaction tracking state for a possible retry. Throws if the transaction
- // cannot continue.
+ // Update transaction tracking state for a possible retry. Throws and aborts the
+ // transaction if it cannot continue.
if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ auto abortGuard =
+ MakeGuard([&] { txnRouter->implicitlyAbortTransaction(opCtx); });
+ handleCanRetryInTransaction(opCtx, txnRouter, canRetry, ex);
txnRouter->onSnapshotError();
- // TODO SERVER-37210: Implicitly abort the transaction if this uassert throws.
- uassert(ErrorCodes::NoSuchTransaction,
- str::stream() << "Transaction " << opCtx->getTxnNumber()
- << " was aborted after "
- << kMaxNumStaleVersionRetries
- << " failed retries. The latest attempt failed with: "
- << ex.toStatus(),
- canRetry);
+ abortGuard.Dismiss();
}
if (canRetry) {
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 5b8e1c09b4d..3b44b615eab 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -357,7 +357,6 @@ bool TransactionRouter::_canContinueOnStaleShardOrDbError(StringData cmdName) co
}
void TransactionRouter::onStaleShardOrDbError(StringData cmdName) {
- // TODO SERVER-37210: Implicitly abort the entire transaction if this uassert throws.
uassert(ErrorCodes::NoSuchTransaction,
"Transaction was aborted due to cluster data placement change",
_canContinueOnStaleShardOrDbError(cmdName));
@@ -381,7 +380,6 @@ bool TransactionRouter::_canContinueOnSnapshotError() const {
}
void TransactionRouter::onSnapshotError() {
- // TODO SERVER-37210: Implicitly abort the entire transaction if this uassert throws.
uassert(ErrorCodes::NoSuchTransaction,
"Transaction was aborted due to snapshot error on subsequent transaction statement",
_canContinueOnSnapshotError());
@@ -636,4 +634,16 @@ ScopedRouterSession::~ScopedRouterSession() {
RouterSessionCatalog::get(_opCtx)->checkInSessionState(opCtxSession->getSessionId());
}
+void TransactionRouter::implicitlyAbortTransaction(OperationContext* opCtx) {
+ if (_participants.empty()) {
+ return;
+ }
+
+ try {
+ abortTransaction(opCtx);
+ } catch (...) {
+ // Ignore any exceptions.
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h
index 770a7cb7c6d..3020db842c3 100644
--- a/src/mongo/s/transaction_router.h
+++ b/src/mongo/s/transaction_router.h
@@ -186,6 +186,12 @@ public:
std::vector<AsyncRequestsSender::Response> abortTransaction(OperationContext* opCtx);
/**
+ * Sends abort to all shards in the current participant list. Will retry on retryable errors,
+ * but ignores the responses from each shard.
+ */
+ void implicitlyAbortTransaction(OperationContext* opCtx);
+
+ /**
* Extract the runtimne state attached to the operation context. Returns nullptr if none is
* attached.
*/
diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp
index 6dcc8795f4f..b0da4b30af0 100644
--- a/src/mongo/s/transaction_router_test.cpp
+++ b/src/mongo/s/transaction_router_test.cpp
@@ -1241,5 +1241,130 @@ TEST_F(TransactionRouterTest, OnViewResolutionErrorClearsAllNewParticipants) {
ASSERT_TRUE(secondShardCmd["startTransaction"].trueValue());
}
+TEST_F(TransactionRouterTest, ImplicitAbortIsNoopWithNoParticipants) {
+ TxnNumber txnNum{3};
+
+ auto opCtx = operationContext();
+ opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
+ opCtx->setTxnNumber(txnNum);
+ ScopedRouterSession scopedSession(opCtx);
+
+ auto txnRouter = TransactionRouter::get(opCtx);
+ txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
+
+ // Should not throw.
+ txnRouter->implicitlyAbortTransaction(opCtx);
+}
+
+TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) {
+ LogicalSessionId lsid(makeLogicalSessionIdForTest());
+ TxnNumber txnNum{3};
+
+ auto opCtx = operationContext();
+ opCtx->setLogicalSessionId(lsid);
+ opCtx->setTxnNumber(txnNum);
+
+ ScopedRouterSession scopedSession(opCtx);
+ auto txnRouter = TransactionRouter::get(opCtx);
+
+ txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
+ txnRouter->attachTxnFieldsIfNeeded(shard1, {});
+
+ auto future =
+ launchAsync([&] { return txnRouter->implicitlyAbortTransaction(operationContext()); });
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(hostAndPort1, request.target);
+ ASSERT_EQ("admin", request.dbname);
+
+ auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
+ ASSERT_EQ(cmdName, "abortTransaction");
+
+ checkSessionDetails(request.cmdObj, lsid, txnNum, true);
+
+ return BSON("ok" << 1);
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(TransactionRouterTest, ImplicitAbortForMultipleParticipants) {
+ LogicalSessionId lsid(makeLogicalSessionIdForTest());
+ TxnNumber txnNum{3};
+
+ auto opCtx = operationContext();
+ opCtx->setLogicalSessionId(lsid);
+ opCtx->setTxnNumber(txnNum);
+
+ ScopedRouterSession scopedSession(opCtx);
+ auto txnRouter = TransactionRouter::get(opCtx);
+
+ txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
+ txnRouter->attachTxnFieldsIfNeeded(shard1, {});
+ txnRouter->attachTxnFieldsIfNeeded(shard2, {});
+
+ auto future =
+ launchAsync([&] { return txnRouter->implicitlyAbortTransaction(operationContext()); });
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(hostAndPort1, request.target);
+ ASSERT_EQ("admin", request.dbname);
+
+ auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
+ ASSERT_EQ(cmdName, "abortTransaction");
+
+ checkSessionDetails(request.cmdObj, lsid, txnNum, true);
+
+ return BSON("ok" << 1);
+ });
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(hostAndPort2, request.target);
+ ASSERT_EQ("admin", request.dbname);
+
+ auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
+ ASSERT_EQ(cmdName, "abortTransaction");
+
+ checkSessionDetails(request.cmdObj, lsid, txnNum, boost::none);
+
+ return BSON("ok" << 1);
+ });
+
+ future.timed_get(kFutureTimeout);
+}
+
+TEST_F(TransactionRouterTest, ImplicitAbortIgnoresErrors) {
+ LogicalSessionId lsid(makeLogicalSessionIdForTest());
+ TxnNumber txnNum{3};
+
+ auto opCtx = operationContext();
+ opCtx->setLogicalSessionId(lsid);
+ opCtx->setTxnNumber(txnNum);
+
+ ScopedRouterSession scopedSession(opCtx);
+ auto txnRouter = TransactionRouter::get(opCtx);
+
+ txnRouter->beginOrContinueTxn(opCtx, txnNum, true);
+ txnRouter->attachTxnFieldsIfNeeded(shard1, {});
+
+ auto future =
+ launchAsync([&] { return txnRouter->implicitlyAbortTransaction(operationContext()); });
+
+ onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
+ ASSERT_EQ(hostAndPort1, request.target);
+ ASSERT_EQ("admin", request.dbname);
+
+ auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
+ ASSERT_EQ(cmdName, "abortTransaction");
+
+ checkSessionDetails(request.cmdObj, lsid, txnNum, true);
+
+ return BSON("ok" << 0);
+ });
+
+ // Shouldn't throw.
+ future.timed_get(kFutureTimeout);
+}
+
} // unnamed namespace
} // namespace mongo