summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2022-05-11 14:05:34 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-05-11 22:18:10 +0000
commit27c96773225b595d1d58691ece2e70fea389fd43 (patch)
tree3b0cc633da8455e50bf14613fbb0d6206991a606
parent5bef21c71bb033ee142003eac2f6a2fe42606ebe (diff)
downloadmongo-27c96773225b595d1d58691ece2e70fea389fd43.tar.gz
SERVER-66316 Make transactions API interact better with Client Side Operation Timeout
(cherry picked from commit 2102f99116f9c8a4409ef2376abf7f2f1c9590c4)
-rw-r--r--jstests/concurrency/fsm_workloads/internal_transactions_sharded.js8
-rw-r--r--src/mongo/db/SConscript3
-rw-r--r--src/mongo/db/commands/internal_transactions_test_command_d.cpp9
-rw-r--r--src/mongo/db/fle_crud_mongod.cpp16
-rw-r--r--src/mongo/db/transaction_api.cpp38
-rw-r--r--src/mongo/db/transaction_api.h23
-rw-r--r--src/mongo/db/transaction_api_test.cpp58
-rw-r--r--src/mongo/s/commands/internal_transactions_test_command.h14
-rw-r--r--src/mongo/s/commands/internal_transactions_test_command_s.cpp9
9 files changed, 111 insertions, 67 deletions
diff --git a/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js b/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js
index 856f7ecfccd..4c9b11f490a 100644
--- a/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js
+++ b/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js
@@ -89,10 +89,6 @@ var $config = extendWorkload($config, function($config, $super) {
}));
this.originalStoreFindAndModifyImagesInSideCollection[db.getMongo().host] = res.was;
});
-
- cluster.executeOnMongosNodes((db) => {
- configureFailPoint(db, "skipTransactionApiRetryCheckInHandleError");
- });
};
$config.teardown = function teardown(db, collName, cluster) {
@@ -103,10 +99,6 @@ var $config = extendWorkload($config, function($config, $super) {
this.originalStoreFindAndModifyImagesInSideCollection[db.getMongo().host]
}));
});
-
- cluster.executeOnMongosNodes((db) => {
- configureFailPoint(db, "skipTransactionApiRetryCheckInHandleError", {}, "off");
- });
};
return $config;
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index b08a246c84d..8d06e55d79b 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -858,6 +858,7 @@ env.Library(
'$BUILD_DIR/mongo/db/auth/auth',
'$BUILD_DIR/mongo/db/query/command_request_response',
'$BUILD_DIR/mongo/db/query/query_request',
+ '$BUILD_DIR/mongo/executor/task_executor_interface',
'$BUILD_DIR/mongo/rpc/command_status',
'$BUILD_DIR/mongo/rpc/rpc',
'$BUILD_DIR/mongo/transport/service_entry_point',
@@ -904,7 +905,9 @@ env.Library(
'$BUILD_DIR/mongo/crypto/fle_crypto',
'$BUILD_DIR/mongo/db/query/query_request',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
+ '$BUILD_DIR/mongo/executor/network_interface_factory',
'$BUILD_DIR/mongo/executor/task_executor_pool',
+ '$BUILD_DIR/mongo/executor/thread_pool_task_executor',
"$BUILD_DIR/mongo/util/concurrency/thread_pool",
'fle_crud',
'logical_session_id',
diff --git a/src/mongo/db/commands/internal_transactions_test_command_d.cpp b/src/mongo/db/commands/internal_transactions_test_command_d.cpp
index 4a8570cc87f..5f94e36c78c 100644
--- a/src/mongo/db/commands/internal_transactions_test_command_d.cpp
+++ b/src/mongo/db/commands/internal_transactions_test_command_d.cpp
@@ -39,10 +39,11 @@ namespace {
class InternalTransactionsTestCommandD
: public InternalTransactionsTestCommandBase<InternalTransactionsTestCommandD> {
public:
- static txn_api::SyncTransactionWithRetries getTxn(OperationContext* opCtx,
- ExecutorPtr executor,
- StringData commandName,
- bool useClusterClient) {
+ static txn_api::SyncTransactionWithRetries getTxn(
+ OperationContext* opCtx,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ StringData commandName,
+ bool useClusterClient) {
// If a sharded mongod is acting as a mongos, it will need special routing behaviors.
if (useClusterClient) {
return txn_api::SyncTransactionWithRetries(
diff --git a/src/mongo/db/fle_crud_mongod.cpp b/src/mongo/db/fle_crud_mongod.cpp
index d198a35f398..e8ac3a9b77b 100644
--- a/src/mongo/db/fle_crud_mongod.cpp
+++ b/src/mongo/db/fle_crud_mongod.cpp
@@ -52,6 +52,8 @@
#include "mongo/db/transaction_api.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/db/transaction_participant_resource_yielder.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/idl/idl_parser.h"
#include "mongo/s/grid.h"
#include "mongo/s/transaction_router_resource_yielder.h"
@@ -62,7 +64,7 @@
namespace mongo {
namespace {
-std::shared_ptr<ThreadPool> _fleCrudthreadPool;
+std::shared_ptr<executor::ThreadPoolTaskExecutor> _fleCrudExecutor;
ThreadPool::Options getThreadPoolOptions() {
ThreadPool::Options tpOptions;
@@ -145,7 +147,7 @@ private:
std::shared_ptr<txn_api::SyncTransactionWithRetries> getTransactionWithRetriesForMongoD(
OperationContext* opCtx) {
return std::make_shared<txn_api::SyncTransactionWithRetries>(
- opCtx, _fleCrudthreadPool, std::make_unique<FLEMongoDResourceYielder>());
+ opCtx, _fleCrudExecutor, std::make_unique<FLEMongoDResourceYielder>());
}
void startFLECrud(ServiceContext* serviceContext) {
@@ -155,14 +157,16 @@ void startFLECrud(ServiceContext* serviceContext) {
return;
}
- _fleCrudthreadPool = std::make_shared<ThreadPool>(getThreadPoolOptions());
- _fleCrudthreadPool->startup();
+ _fleCrudExecutor = std::make_shared<executor::ThreadPoolTaskExecutor>(
+ std::make_unique<ThreadPool>(getThreadPoolOptions()),
+ executor::makeNetworkInterface("FLECrudNetwork"));
+ _fleCrudExecutor->startup();
}
void stopFLECrud() {
// Check if it was started
- if (_fleCrudthreadPool.get() != nullptr) {
- _fleCrudthreadPool->shutdown();
+ if (_fleCrudExecutor.get() != nullptr) {
+ _fleCrudExecutor->shutdown();
}
}
diff --git a/src/mongo/db/transaction_api.cpp b/src/mongo/db/transaction_api.cpp
index c67621e8a0b..9208c7936a5 100644
--- a/src/mongo/db/transaction_api.cpp
+++ b/src/mongo/db/transaction_api.cpp
@@ -65,12 +65,17 @@
// TODO SERVER-65395: Remove failpoint when fle2 tests can reliably support internal transaction
// retry limit.
MONGO_FAIL_POINT_DEFINE(skipTransactionApiRetryCheckInHandleError);
+MONGO_FAIL_POINT_DEFINE(overrideTransactionApiMaxRetriesToThree);
-namespace mongo::txn_api {
+namespace mongo {
+
+const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
+
+namespace txn_api {
SyncTransactionWithRetries::SyncTransactionWithRetries(
OperationContext* opCtx,
- ExecutorPtr executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
std::unique_ptr<ResourceYielder> resourceYielder,
std::unique_ptr<TransactionClient> txnClient)
: _resourceYielder(std::move(resourceYielder)),
@@ -160,11 +165,12 @@ std::string Transaction::_transactionStateToString(TransactionState txnState) co
MONGO_UNREACHABLE;
}
-void logNextStep(Transaction::ErrorHandlingStep nextStep, const BSONObj& txnInfo) {
+void logNextStep(Transaction::ErrorHandlingStep nextStep, const BSONObj& txnInfo, int attempts) {
LOGV2(5918600,
"Chose internal transaction error handling step",
"nextStep"_attr = errorHandlingStepToString(nextStep),
- "txnInfo"_attr = txnInfo);
+ "txnInfo"_attr = txnInfo,
+ "attempts"_attr = attempts);
}
SemiFuture<CommitResult> TransactionWithRetries::run(Callback callback) noexcept {
@@ -181,6 +187,7 @@ SemiFuture<CommitResult> TransactionWithRetries::run(Callback callback) noexcept
invariant(txnStatus != ErrorCodes::TransactionAPIMustRetryCommit);
return txnStatus.isOK() || txnStatus != ErrorCodes::TransactionAPIMustRetryTransaction;
})
+ .withBackoffBetweenIterations(kExponentialBackoff)
// Cancellation happens by interrupting the caller's opCtx.
.on(_executor, CancellationToken::uncancelable())
// Safe to inline because the continuation only holds state.
@@ -193,7 +200,7 @@ ExecutorFuture<void> TransactionWithRetries::_runBodyHandleErrors(int bodyAttemp
return _internalTxn->runCallback().thenRunOn(_executor).onError(
[this, bodyAttempts](Status bodyStatus) {
auto nextStep = _internalTxn->handleError(bodyStatus, bodyAttempts);
- logNextStep(nextStep, _internalTxn->reportStateForLog());
+ logNextStep(nextStep, _internalTxn->reportStateForLog(), bodyAttempts);
if (nextStep == Transaction::ErrorHandlingStep::kDoNotRetry) {
iassert(bodyStatus);
@@ -220,7 +227,7 @@ ExecutorFuture<CommitResult> TransactionWithRetries::_runCommitHandleErrors(int
}
auto nextStep = _internalTxn->handleError(swCommitResult, commitAttempts);
- logNextStep(nextStep, _internalTxn->reportStateForLog());
+ logNextStep(nextStep, _internalTxn->reportStateForLog(), commitAttempts);
if (nextStep == Transaction::ErrorHandlingStep::kDoNotRetry) {
return ExecutorFuture<CommitResult>(_executor, swCommitResult);
@@ -249,6 +256,7 @@ ExecutorFuture<CommitResult> TransactionWithRetries::_runCommitWithRetries() {
.until([](StatusWith<CommitResult> swResult) {
return swResult.isOK() || swResult != ErrorCodes::TransactionAPIMustRetryCommit;
})
+ .withBackoffBetweenIterations(kExponentialBackoff)
// Cancellation happens by interrupting the caller's opCtx.
.on(_executor, CancellationToken::uncancelable());
}
@@ -477,6 +485,12 @@ SemiFuture<void> Transaction::runCallback() {
.semi();
}
+int getMaxRetries() {
+ // Allow overriding the number of retries so unit tests can exhaust them faster.
+ return MONGO_unlikely(overrideTransactionApiMaxRetriesToThree.shouldFail()) ? 3
+ : kTxnRetryLimit;
+}
+
Transaction::ErrorHandlingStep Transaction::handleError(const StatusWith<CommitResult>& swResult,
int attemptCounter) const noexcept {
stdx::lock_guard<Latch> lg(_mutex);
@@ -489,17 +503,16 @@ Transaction::ErrorHandlingStep Transaction::handleError(const StatusWith<CommitR
"hasTransientTransactionErrorLabel"_attr =
_latestResponseHasTransientTransactionErrorLabel,
"txnInfo"_attr = _reportStateForLog(lg),
- "retriesLeft"_attr =
- kTxnRetryLimit - attemptCounter + 1 // To account for the initial execution.
- );
+ "attempts"_attr = attemptCounter);
if (_execContext == ExecutionContext::kClientTransaction) {
// If we're nested in another transaction, let the outer most client decide on errors.
return ErrorHandlingStep::kDoNotRetry;
}
- if (!MONGO_unlikely(skipTransactionApiRetryCheckInHandleError.shouldFail()) &&
- attemptCounter > kTxnRetryLimit) {
+ // If the op has a deadline, retry until it is reached regardless of the number of attempts.
+ if (attemptCounter > getMaxRetries() && !_opDeadline &&
+ !MONGO_unlikely(skipTransactionApiRetryCheckInHandleError.shouldFail())) {
return _isInCommit() ? ErrorHandlingStep::kDoNotRetry
: ErrorHandlingStep::kAbortAndDoNotRetry;
}
@@ -745,4 +758,5 @@ Transaction::~Transaction() {
}
} // namespace details
-} // namespace mongo::txn_api
+} // namespace txn_api
+} // namespace mongo
diff --git a/src/mongo/db/transaction_api.h b/src/mongo/db/transaction_api.h
index 859aa8ecd7f..c224949863a 100644
--- a/src/mongo/db/transaction_api.h
+++ b/src/mongo/db/transaction_api.h
@@ -34,6 +34,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/db/query/find_command_gen.h"
#include "mongo/db/resource_yielder.h"
+#include "mongo/executor/task_executor.h"
#include "mongo/rpc/write_concern_error_detail.h"
#include "mongo/s/write_ops/batched_command_request.h"
#include "mongo/s/write_ops/batched_command_response.h"
@@ -46,8 +47,12 @@ class TxnMetadataHooks;
class TransactionWithRetries;
} // namespace details
-// Max number of retries allowed for a transaction operation.
-static constexpr int kTxnRetryLimit = 10;
+// Max number of retries allowed for a transaction operation. The API uses exponential backoffs
+// capped at 1 second for transient error and commit network error retries, so this corresponds to
+// roughly 2 minutes of sleeps in total between retries meant to loosely mirror the 2 minute timeout
+// used by the driver's convenient transactions API:
+// https://github.com/mongodb/specifications/blob/92d77a6d/source/transactions-convenient-api/transactions-convenient-api.rst
+static constexpr int kTxnRetryLimit = 120;
static constexpr auto kMaxTimeMSField = "maxTimeMS";
/**
@@ -156,7 +161,7 @@ public:
*
*/
SyncTransactionWithRetries(OperationContext* opCtx,
- ExecutorPtr executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
std::unique_ptr<ResourceYielder> resourceYielder,
std::unique_ptr<TransactionClient> txnClient = nullptr);
@@ -238,7 +243,7 @@ public:
class SEPTransactionClient : public TransactionClient {
public:
SEPTransactionClient(OperationContext* opCtx,
- ExecutorPtr executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
std::unique_ptr<SEPTransactionClientBehaviors> behaviors)
: _serviceContext(opCtx->getServiceContext()),
_executor(executor),
@@ -269,7 +274,7 @@ public:
private:
ServiceContext* const _serviceContext;
- ExecutorPtr _executor;
+ std::shared_ptr<executor::TaskExecutor> _executor;
std::unique_ptr<SEPTransactionClientBehaviors> _behaviors;
std::unique_ptr<details::TxnMetadataHooks> _hooks;
std::unique_ptr<CancelableOperationContextFactory> _cancelableOpCtxFactory;
@@ -304,7 +309,7 @@ public:
* and infers its execution context from the given OperationContext.
*/
Transaction(OperationContext* opCtx,
- ExecutorPtr executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
std::unique_ptr<TransactionClient> txnClient)
: _executor(executor),
_txnClient(std::move(txnClient)),
@@ -414,7 +419,7 @@ private:
*/
void _primeTransaction(OperationContext* opCtx);
- const ExecutorPtr _executor;
+ const std::shared_ptr<executor::TaskExecutor> _executor;
std::unique_ptr<TransactionClient> _txnClient;
Callback _callback;
@@ -464,7 +469,7 @@ public:
TransactionWithRetries operator=(const TransactionWithRetries&) = delete;
TransactionWithRetries(OperationContext* opCtx,
- ExecutorPtr executor,
+ std::shared_ptr<executor::TaskExecutor> executor,
std::unique_ptr<TransactionClient> txnClient)
: _internalTxn(std::make_shared<Transaction>(opCtx, executor, std::move(txnClient))),
_executor(executor) {}
@@ -499,7 +504,7 @@ private:
ExecutorFuture<void> _bestEffortAbort();
std::shared_ptr<Transaction> _internalTxn;
- ExecutorPtr _executor;
+ std::shared_ptr<executor::TaskExecutor> _executor;
};
} // namespace details
diff --git a/src/mongo/db/transaction_api_test.cpp b/src/mongo/db/transaction_api_test.cpp
index 14e559ed75d..a417a4d3028 100644
--- a/src/mongo/db/transaction_api_test.cpp
+++ b/src/mongo/db/transaction_api_test.cpp
@@ -39,6 +39,8 @@
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_test_fixture.h"
#include "mongo/db/transaction_api.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/is_mongos.h"
#include "mongo/stdx/future.h"
@@ -312,24 +314,28 @@ protected:
options.minThreads = 1;
options.maxThreads = 1;
- _threadPool = std::make_shared<ThreadPool>(std::move(options));
- _threadPool->startup();
+ _executor = std::make_shared<executor::ThreadPoolTaskExecutor>(
+ std::make_unique<ThreadPool>(std::move(options)),
+ executor::makeNetworkInterface("TxnAPITestNetwork"));
+ _executor->startup();
- auto mockClient = std::make_unique<txn_api::details::MockTransactionClient>(
- opCtx(), _threadPool, nullptr);
+ auto mockClient =
+ std::make_unique<txn_api::details::MockTransactionClient>(opCtx(), _executor, nullptr);
_mockClient = mockClient.get();
_txnWithRetries = std::make_unique<txn_api::SyncTransactionWithRetries>(
- opCtx(), _threadPool, nullptr /* resourceYielder */, std::move(mockClient));
+ opCtx(), _executor, nullptr /* resourceYielder */, std::move(mockClient));
}
void tearDown() override {
- _threadPool->shutdown();
- _threadPool->join();
- _threadPool.reset();
+ _executor->shutdown();
+ _executor->join();
+ _executor.reset();
}
void waitForAllEarlierTasksToComplete() {
- _threadPool->waitForIdle();
+ while (_executor->hasTasks()) {
+ continue;
+ }
}
OperationContext* opCtx() {
@@ -349,8 +355,8 @@ protected:
}
void resetTxnWithRetries(std::unique_ptr<MockResourceYielder> resourceYielder = nullptr) {
- auto mockClient = std::make_unique<txn_api::details::MockTransactionClient>(
- opCtx(), _threadPool, nullptr);
+ auto mockClient =
+ std::make_unique<txn_api::details::MockTransactionClient>(opCtx(), _executor, nullptr);
_mockClient = mockClient.get();
if (resourceYielder) {
_resourceYielder = resourceYielder.get();
@@ -365,7 +371,7 @@ protected:
// pool. This ensures that we can predictably monitor txnNumber's value.
_txnWithRetries = nullptr;
_txnWithRetries = std::make_unique<txn_api::SyncTransactionWithRetries>(
- opCtx(), _threadPool, std::move(resourceYielder), std::move(mockClient));
+ opCtx(), _executor, std::move(resourceYielder), std::move(mockClient));
}
void expectSentAbort(TxnNumber txnNumber, BSONObj writeConcern) {
@@ -380,7 +386,7 @@ protected:
private:
ServiceContext::UniqueOperationContext _opCtx;
- std::shared_ptr<ThreadPool> _threadPool;
+ std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor;
txn_api::details::MockTransactionClient* _mockClient{nullptr};
MockResourceYielder* _resourceYielder{nullptr};
std::unique_ptr<txn_api::SyncTransactionWithRetries> _txnWithRetries;
@@ -1751,6 +1757,8 @@ TEST_F(TxnAPITest, TestExhaustiveFindErrorOnGetMore) {
}
TEST_F(TxnAPITest, OwnSession_StartTransactionRetryLimitOnTransientErrors) {
+ FailPointEnableBlock fp("overrideTransactionApiMaxRetriesToThree");
+
int retryCount = 0;
auto swResult = txnWithRetries().runNoThrow(
opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
@@ -1770,12 +1778,13 @@ TEST_F(TxnAPITest, OwnSession_StartTransactionRetryLimitOnTransientErrors) {
// The transient error should have been propagated.
ASSERT_EQ(swResult.getStatus(), ErrorCodes::HostUnreachable);
- // We get 11 due to the initial try and then 10 follow up retries.
- ASSERT_EQ(retryCount, 11);
+ // We get 4 due to the initial try and then 3 follow up retries because
+ // overrideTransactionApiMaxRetriesToThree sets the max retry attempts to 3.
+ ASSERT_EQ(retryCount, 4);
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
- 10 /* txnNumber */,
+ 3 /* txnNumber */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
WriteConcernOptions().toBSON() /* writeConcern */);
@@ -1784,6 +1793,8 @@ TEST_F(TxnAPITest, OwnSession_StartTransactionRetryLimitOnTransientErrors) {
}
TEST_F(TxnAPITest, OwnSession_CommitTransactionRetryLimitOnTransientErrors) {
+ FailPointEnableBlock fp("overrideTransactionApiMaxRetriesToThree");
+
// If we are able to successfully finish this test, then we know that we have limited our
// retries.
auto swResult = txnWithRetries().runNoThrow(
@@ -1820,7 +1831,9 @@ TEST_F(TxnAPITest, OwnSession_CommitTransactionRetryLimitOnTransientErrors) {
ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "commitTransaction"_sd);
}
-TEST_F(TxnAPITest, MaxTimeMSIsSetIfOperationContextHasDeadline) {
+TEST_F(TxnAPITest, MaxTimeMSIsSetIfOperationContextHasDeadlineAndIgnoresDefaultRetryLimit) {
+ FailPointEnableBlock fp("overrideTransactionApiMaxRetriesToThree");
+
const std::shared_ptr<ClockSourceMock> mockClock = std::make_shared<ClockSourceMock>();
mockClock->reset(getServiceContext()->getFastClockSource()->now());
getServiceContext()->setFastClockSource(std::make_unique<SharedClockSourceAdapter>(mockClock));
@@ -1831,8 +1844,11 @@ TEST_F(TxnAPITest, MaxTimeMSIsSetIfOperationContextHasDeadline) {
// txnNumber will be incremented upon the release of a session.
resetTxnWithRetries();
+ int attempt = -1;
auto swResult = txnWithRetries().runNoThrow(
opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
+ attempt += 1;
+
mockClient()->setNextCommandResponse(kOKInsertResponse);
auto insertRes = txnClient
.runCommand("user"_sd,
@@ -1842,13 +1858,17 @@ TEST_F(TxnAPITest, MaxTimeMSIsSetIfOperationContextHasDeadline) {
.get();
ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned.
assertTxnMetadata(mockClient()->getLastSentRequest(),
- 1 /* txnNumber */,
+ attempt + 1 /* txnNumber */,
true /* startTransaction */,
boost::none /* readConcern */,
boost::none /* writeConcern */,
maxTimeMS /* maxTimeMS */);
assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone);
+ // Throw a transient error more times than the overriden retry limit to verify the limit
+ // is disabled when a deadline is set.
+ uassert(ErrorCodes::HostUnreachable, "Host unreachable error", attempt > 3);
+
mockClock->advance(Milliseconds(1000));
// The commit response.
@@ -1860,7 +1880,7 @@ TEST_F(TxnAPITest, MaxTimeMSIsSetIfOperationContextHasDeadline) {
auto lastRequest = mockClient()->getLastSentRequest();
assertTxnMetadata(lastRequest,
- 1 /* txnNumber */,
+ attempt + 1 /* txnNumber */,
boost::none /* startTransaction */,
boost::none /* readConcern */,
WriteConcernOptions().toBSON() /* writeConcern */,
diff --git a/src/mongo/s/commands/internal_transactions_test_command.h b/src/mongo/s/commands/internal_transactions_test_command.h
index c5bf8b61951..63b88c44d51 100644
--- a/src/mongo/s/commands/internal_transactions_test_command.h
+++ b/src/mongo/s/commands/internal_transactions_test_command.h
@@ -34,6 +34,8 @@
#include "mongo/db/commands/internal_transactions_test_command_gen.h"
#include "mongo/db/query/find_command_gen.h"
#include "mongo/db/transaction_api.h"
+#include "mongo/executor/network_interface_factory.h"
+#include "mongo/executor/thread_pool_task_executor.h"
#include "mongo/logv2/log.h"
#include "mongo/s/grid.h"
#include "mongo/stdx/future.h"
@@ -63,8 +65,8 @@ public:
auto sharedBlock = std::make_shared<SharedBlock>(Base::request().getCommandInfos());
const auto executor = Grid::get(opCtx)->isShardingInitialized()
- ? static_cast<ExecutorPtr>(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor())
- : static_cast<ExecutorPtr>(getTransactionExecutor());
+ ? Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()
+ : getTransactionExecutor();
// If internalTransactionsTestCommand is received by a mongod, it should be instantiated
// with the TransactionParticipant's resource yielder. If on a mongos, txn should be
@@ -141,10 +143,10 @@ public:
ActionType::internal));
}
- const std::shared_ptr<ThreadPool>& getTransactionExecutor() {
+ std::shared_ptr<executor::TaskExecutor> getTransactionExecutor() {
static Mutex mutex =
MONGO_MAKE_LATCH("InternalTransactionsTestCommandExecutor::_mutex");
- static std::shared_ptr<ThreadPool> executor;
+ static std::shared_ptr<executor::ThreadPoolTaskExecutor> executor;
stdx::lock_guard<Latch> lg(mutex);
if (!executor) {
@@ -152,7 +154,9 @@ public:
options.poolName = "InternalTransaction";
options.minThreads = 0;
options.maxThreads = 4;
- executor = std::make_shared<ThreadPool>(std::move(options));
+ executor = std::make_shared<executor::ThreadPoolTaskExecutor>(
+ std::make_unique<ThreadPool>(std::move(options)),
+ executor::makeNetworkInterface("InternalTransactionNetwork"));
executor->startup();
}
return executor;
diff --git a/src/mongo/s/commands/internal_transactions_test_command_s.cpp b/src/mongo/s/commands/internal_transactions_test_command_s.cpp
index 7ebc42e2ffb..e6cc0313a4a 100644
--- a/src/mongo/s/commands/internal_transactions_test_command_s.cpp
+++ b/src/mongo/s/commands/internal_transactions_test_command_s.cpp
@@ -36,10 +36,11 @@ namespace {
class InternalTransactionsTestCommandS
: public InternalTransactionsTestCommandBase<InternalTransactionsTestCommandS> {
public:
- static txn_api::SyncTransactionWithRetries getTxn(OperationContext* opCtx,
- ExecutorPtr executor,
- StringData commandName,
- bool useClusterClient) {
+ static txn_api::SyncTransactionWithRetries getTxn(
+ OperationContext* opCtx,
+ std::shared_ptr<executor::TaskExecutor> executor,
+ StringData commandName,
+ bool useClusterClient) {
return txn_api::SyncTransactionWithRetries(
opCtx, executor, TransactionRouterResourceYielder::makeForLocalHandoff());
}