diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2022-05-11 14:05:34 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-05-11 22:18:10 +0000 |
commit | 27c96773225b595d1d58691ece2e70fea389fd43 (patch) | |
tree | 3b0cc633da8455e50bf14613fbb0d6206991a606 | |
parent | 5bef21c71bb033ee142003eac2f6a2fe42606ebe (diff) | |
download | mongo-27c96773225b595d1d58691ece2e70fea389fd43.tar.gz |
SERVER-66316 Make transactions API interact better with Client Side Operation Timeout
(cherry picked from commit 2102f99116f9c8a4409ef2376abf7f2f1c9590c4)
-rw-r--r-- | jstests/concurrency/fsm_workloads/internal_transactions_sharded.js | 8 | ||||
-rw-r--r-- | src/mongo/db/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/commands/internal_transactions_test_command_d.cpp | 9 | ||||
-rw-r--r-- | src/mongo/db/fle_crud_mongod.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/transaction_api.cpp | 38 | ||||
-rw-r--r-- | src/mongo/db/transaction_api.h | 23 | ||||
-rw-r--r-- | src/mongo/db/transaction_api_test.cpp | 58 | ||||
-rw-r--r-- | src/mongo/s/commands/internal_transactions_test_command.h | 14 | ||||
-rw-r--r-- | src/mongo/s/commands/internal_transactions_test_command_s.cpp | 9 |
9 files changed, 111 insertions, 67 deletions
diff --git a/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js b/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js index 856f7ecfccd..4c9b11f490a 100644 --- a/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js +++ b/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js @@ -89,10 +89,6 @@ var $config = extendWorkload($config, function($config, $super) { })); this.originalStoreFindAndModifyImagesInSideCollection[db.getMongo().host] = res.was; }); - - cluster.executeOnMongosNodes((db) => { - configureFailPoint(db, "skipTransactionApiRetryCheckInHandleError"); - }); }; $config.teardown = function teardown(db, collName, cluster) { @@ -103,10 +99,6 @@ var $config = extendWorkload($config, function($config, $super) { this.originalStoreFindAndModifyImagesInSideCollection[db.getMongo().host] })); }); - - cluster.executeOnMongosNodes((db) => { - configureFailPoint(db, "skipTransactionApiRetryCheckInHandleError", {}, "off"); - }); }; return $config; diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index b08a246c84d..8d06e55d79b 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -858,6 +858,7 @@ env.Library( '$BUILD_DIR/mongo/db/auth/auth', '$BUILD_DIR/mongo/db/query/command_request_response', '$BUILD_DIR/mongo/db/query/query_request', + '$BUILD_DIR/mongo/executor/task_executor_interface', '$BUILD_DIR/mongo/rpc/command_status', '$BUILD_DIR/mongo/rpc/rpc', '$BUILD_DIR/mongo/transport/service_entry_point', @@ -904,7 +905,9 @@ env.Library( '$BUILD_DIR/mongo/crypto/fle_crypto', '$BUILD_DIR/mongo/db/query/query_request', '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', + '$BUILD_DIR/mongo/executor/network_interface_factory', '$BUILD_DIR/mongo/executor/task_executor_pool', + '$BUILD_DIR/mongo/executor/thread_pool_task_executor', "$BUILD_DIR/mongo/util/concurrency/thread_pool", 'fle_crud', 'logical_session_id', diff --git a/src/mongo/db/commands/internal_transactions_test_command_d.cpp b/src/mongo/db/commands/internal_transactions_test_command_d.cpp index 4a8570cc87f..5f94e36c78c 100644 --- a/src/mongo/db/commands/internal_transactions_test_command_d.cpp +++ b/src/mongo/db/commands/internal_transactions_test_command_d.cpp @@ -39,10 +39,11 @@ namespace { class InternalTransactionsTestCommandD : public InternalTransactionsTestCommandBase<InternalTransactionsTestCommandD> { public: - static txn_api::SyncTransactionWithRetries getTxn(OperationContext* opCtx, - ExecutorPtr executor, - StringData commandName, - bool useClusterClient) { + static txn_api::SyncTransactionWithRetries getTxn( + OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + StringData commandName, + bool useClusterClient) { // If a sharded mongod is acting as a mongos, it will need special routing behaviors. if (useClusterClient) { return txn_api::SyncTransactionWithRetries( diff --git a/src/mongo/db/fle_crud_mongod.cpp b/src/mongo/db/fle_crud_mongod.cpp index d198a35f398..e8ac3a9b77b 100644 --- a/src/mongo/db/fle_crud_mongod.cpp +++ b/src/mongo/db/fle_crud_mongod.cpp @@ -52,6 +52,8 @@ #include "mongo/db/transaction_api.h" #include "mongo/db/transaction_participant.h" #include "mongo/db/transaction_participant_resource_yielder.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/thread_pool_task_executor.h" #include "mongo/idl/idl_parser.h" #include "mongo/s/grid.h" #include "mongo/s/transaction_router_resource_yielder.h" @@ -62,7 +64,7 @@ namespace mongo { namespace { -std::shared_ptr<ThreadPool> _fleCrudthreadPool; +std::shared_ptr<executor::ThreadPoolTaskExecutor> _fleCrudExecutor; ThreadPool::Options getThreadPoolOptions() { ThreadPool::Options tpOptions; @@ -145,7 +147,7 @@ private: std::shared_ptr<txn_api::SyncTransactionWithRetries> getTransactionWithRetriesForMongoD( OperationContext* opCtx) { return std::make_shared<txn_api::SyncTransactionWithRetries>( - opCtx, _fleCrudthreadPool, std::make_unique<FLEMongoDResourceYielder>()); + opCtx, _fleCrudExecutor, std::make_unique<FLEMongoDResourceYielder>()); } void startFLECrud(ServiceContext* serviceContext) { @@ -155,14 +157,16 @@ void startFLECrud(ServiceContext* serviceContext) { return; } - _fleCrudthreadPool = std::make_shared<ThreadPool>(getThreadPoolOptions()); - _fleCrudthreadPool->startup(); + _fleCrudExecutor = std::make_shared<executor::ThreadPoolTaskExecutor>( + std::make_unique<ThreadPool>(getThreadPoolOptions()), + executor::makeNetworkInterface("FLECrudNetwork")); + _fleCrudExecutor->startup(); } void stopFLECrud() { // Check if it was started - if (_fleCrudthreadPool.get() != nullptr) { - _fleCrudthreadPool->shutdown(); + if (_fleCrudExecutor.get() != nullptr) { + _fleCrudExecutor->shutdown(); } } diff --git a/src/mongo/db/transaction_api.cpp b/src/mongo/db/transaction_api.cpp index c67621e8a0b..9208c7936a5 100644 --- a/src/mongo/db/transaction_api.cpp +++ b/src/mongo/db/transaction_api.cpp @@ -65,12 +65,17 @@ // TODO SERVER-65395: Remove failpoint when fle2 tests can reliably support internal transaction // retry limit. MONGO_FAIL_POINT_DEFINE(skipTransactionApiRetryCheckInHandleError); +MONGO_FAIL_POINT_DEFINE(overrideTransactionApiMaxRetriesToThree); -namespace mongo::txn_api { +namespace mongo { + +const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); + +namespace txn_api { SyncTransactionWithRetries::SyncTransactionWithRetries( OperationContext* opCtx, - ExecutorPtr executor, + std::shared_ptr<executor::TaskExecutor> executor, std::unique_ptr<ResourceYielder> resourceYielder, std::unique_ptr<TransactionClient> txnClient) : _resourceYielder(std::move(resourceYielder)), @@ -160,11 +165,12 @@ std::string Transaction::_transactionStateToString(TransactionState txnState) co MONGO_UNREACHABLE; } -void logNextStep(Transaction::ErrorHandlingStep nextStep, const BSONObj& txnInfo) { +void logNextStep(Transaction::ErrorHandlingStep nextStep, const BSONObj& txnInfo, int attempts) { LOGV2(5918600, "Chose internal transaction error handling step", "nextStep"_attr = errorHandlingStepToString(nextStep), - "txnInfo"_attr = txnInfo); + "txnInfo"_attr = txnInfo, + "attempts"_attr = attempts); } SemiFuture<CommitResult> TransactionWithRetries::run(Callback callback) noexcept { @@ -181,6 +187,7 @@ SemiFuture<CommitResult> TransactionWithRetries::run(Callback callback) noexcept invariant(txnStatus != ErrorCodes::TransactionAPIMustRetryCommit); return txnStatus.isOK() || txnStatus != ErrorCodes::TransactionAPIMustRetryTransaction; }) + .withBackoffBetweenIterations(kExponentialBackoff) // Cancellation happens by interrupting the caller's opCtx. .on(_executor, CancellationToken::uncancelable()) // Safe to inline because the continuation only holds state. @@ -193,7 +200,7 @@ ExecutorFuture<void> TransactionWithRetries::_runBodyHandleErrors(int bodyAttemp return _internalTxn->runCallback().thenRunOn(_executor).onError( [this, bodyAttempts](Status bodyStatus) { auto nextStep = _internalTxn->handleError(bodyStatus, bodyAttempts); - logNextStep(nextStep, _internalTxn->reportStateForLog()); + logNextStep(nextStep, _internalTxn->reportStateForLog(), bodyAttempts); if (nextStep == Transaction::ErrorHandlingStep::kDoNotRetry) { iassert(bodyStatus); @@ -220,7 +227,7 @@ ExecutorFuture<CommitResult> TransactionWithRetries::_runCommitHandleErrors(int } auto nextStep = _internalTxn->handleError(swCommitResult, commitAttempts); - logNextStep(nextStep, _internalTxn->reportStateForLog()); + logNextStep(nextStep, _internalTxn->reportStateForLog(), commitAttempts); if (nextStep == Transaction::ErrorHandlingStep::kDoNotRetry) { return ExecutorFuture<CommitResult>(_executor, swCommitResult); @@ -249,6 +256,7 @@ ExecutorFuture<CommitResult> TransactionWithRetries::_runCommitWithRetries() { .until([](StatusWith<CommitResult> swResult) { return swResult.isOK() || swResult != ErrorCodes::TransactionAPIMustRetryCommit; }) + .withBackoffBetweenIterations(kExponentialBackoff) // Cancellation happens by interrupting the caller's opCtx. .on(_executor, CancellationToken::uncancelable()); } @@ -477,6 +485,12 @@ SemiFuture<void> Transaction::runCallback() { .semi(); } +int getMaxRetries() { + // Allow overriding the number of retries so unit tests can exhaust them faster. + return MONGO_unlikely(overrideTransactionApiMaxRetriesToThree.shouldFail()) ? 3 + : kTxnRetryLimit; +} + Transaction::ErrorHandlingStep Transaction::handleError(const StatusWith<CommitResult>& swResult, int attemptCounter) const noexcept { stdx::lock_guard<Latch> lg(_mutex); @@ -489,17 +503,16 @@ Transaction::ErrorHandlingStep Transaction::handleError(const StatusWith<CommitR "hasTransientTransactionErrorLabel"_attr = _latestResponseHasTransientTransactionErrorLabel, "txnInfo"_attr = _reportStateForLog(lg), - "retriesLeft"_attr = - kTxnRetryLimit - attemptCounter + 1 // To account for the initial execution. - ); + "attempts"_attr = attemptCounter); if (_execContext == ExecutionContext::kClientTransaction) { // If we're nested in another transaction, let the outer most client decide on errors. return ErrorHandlingStep::kDoNotRetry; } - if (!MONGO_unlikely(skipTransactionApiRetryCheckInHandleError.shouldFail()) && - attemptCounter > kTxnRetryLimit) { + // If the op has a deadline, retry until it is reached regardless of the number of attempts. + if (attemptCounter > getMaxRetries() && !_opDeadline && + !MONGO_unlikely(skipTransactionApiRetryCheckInHandleError.shouldFail())) { return _isInCommit() ? ErrorHandlingStep::kDoNotRetry : ErrorHandlingStep::kAbortAndDoNotRetry; } @@ -745,4 +758,5 @@ Transaction::~Transaction() { } } // namespace details -} // namespace mongo::txn_api +} // namespace txn_api +} // namespace mongo diff --git a/src/mongo/db/transaction_api.h b/src/mongo/db/transaction_api.h index 859aa8ecd7f..c224949863a 100644 --- a/src/mongo/db/transaction_api.h +++ b/src/mongo/db/transaction_api.h @@ -34,6 +34,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/query/find_command_gen.h" #include "mongo/db/resource_yielder.h" +#include "mongo/executor/task_executor.h" #include "mongo/rpc/write_concern_error_detail.h" #include "mongo/s/write_ops/batched_command_request.h" #include "mongo/s/write_ops/batched_command_response.h" @@ -46,8 +47,12 @@ class TxnMetadataHooks; class TransactionWithRetries; } // namespace details -// Max number of retries allowed for a transaction operation. -static constexpr int kTxnRetryLimit = 10; +// Max number of retries allowed for a transaction operation. The API uses exponential backoffs +// capped at 1 second for transient error and commit network error retries, so this corresponds to +// roughly 2 minutes of sleeps in total between retries meant to loosely mirror the 2 minute timeout +// used by the driver's convenient transactions API: +// https://github.com/mongodb/specifications/blob/92d77a6d/source/transactions-convenient-api/transactions-convenient-api.rst +static constexpr int kTxnRetryLimit = 120; static constexpr auto kMaxTimeMSField = "maxTimeMS"; /** @@ -156,7 +161,7 @@ public: * */ SyncTransactionWithRetries(OperationContext* opCtx, - ExecutorPtr executor, + std::shared_ptr<executor::TaskExecutor> executor, std::unique_ptr<ResourceYielder> resourceYielder, std::unique_ptr<TransactionClient> txnClient = nullptr); @@ -238,7 +243,7 @@ public: class SEPTransactionClient : public TransactionClient { public: SEPTransactionClient(OperationContext* opCtx, - ExecutorPtr executor, + std::shared_ptr<executor::TaskExecutor> executor, std::unique_ptr<SEPTransactionClientBehaviors> behaviors) : _serviceContext(opCtx->getServiceContext()), _executor(executor), @@ -269,7 +274,7 @@ public: private: ServiceContext* const _serviceContext; - ExecutorPtr _executor; + std::shared_ptr<executor::TaskExecutor> _executor; std::unique_ptr<SEPTransactionClientBehaviors> _behaviors; std::unique_ptr<details::TxnMetadataHooks> _hooks; std::unique_ptr<CancelableOperationContextFactory> _cancelableOpCtxFactory; @@ -304,7 +309,7 @@ public: * and infers its execution context from the given OperationContext. */ Transaction(OperationContext* opCtx, - ExecutorPtr executor, + std::shared_ptr<executor::TaskExecutor> executor, std::unique_ptr<TransactionClient> txnClient) : _executor(executor), _txnClient(std::move(txnClient)), @@ -414,7 +419,7 @@ private: */ void _primeTransaction(OperationContext* opCtx); - const ExecutorPtr _executor; + const std::shared_ptr<executor::TaskExecutor> _executor; std::unique_ptr<TransactionClient> _txnClient; Callback _callback; @@ -464,7 +469,7 @@ public: TransactionWithRetries operator=(const TransactionWithRetries&) = delete; TransactionWithRetries(OperationContext* opCtx, - ExecutorPtr executor, + std::shared_ptr<executor::TaskExecutor> executor, std::unique_ptr<TransactionClient> txnClient) : _internalTxn(std::make_shared<Transaction>(opCtx, executor, std::move(txnClient))), _executor(executor) {} @@ -499,7 +504,7 @@ private: ExecutorFuture<void> _bestEffortAbort(); std::shared_ptr<Transaction> _internalTxn; - ExecutorPtr _executor; + std::shared_ptr<executor::TaskExecutor> _executor; }; } // namespace details diff --git a/src/mongo/db/transaction_api_test.cpp b/src/mongo/db/transaction_api_test.cpp index 14e559ed75d..a417a4d3028 100644 --- a/src/mongo/db/transaction_api_test.cpp +++ b/src/mongo/db/transaction_api_test.cpp @@ -39,6 +39,8 @@ #include "mongo/db/service_context.h" #include "mongo/db/service_context_test_fixture.h" #include "mongo/db/transaction_api.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/thread_pool_task_executor.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/is_mongos.h" #include "mongo/stdx/future.h" @@ -312,24 +314,28 @@ protected: options.minThreads = 1; options.maxThreads = 1; - _threadPool = std::make_shared<ThreadPool>(std::move(options)); - _threadPool->startup(); + _executor = std::make_shared<executor::ThreadPoolTaskExecutor>( + std::make_unique<ThreadPool>(std::move(options)), + executor::makeNetworkInterface("TxnAPITestNetwork")); + _executor->startup(); - auto mockClient = std::make_unique<txn_api::details::MockTransactionClient>( - opCtx(), _threadPool, nullptr); + auto mockClient = + std::make_unique<txn_api::details::MockTransactionClient>(opCtx(), _executor, nullptr); _mockClient = mockClient.get(); _txnWithRetries = std::make_unique<txn_api::SyncTransactionWithRetries>( - opCtx(), _threadPool, nullptr /* resourceYielder */, std::move(mockClient)); + opCtx(), _executor, nullptr /* resourceYielder */, std::move(mockClient)); } void tearDown() override { - _threadPool->shutdown(); - _threadPool->join(); - _threadPool.reset(); + _executor->shutdown(); + _executor->join(); + _executor.reset(); } void waitForAllEarlierTasksToComplete() { - _threadPool->waitForIdle(); + while (_executor->hasTasks()) { + continue; + } } OperationContext* opCtx() { @@ -349,8 +355,8 @@ protected: } void resetTxnWithRetries(std::unique_ptr<MockResourceYielder> resourceYielder = nullptr) { - auto mockClient = std::make_unique<txn_api::details::MockTransactionClient>( - opCtx(), _threadPool, nullptr); + auto mockClient = + std::make_unique<txn_api::details::MockTransactionClient>(opCtx(), _executor, nullptr); _mockClient = mockClient.get(); if (resourceYielder) { _resourceYielder = resourceYielder.get(); @@ -365,7 +371,7 @@ protected: // pool. This ensures that we can predictably monitor txnNumber's value. _txnWithRetries = nullptr; _txnWithRetries = std::make_unique<txn_api::SyncTransactionWithRetries>( - opCtx(), _threadPool, std::move(resourceYielder), std::move(mockClient)); + opCtx(), _executor, std::move(resourceYielder), std::move(mockClient)); } void expectSentAbort(TxnNumber txnNumber, BSONObj writeConcern) { @@ -380,7 +386,7 @@ protected: private: ServiceContext::UniqueOperationContext _opCtx; - std::shared_ptr<ThreadPool> _threadPool; + std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor; txn_api::details::MockTransactionClient* _mockClient{nullptr}; MockResourceYielder* _resourceYielder{nullptr}; std::unique_ptr<txn_api::SyncTransactionWithRetries> _txnWithRetries; @@ -1751,6 +1757,8 @@ TEST_F(TxnAPITest, TestExhaustiveFindErrorOnGetMore) { } TEST_F(TxnAPITest, OwnSession_StartTransactionRetryLimitOnTransientErrors) { + FailPointEnableBlock fp("overrideTransactionApiMaxRetriesToThree"); + int retryCount = 0; auto swResult = txnWithRetries().runNoThrow( opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { @@ -1770,12 +1778,13 @@ TEST_F(TxnAPITest, OwnSession_StartTransactionRetryLimitOnTransientErrors) { // The transient error should have been propagated. ASSERT_EQ(swResult.getStatus(), ErrorCodes::HostUnreachable); - // We get 11 due to the initial try and then 10 follow up retries. - ASSERT_EQ(retryCount, 11); + // We get 4 due to the initial try and then 3 follow up retries because + // overrideTransactionApiMaxRetriesToThree sets the max retry attempts to 3. + ASSERT_EQ(retryCount, 4); auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, - 10 /* txnNumber */, + 3 /* txnNumber */, boost::none /* startTransaction */, boost::none /* readConcern */, WriteConcernOptions().toBSON() /* writeConcern */); @@ -1784,6 +1793,8 @@ TEST_F(TxnAPITest, OwnSession_StartTransactionRetryLimitOnTransientErrors) { } TEST_F(TxnAPITest, OwnSession_CommitTransactionRetryLimitOnTransientErrors) { + FailPointEnableBlock fp("overrideTransactionApiMaxRetriesToThree"); + // If we are able to successfully finish this test, then we know that we have limited our // retries. auto swResult = txnWithRetries().runNoThrow( @@ -1820,7 +1831,9 @@ TEST_F(TxnAPITest, OwnSession_CommitTransactionRetryLimitOnTransientErrors) { ASSERT_EQ(lastRequest.firstElementFieldNameStringData(), "commitTransaction"_sd); } -TEST_F(TxnAPITest, MaxTimeMSIsSetIfOperationContextHasDeadline) { +TEST_F(TxnAPITest, MaxTimeMSIsSetIfOperationContextHasDeadlineAndIgnoresDefaultRetryLimit) { + FailPointEnableBlock fp("overrideTransactionApiMaxRetriesToThree"); + const std::shared_ptr<ClockSourceMock> mockClock = std::make_shared<ClockSourceMock>(); mockClock->reset(getServiceContext()->getFastClockSource()->now()); getServiceContext()->setFastClockSource(std::make_unique<SharedClockSourceAdapter>(mockClock)); @@ -1831,8 +1844,11 @@ TEST_F(TxnAPITest, MaxTimeMSIsSetIfOperationContextHasDeadline) { // txnNumber will be incremented upon the release of a session. resetTxnWithRetries(); + int attempt = -1; auto swResult = txnWithRetries().runNoThrow( opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { + attempt += 1; + mockClient()->setNextCommandResponse(kOKInsertResponse); auto insertRes = txnClient .runCommand("user"_sd, @@ -1842,13 +1858,17 @@ TEST_F(TxnAPITest, MaxTimeMSIsSetIfOperationContextHasDeadline) { .get(); ASSERT_EQ(insertRes["n"].Int(), 1); // Verify the mocked response was returned. assertTxnMetadata(mockClient()->getLastSentRequest(), - 1 /* txnNumber */, + attempt + 1 /* txnNumber */, true /* startTransaction */, boost::none /* readConcern */, boost::none /* writeConcern */, maxTimeMS /* maxTimeMS */); assertSessionIdMetadata(mockClient()->getLastSentRequest(), LsidAssertion::kStandalone); + // Throw a transient error more times than the overriden retry limit to verify the limit + // is disabled when a deadline is set. + uassert(ErrorCodes::HostUnreachable, "Host unreachable error", attempt > 3); + mockClock->advance(Milliseconds(1000)); // The commit response. @@ -1860,7 +1880,7 @@ TEST_F(TxnAPITest, MaxTimeMSIsSetIfOperationContextHasDeadline) { auto lastRequest = mockClient()->getLastSentRequest(); assertTxnMetadata(lastRequest, - 1 /* txnNumber */, + attempt + 1 /* txnNumber */, boost::none /* startTransaction */, boost::none /* readConcern */, WriteConcernOptions().toBSON() /* writeConcern */, diff --git a/src/mongo/s/commands/internal_transactions_test_command.h b/src/mongo/s/commands/internal_transactions_test_command.h index c5bf8b61951..63b88c44d51 100644 --- a/src/mongo/s/commands/internal_transactions_test_command.h +++ b/src/mongo/s/commands/internal_transactions_test_command.h @@ -34,6 +34,8 @@ #include "mongo/db/commands/internal_transactions_test_command_gen.h" #include "mongo/db/query/find_command_gen.h" #include "mongo/db/transaction_api.h" +#include "mongo/executor/network_interface_factory.h" +#include "mongo/executor/thread_pool_task_executor.h" #include "mongo/logv2/log.h" #include "mongo/s/grid.h" #include "mongo/stdx/future.h" @@ -63,8 +65,8 @@ public: auto sharedBlock = std::make_shared<SharedBlock>(Base::request().getCommandInfos()); const auto executor = Grid::get(opCtx)->isShardingInitialized() - ? static_cast<ExecutorPtr>(Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()) - : static_cast<ExecutorPtr>(getTransactionExecutor()); + ? Grid::get(opCtx)->getExecutorPool()->getFixedExecutor() + : getTransactionExecutor(); // If internalTransactionsTestCommand is received by a mongod, it should be instantiated // with the TransactionParticipant's resource yielder. If on a mongos, txn should be @@ -141,10 +143,10 @@ public: ActionType::internal)); } - const std::shared_ptr<ThreadPool>& getTransactionExecutor() { + std::shared_ptr<executor::TaskExecutor> getTransactionExecutor() { static Mutex mutex = MONGO_MAKE_LATCH("InternalTransactionsTestCommandExecutor::_mutex"); - static std::shared_ptr<ThreadPool> executor; + static std::shared_ptr<executor::ThreadPoolTaskExecutor> executor; stdx::lock_guard<Latch> lg(mutex); if (!executor) { @@ -152,7 +154,9 @@ public: options.poolName = "InternalTransaction"; options.minThreads = 0; options.maxThreads = 4; - executor = std::make_shared<ThreadPool>(std::move(options)); + executor = std::make_shared<executor::ThreadPoolTaskExecutor>( + std::make_unique<ThreadPool>(std::move(options)), + executor::makeNetworkInterface("InternalTransactionNetwork")); executor->startup(); } return executor; diff --git a/src/mongo/s/commands/internal_transactions_test_command_s.cpp b/src/mongo/s/commands/internal_transactions_test_command_s.cpp index 7ebc42e2ffb..e6cc0313a4a 100644 --- a/src/mongo/s/commands/internal_transactions_test_command_s.cpp +++ b/src/mongo/s/commands/internal_transactions_test_command_s.cpp @@ -36,10 +36,11 @@ namespace { class InternalTransactionsTestCommandS : public InternalTransactionsTestCommandBase<InternalTransactionsTestCommandS> { public: - static txn_api::SyncTransactionWithRetries getTxn(OperationContext* opCtx, - ExecutorPtr executor, - StringData commandName, - bool useClusterClient) { + static txn_api::SyncTransactionWithRetries getTxn( + OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> executor, + StringData commandName, + bool useClusterClient) { return txn_api::SyncTransactionWithRetries( opCtx, executor, TransactionRouterResourceYielder::makeForLocalHandoff()); } |