diff options
author | Jack Mulrow <jack.mulrow@mongodb.com> | 2023-04-24 20:32:29 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-05-04 16:35:50 +0000 |
commit | 2fafe2adeffb424d16bf14ef4686caf645adfba5 (patch) | |
tree | 9cec7377b1a37290850dd9a67de1d19923a2f809 /src/mongo | |
parent | 6adceba617ddf90c2d739ccf7b383f4a3b5f189b (diff) | |
download | mongo-2fafe2adeffb424d16bf14ef4686caf645adfba5.tar.gz |
SERVER-75676 Run transaction API cleanup tasks on out of line executor
Diffstat (limited to 'src/mongo')
19 files changed, 106 insertions, 150 deletions
diff --git a/src/mongo/db/commands/internal_transactions_test_command_d.cpp b/src/mongo/db/commands/internal_transactions_test_command_d.cpp index 6f7a4ad8bab..963a309b8ef 100644 --- a/src/mongo/db/commands/internal_transactions_test_command_d.cpp +++ b/src/mongo/db/commands/internal_transactions_test_command_d.cpp @@ -43,12 +43,12 @@ public: StringData commandName, bool useClusterClient) { auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); // If a sharded mongod is acting as a mongos, it will need special routing behaviors. if (useClusterClient) { + auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); return txn_api::SyncTransactionWithRetries( opCtx, - sleepInlineExecutor, + executor, TransactionParticipantResourceYielder::make(commandName), inlineExecutor, std::make_unique<txn_api::details::SEPTransactionClient>( @@ -61,7 +61,7 @@ public: return txn_api::SyncTransactionWithRetries( opCtx, - sleepInlineExecutor, + executor, TransactionParticipantResourceYielder::make(commandName), inlineExecutor); } diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp index 7c41c7374d7..06610a08590 100644 --- a/src/mongo/db/fle_crud.cpp +++ b/src/mongo/db/fle_crud.cpp @@ -240,12 +240,12 @@ std::vector<std::vector<FLEEdgeCountInfo>> toEdgeCounts( std::shared_ptr<txn_api::SyncTransactionWithRetries> getTransactionWithRetriesForMongoS( OperationContext* opCtx) { + auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); auto fleInlineCrudExecutor = std::make_shared<executor::InlineExecutor>(); return std::make_shared<txn_api::SyncTransactionWithRetries>( opCtx, - fleInlineCrudExecutor->getSleepableExecutor( - Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()), + executor, TransactionRouterResourceYielder::makeForLocalHandoff(), fleInlineCrudExecutor); } diff --git a/src/mongo/db/fle_crud_mongod.cpp b/src/mongo/db/fle_crud_mongod.cpp index a13e25c6685..1aea4755bc5 100644 --- a/src/mongo/db/fle_crud_mongod.cpp +++ b/src/mongo/db/fle_crud_mongod.cpp @@ -336,11 +336,10 @@ std::shared_ptr<txn_api::SyncTransactionWithRetries> getTransactionWithRetriesFo OperationContext* opCtx) { auto fleInlineCrudExecutor = std::make_shared<executor::InlineExecutor>(); - auto inlineSleepExecutor = fleInlineCrudExecutor->getSleepableExecutor(_fleCrudExecutor); return std::make_shared<txn_api::SyncTransactionWithRetries>( opCtx, - inlineSleepExecutor, + _fleCrudExecutor, std::make_unique<FLEMongoDResourceYielder>(), fleInlineCrudExecutor); } diff --git a/src/mongo/db/s/config/configsvr_commit_index_command.cpp b/src/mongo/db/s/config/configsvr_commit_index_command.cpp index 4d39704890c..d85bbb576df 100644 --- a/src/mongo/db/s/config/configsvr_commit_index_command.cpp +++ b/src/mongo/db/s/config/configsvr_commit_index_command.cpp @@ -94,11 +94,9 @@ void commitIndexInTransaction(OperationContext* opCtx, }()}); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); - txn_api::SyncTransactionWithRetries txn( opCtx, - sleepInlineExecutor, + executor, TransactionParticipantResourceYielder::make("commitIndexCatalogEntry"), inlineExecutor); diff --git a/src/mongo/db/s/config/configsvr_drop_index_catalog_command.cpp b/src/mongo/db/s/config/configsvr_drop_index_catalog_command.cpp index 7875a4e8178..4528c761e61 100644 --- a/src/mongo/db/s/config/configsvr_drop_index_catalog_command.cpp +++ b/src/mongo/db/s/config/configsvr_drop_index_catalog_command.cpp @@ -86,11 +86,10 @@ void dropIndexInTransaction(OperationContext* opCtx, }()}); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); txn_api::SyncTransactionWithRetries txn( opCtx, - sleepInlineExecutor, + executor, TransactionParticipantResourceYielder::make("dropIndexCatalogEntry"), inlineExecutor); diff --git a/src/mongo/db/s/config/sharding_catalog_manager.cpp b/src/mongo/db/s/config/sharding_catalog_manager.cpp index 6482c797c0f..efd96236e44 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager.cpp @@ -557,11 +557,10 @@ void setInitializationTimeOnPlacementHistory( ScopeGuard resetWriteConcerGuard([opCtx, &originalWC] { opCtx->setWriteConcern(originalWC); }); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor( - Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()); + auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); txn_api::SyncTransactionWithRetries txn( - opCtx, sleepInlineExecutor, nullptr /* resourceYielder */, inlineExecutor); + opCtx, executor, nullptr /* resourceYielder */, inlineExecutor); txn.run(opCtx, transactionChain); LOGV2(7068807, @@ -1113,11 +1112,10 @@ void ShardingCatalogManager::withTransactionAPI(OperationContext* opCtx, const NamespaceString& namespaceForInitialFind, txn_api::Callback callback) { auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor( - Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()); + auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); auto txn = txn_api::SyncTransactionWithRetries( - opCtx, sleepInlineExecutor, nullptr /* resourceYielder */, inlineExecutor); + opCtx, executor, nullptr /* resourceYielder */, inlineExecutor); txn.run(opCtx, [innerCallback = std::move(callback), namespaceForInitialFind](const txn_api::TransactionClient& txnClient, diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp index 19806d4d8ec..23846d11a1a 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp @@ -458,9 +458,8 @@ void mergeAllChunksOnShardInTransaction(OperationContext* opCtx, auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); - txn_api::SyncTransactionWithRetries txn(opCtx, sleepInlineExecutor, nullptr, inlineExecutor); + txn_api::SyncTransactionWithRetries txn(opCtx, executor, nullptr, inlineExecutor); txn.run(opCtx, updateChunksFn); } @@ -676,9 +675,8 @@ ShardingCatalogManager::_splitChunkInTransaction(OperationContext* opCtx, auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); - txn_api::SyncTransactionWithRetries txn(opCtx, sleepInlineExecutor, nullptr, inlineExecutor); + txn_api::SyncTransactionWithRetries txn(opCtx, executor, nullptr, inlineExecutor); // TODO: SERVER-72431 Make split chunk commit idempotent, with that we won't need anymore the // transaction precondition and we will be able to remove the try/catch on the transaction run @@ -904,9 +902,8 @@ void ShardingCatalogManager::_mergeChunksInTransaction( auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); - txn_api::SyncTransactionWithRetries txn(opCtx, sleepInlineExecutor, nullptr, inlineExecutor); + txn_api::SyncTransactionWithRetries txn(opCtx, executor, nullptr, inlineExecutor); txn.run(opCtx, updateChunksFn); } @@ -2261,11 +2258,10 @@ void ShardingCatalogManager::setAllowMigrationsAndBumpOneChunk( }; auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); txn_api::SyncTransactionWithRetries txn( opCtx, - sleepInlineExecutor, + executor, TransactionParticipantResourceYielder::make("setAllowMigrationsAndBumpOneChunk"), inlineExecutor); @@ -2490,9 +2486,8 @@ void ShardingCatalogManager::_commitChunkMigrationInTransaction( auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); - txn_api::SyncTransactionWithRetries txn(opCtx, sleepInlineExecutor, nullptr, inlineExecutor); + txn_api::SyncTransactionWithRetries txn(opCtx, executor, nullptr, inlineExecutor); txn.run(opCtx, transactionChain); } diff --git a/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp index 3f1753b5dcd..2f77383c745 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp @@ -271,10 +271,9 @@ DatabaseType ShardingCatalogManager::createDatabase( auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); txn_api::SyncTransactionWithRetries txn( - opCtx, sleepInlineExecutor, nullptr /*resourceYielder*/, inlineExecutor); + opCtx, executor, nullptr /*resourceYielder*/, inlineExecutor); txn.run(opCtx, transactionChain); hangBeforeNotifyingCreateDatabaseCommitted.pauseWhileSet(); @@ -411,10 +410,9 @@ void ShardingCatalogManager::commitMovePrimary(OperationContext* opCtx, auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); txn_api::SyncTransactionWithRetries txn(opCtx, - sleepInlineExecutor, + executor, nullptr, /*resourceYielder*/ inlineExecutor); txn.run(opCtx, transactionChain); diff --git a/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp index 01702c95259..d964c31d4c6 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp @@ -1508,9 +1508,8 @@ void ShardingCatalogManager::_addShardInTransaction( auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); - txn_api::SyncTransactionWithRetries txn(opCtx, sleepInlineExecutor, nullptr, inlineExecutor); + txn_api::SyncTransactionWithRetries txn(opCtx, executor, nullptr, inlineExecutor); txn.run(opCtx, transactionChain); hangBeforeNotifyingaddShardCommitted.pauseWhileSet(); @@ -1571,11 +1570,10 @@ void ShardingCatalogManager::_removeShardInTransaction(OperationContext* opCtx, .semi(); }; + auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor( - Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()); - txn_api::SyncTransactionWithRetries txn(opCtx, sleepInlineExecutor, nullptr, inlineExecutor); + txn_api::SyncTransactionWithRetries txn(opCtx, executor, nullptr, inlineExecutor); txn.run(opCtx, removeShardFn); } diff --git a/src/mongo/db/s/global_index/global_index_inserter.cpp b/src/mongo/db/s/global_index/global_index_inserter.cpp index ec9d4883d63..673810360cd 100644 --- a/src/mongo/db/s/global_index/global_index_inserter.cpp +++ b/src/mongo/db/s/global_index/global_index_inserter.cpp @@ -103,9 +103,7 @@ void GlobalIndexInserter::processDoc(OperationContext* opCtx, }; auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(_executor); - - txn_api::SyncTransactionWithRetries txn(opCtx, sleepInlineExecutor, nullptr, inlineExecutor); + txn_api::SyncTransactionWithRetries txn(opCtx, _executor, nullptr, inlineExecutor); txn.run(opCtx, insertToGlobalIndexFn); } diff --git a/src/mongo/db/s/sharding_ddl_util.cpp b/src/mongo/db/s/sharding_ddl_util.cpp index 03b011f1f73..835728a8da1 100644 --- a/src/mongo/db/s/sharding_ddl_util.cpp +++ b/src/mongo/db/s/sharding_ddl_util.cpp @@ -896,7 +896,6 @@ void runTransactionOnShardingCatalog(OperationContext* opCtx, }(); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); // Instantiate the right custom TXN client to ensure that the queries to the config DB will be // routed to the CSRS. @@ -909,6 +908,7 @@ void runTransactionOnShardingCatalog(OperationContext* opCtx, return nullptr; } + auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); return std::make_unique<txn_api::details::SEPTransactionClient>( newOpCtx, inlineExecutor, @@ -925,7 +925,7 @@ void runTransactionOnShardingCatalog(OperationContext* opCtx, newOpCtx->setWriteConcern(writeConcern); txn_api::SyncTransactionWithRetries txn(newOpCtx, - sleepInlineExecutor, + executor, nullptr /*resourceYielder*/, inlineExecutor, std::move(customTxnClient)); diff --git a/src/mongo/db/transaction/transaction_api.cpp b/src/mongo/db/transaction/transaction_api.cpp index 6550f900611..17e50980c99 100644 --- a/src/mongo/db/transaction/transaction_api.cpp +++ b/src/mongo/db/transaction/transaction_api.cpp @@ -86,13 +86,14 @@ void runFutureInline(executor::InlineExecutor* inlineExecutor, Notification<void SyncTransactionWithRetries::SyncTransactionWithRetries( OperationContext* opCtx, - std::shared_ptr<executor::InlineExecutor::SleepableExecutor> sleepableExecutor, + std::shared_ptr<executor::TaskExecutor> sleepAndCleanupExecutor, std::unique_ptr<ResourceYielder> resourceYielder, std::shared_ptr<executor::InlineExecutor> inlineExecutor, std::unique_ptr<TransactionClient> txnClient) : _resourceYielder(std::move(resourceYielder)), _inlineExecutor(inlineExecutor), - _sleepExec(sleepableExecutor), + _sleepExec(inlineExecutor->getSleepableExecutor(sleepAndCleanupExecutor)), + _cleanupExecutor(sleepAndCleanupExecutor), _txn(std::make_shared<details::TransactionWithRetries>( opCtx, _sleepExec, @@ -101,9 +102,8 @@ SyncTransactionWithRetries::SyncTransactionWithRetries( : std::make_unique<details::SEPTransactionClient>( opCtx, inlineExecutor, - sleepableExecutor, + _sleepExec, std::make_unique<details::DefaultSEPTransactionClientBehaviors>()))) { - // Callers should always provide a yielder when using the API with a session checked out, // otherwise commands run by the API won't be able to check out that session. invariant(!OperationContextSession::get(opCtx) || _resourceYielder); @@ -123,7 +123,6 @@ StatusWith<CommitResult> SyncTransactionWithRetries::runNoThrow(OperationContext .tapAll([&](auto&&) { mayReturn.set(); }) .semi(); - runFutureInline(_inlineExecutor.get(), mayReturn); auto txnResult = txnFuture.getNoThrow(opCtx); @@ -131,28 +130,29 @@ StatusWith<CommitResult> SyncTransactionWithRetries::runNoThrow(OperationContext // Cancel the source to guarantee the transaction will terminate if our opCtx was interrupted. _source.cancel(); - // Wait for transaction to complete before returning so variables referenced by its callback are - // guaranteed to be in scope even if the API caller's opCtx was interrupted. - txnFuture.wait(); - // Post transaction processing, which must also happen inline. OperationTimeTracker::get(opCtx)->updateOperationTime(_txn->getOperationTime()); repl::ReplClientInfo::forClient(opCtx->getClient()) .setLastProxyWriteTimestampForward(_txn->getOperationTime().asTimestamp()); - // Run cleanup tasks after the caller has finished waiting so the caller can't be blocked. - // Attempt to wait for cleanup so it appears synchronous for most callers, but allow - // interruptions so we return immediately if the opCtx has been cancelled. - // - // Also schedule after getting the transaction's operation time so the best effort abort can't - // unnecessarily advance it. - Notification<void> mayReturnFromCleanup; - auto cleanUpFuture = _txn->cleanUpIfNecessary().unsafeToInlineFuture().tapAll( - [&](auto&&) { mayReturnFromCleanup.set(); }); - - runFutureInline(_inlineExecutor.get(), mayReturnFromCleanup); - - cleanUpFuture.getNoThrow(opCtx).ignore(); + if (_txn->needsCleanup()) { + // Schedule cleanup on an out of line executor so it runs even if the transaction was + // cancelled. Attempt to wait for cleanup so it appears synchronous for most callers, but + // allow interruptions so we return immediately if the opCtx has been cancelled. + // + // Also schedule after getting the transaction's operation time so the best effort abort + // can't unnecessarily advance it. + ExecutorFuture<void>(_cleanupExecutor) + .then([txn = _txn, inlineExecutor = _inlineExecutor]() mutable { + Notification<void> mayReturnFromCleanup; + auto cleanUpFuture = txn->cleanUp().unsafeToInlineFuture().tapAll( + [&](auto&&) { mayReturnFromCleanup.set(); }); + runFutureInline(inlineExecutor.get(), mayReturnFromCleanup); + return cleanUpFuture; + }) + .getNoThrow(opCtx) + .ignore(); + } auto unyieldStatus = _resourceYielder ? _resourceYielder->unyieldNoThrow(opCtx) : Status::OK(); @@ -509,7 +509,6 @@ SemiFuture<BatchedCommandResponse> SEPTransactionClient::runCRUDOp( BatchedCommandResponse SEPTransactionClient::runCRUDOpSync(const BatchedCommandRequest& cmd, std::vector<StmtId> stmtIds) const { - Notification<void> mayReturn; auto result = @@ -832,10 +831,12 @@ Transaction::ErrorHandlingStep Transaction::handleError(const StatusWith<CommitR return ErrorHandlingStep::kAbortAndDoNotRetry; } -SemiFuture<void> TransactionWithRetries::cleanUpIfNecessary() { - if (!_internalTxn->needsCleanup()) { - return SemiFuture<void>(Status::OK()); - } +bool TransactionWithRetries::needsCleanup() { + return _internalTxn->needsCleanup(); +} + +SemiFuture<void> TransactionWithRetries::cleanUp() { + tassert(7567600, "Unnecessarily cleaning up transaction", _internalTxn->needsCleanup()); return _bestEffortAbort() // Safe to inline because the continuation only holds state. diff --git a/src/mongo/db/transaction/transaction_api.h b/src/mongo/db/transaction/transaction_api.h index d754ab7390f..323a4af98f9 100644 --- a/src/mongo/db/transaction/transaction_api.h +++ b/src/mongo/db/transaction/transaction_api.h @@ -178,13 +178,16 @@ public: * * Optionally accepts a custom TransactionClient and will default to a client that runs commands * against the local service entry point. + * + * Will run all tasks synchronously on the caller's thread via the InlineExecutor. Will sleep + * between retries and schedule any necessary cleanup (e.g. abortTransaction commands) using the + * sleepAndCleanupExecutor. */ - SyncTransactionWithRetries( - OperationContext* opCtx, - std::shared_ptr<executor::InlineExecutor::SleepableExecutor> sleepableExecutor, - std::unique_ptr<ResourceYielder> resourceYielder, - std::shared_ptr<executor::InlineExecutor> executor, - std::unique_ptr<TransactionClient> txnClient = nullptr); + SyncTransactionWithRetries(OperationContext* opCtx, + std::shared_ptr<executor::TaskExecutor> sleepAndCleanupExecutor, + std::unique_ptr<ResourceYielder> resourceYielder, + std::shared_ptr<executor::InlineExecutor> executor, + std::unique_ptr<TransactionClient> txnClient = nullptr); /** * Returns a bundle with the commit command status and write concern error, if any. Any error * prior to receiving a response from commit (e.g. an interruption or a user assertion in the @@ -215,6 +218,7 @@ private: std::unique_ptr<ResourceYielder> _resourceYielder; std::shared_ptr<executor::InlineExecutor> _inlineExecutor; std::shared_ptr<executor::InlineExecutor::SleepableExecutor> _sleepExec; + std::shared_ptr<executor::TaskExecutor> _cleanupExecutor; std::shared_ptr<details::TransactionWithRetries> _txn; }; @@ -607,10 +611,15 @@ public: } /** - * If the transaction needs to be cleaned up, i.e. aborted, this will schedule the necessary - * work. Callers can wait for cleanup by waiting on the returned future. + * Returns if the transaction needs to be cleaned up, i.e. aborted. + */ + bool needsCleanup(); + + /** + * Schedules the necessary work to clean up the transacton, assuming it needs cleanup. Callers + * can wait for cleanup by waiting on the returned future. */ - SemiFuture<void> cleanUpIfNecessary(); + SemiFuture<void> cleanUp(); private: // Helper methods for running a transaction. diff --git a/src/mongo/db/transaction/transaction_api_test.cpp b/src/mongo/db/transaction/transaction_api_test.cpp index b14b6918644..b9290d0cd86 100644 --- a/src/mongo/db/transaction/transaction_api_test.cpp +++ b/src/mongo/db/transaction/transaction_api_test.cpp @@ -388,14 +388,12 @@ protected: _executor->startup(); _inlineExecutor = std::make_shared<executor::InlineExecutor>(); - _sleepInlineExecutor = _inlineExecutor->getSleepableExecutor(_executor); - auto mockClient = std::make_unique<txn_api::details::MockTransactionClient>( - opCtx(), _inlineExecutor, _sleepInlineExecutor, nullptr); + opCtx(), _inlineExecutor, _inlineExecutor->getSleepableExecutor(_executor), nullptr); _mockClient = mockClient.get(); _txnWithRetries = std::make_unique<txn_api::SyncTransactionWithRetries>(opCtx(), - _sleepInlineExecutor, + _executor, nullptr /* resourceYielder */, _inlineExecutor, std::move(mockClient)); @@ -435,14 +433,15 @@ protected: return *_txnWithRetries; } - void resetTxnWithRetries( - std::unique_ptr<MockResourceYielder> resourceYielder = nullptr, - std::shared_ptr<executor::InlineExecutor::SleepableExecutor> executor = nullptr) { - auto executorToUse = _sleepInlineExecutor; - + void resetTxnWithRetries(std::unique_ptr<MockResourceYielder> resourceYielder = nullptr, + std::shared_ptr<executor::TaskExecutor> executor = nullptr) { + auto executorToUse = executor ? executor : _executor; auto mockClient = std::make_unique<txn_api::details::MockTransactionClient>( - opCtx(), _inlineExecutor, executorToUse, nullptr); + opCtx(), + _inlineExecutor, + _inlineExecutor->getSleepableExecutor(executorToUse), + nullptr); _mockClient = mockClient.get(); if (resourceYielder) { _resourceYielder = resourceYielder.get(); @@ -468,7 +467,7 @@ protected: waitForAllEarlierTasksToComplete(); _txnWithRetries = nullptr; _txnWithRetries = std::make_unique<txn_api::SyncTransactionWithRetries>( - opCtx(), _sleepInlineExecutor, nullptr, _inlineExecutor, std::move(txnClient)); + opCtx(), _executor, nullptr, _inlineExecutor, std::move(txnClient)); } void expectSentAbort(TxnNumber txnNumber, BSONObj writeConcern) { @@ -486,7 +485,6 @@ private: std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor; std::shared_ptr<executor::InlineExecutor> _inlineExecutor; - std::shared_ptr<executor::InlineExecutor::SleepableExecutor> _sleepInlineExecutor; txn_api::details::MockTransactionClient* _mockClient{nullptr}; MockResourceYielder* _resourceYielder{nullptr}; std::unique_ptr<txn_api::SyncTransactionWithRetries> _txnWithRetries; @@ -1377,48 +1375,34 @@ TEST_F(TxnAPITest, HandlesExceptionWhileUnyielding) { ASSERT_EQ(resourceYielder()->timesUnyielded(), 1); } -// TODO SERVER-75553 - with inline executors, this test runs in the wrong order -#if 0 -TEST_F(TxnAPITest, UnyieldsAfterCancellation) { +TEST_F(TxnAPITest, TransactionErrorTakesPrecedenceOverUnyieldError) { resetTxnWithRetries(std::make_unique<MockResourceYielder>()); - unittest::Barrier txnApiStarted(2); - unittest::Barrier opCtxKilled(2); - - auto killerThread = stdx::thread([&txnApiStarted, &opCtxKilled, opCtx = opCtx()] { - txnApiStarted.countDownAndWait(); - opCtx->markKilled(); - opCtxKilled.countDownAndWait(); - }); - auto swResult = txnWithRetries().runNoThrow( opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { mockClient()->setNextCommandResponse(kOKInsertResponse); - auto insertRes = txnClient - .runCommand(DatabaseName::createDatabaseName_forTest(boost::none, "user"_sd), - BSON("insert" - << "foo" - << "documents" << BSON_ARRAY(BSON("x" << 1)))) - .get(); + auto insertRes = + txnClient + .runCommand(DatabaseName::createDatabaseName_forTest(boost::none, "user"_sd), + BSON("insert" + << "foo" + << "documents" << BSON_ARRAY(BSON("x" << 1)))) + .get(); - resourceYielder()->throwInUnyield(ErrorCodes::InternalError); + resourceYielder()->throwInUnyield(ErrorCodes::Interrupted); - txnApiStarted.countDownAndWait(); - opCtxKilled.countDownAndWait(); + uasserted(ErrorCodes::InternalError, "Mock error"); return SemiFuture<void>::makeReady(); }); - // The transaction should fail with an Interrupted error from killing the opCtx using the - // API instead of the ResourceYielder error from within the API callback. - ASSERT_EQ(swResult.getStatus(), ErrorCodes::Interrupted); + // The transaction should fail with the error the transaction failed with instead of the + // ResourceYielder error. + ASSERT_EQ(swResult.getStatus(), ErrorCodes::InternalError); // Yield before starting and corresponding unyield. ASSERT_EQ(resourceYielder()->timesYielded(), 1); ASSERT_EQ(resourceYielder()->timesUnyielded(), 1); - - killerThread.join(); } -#endif TEST_F(TxnAPITest, ClientSession_UsesNonRetryableInternalSession) { opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest()); @@ -2343,8 +2327,6 @@ TEST_F(TxnAPITest, FailoverAndShutdownErrorsAreFatalForLocalTransactionWCError) runTest(true, Status(ErrorCodes::HostUnreachable, "mock retriable error")); } -// TODO SERVER-75553 - test needs to be aborted and assumes multiple-threads -#if 0 TEST_F(TxnAPITest, DoesNotWaitForBestEffortAbortIfCancelled) { // Start the transaction with an insert. mockClient()->setNextCommandResponse(kOKInsertResponse); @@ -2394,8 +2376,7 @@ TEST_F(TxnAPITest, WaitsForBestEffortAbortOnNonTransientErrorIfNotCancelled) { std::make_unique<ThreadPool>(std::move(options)), executor::makeNetworkInterface("TxnAPITestNetwork")); executor->startup(); - auto exec1 = executor::InlineExecutor().getSleepableExecutor(executor); - resetTxnWithRetries(nullptr /* resourceYielder */, exec1); + resetTxnWithRetries(nullptr /* resourceYielder */, executor); // Start the transaction with an insert. mockClient()->setNextCommandResponse(kOKInsertResponse); @@ -2454,9 +2435,7 @@ TEST_F(TxnAPITest, WaitsForBestEffortAbortOnTransientError) { std::make_unique<ThreadPool>(std::move(options)), executor::makeNetworkInterface("TxnAPITestNetwork")); executor->startup(); - auto exec1 = executor::InlineExecutor().getSleepableExecutor(executor); - - resetTxnWithRetries(nullptr /* resourceYielder */, exec1); + resetTxnWithRetries(nullptr /* resourceYielder */, executor); // Start the transaction with an insert. mockClient()->setNextCommandResponse(kOKInsertResponse); @@ -2514,7 +2493,6 @@ TEST_F(TxnAPITest, WaitsForBestEffortAbortOnTransientError) { executor->shutdown(); executor->join(); } -#endif } // namespace } // namespace mongo diff --git a/src/mongo/idl/cluster_server_parameter_refresher.cpp b/src/mongo/idl/cluster_server_parameter_refresher.cpp index efce752280b..13cc743c477 100644 --- a/src/mongo/idl/cluster_server_parameter_refresher.cpp +++ b/src/mongo/idl/cluster_server_parameter_refresher.cpp @@ -149,9 +149,7 @@ getFCVAndClusterParametersFromConfigServer() { auto executor = Grid::get(opCtx.get())->getExecutorPool()->getFixedExecutor(); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); - txn_api::SyncTransactionWithRetries txn( - opCtx.get(), sleepInlineExecutor, nullptr, inlineExecutor); + txn_api::SyncTransactionWithRetries txn(opCtx.get(), executor, nullptr, inlineExecutor); txn.run(opCtx.get(), doFetch); return {*fcv, *allDocs}; } diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index d8b3c820341..7dfec971f08 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -157,11 +157,10 @@ void handleWouldChangeOwningShardErrorNonTransaction( const write_ops::FindAndModifyCommandRequest& request, BSONObjBuilder* result) { auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor( - Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()); + auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); auto txn = txn_api::SyncTransactionWithRetries( - opCtx, sleepInlineExecutor, nullptr /* resourceYielder */, inlineExecutor); + opCtx, executor, nullptr /* resourceYielder */, inlineExecutor); // Shared state for the transaction API use below. struct SharedBlock { @@ -258,11 +257,10 @@ void handleWouldChangeOwningShardErrorTransaction( try { auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); auto txn = txn_api::SyncTransactionWithRetries( opCtx, - sleepInlineExecutor, + executor, TransactionRouterResourceYielder::makeForLocalHandoff(), inlineExecutor); diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 169fcc0ca74..147f8ad0924 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -163,10 +163,9 @@ void handleWouldChangeOwningShardErrorNonTransaction(OperationContext* opCtx, auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); auto txn = txn_api::SyncTransactionWithRetries( - opCtx, sleepInlineExecutor, nullptr /* resourceYielder */, inlineExecutor); + opCtx, executor, nullptr /* resourceYielder */, inlineExecutor); // Shared state for the transaction API use below. struct SharedBlock { @@ -243,12 +242,8 @@ UpdateShardKeyResult handleWouldChangeOwningShardErrorTransaction( auto sharedBlock = std::make_shared<SharedBlock>(changeInfo, request->getNS()); auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); - auto txn = - txn_api::SyncTransactionWithRetries(opCtx, - sleepInlineExecutor, - TransactionRouterResourceYielder::makeForLocalHandoff(), - inlineExecutor); + auto txn = txn_api::SyncTransactionWithRetries( + opCtx, executor, TransactionRouterResourceYielder::makeForLocalHandoff(), inlineExecutor); try { txn.run(opCtx, diff --git a/src/mongo/s/commands/internal_transactions_test_command_s.cpp b/src/mongo/s/commands/internal_transactions_test_command_s.cpp index 5ea97bc1367..ac0fed2914a 100644 --- a/src/mongo/s/commands/internal_transactions_test_command_s.cpp +++ b/src/mongo/s/commands/internal_transactions_test_command_s.cpp @@ -43,12 +43,10 @@ public: StringData commandName, bool useClusterClient) { auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor( - Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()); return txn_api::SyncTransactionWithRetries( opCtx, - sleepInlineExecutor, + executor, TransactionRouterResourceYielder::makeForLocalHandoff(), inlineExecutor); } diff --git a/src/mongo/s/write_ops/write_without_shard_key_util.cpp b/src/mongo/s/write_ops/write_without_shard_key_util.cpp index 9e8366cfeb8..ba2bca7c03b 100644 --- a/src/mongo/s/write_ops/write_without_shard_key_util.cpp +++ b/src/mongo/s/write_ops/write_without_shard_key_util.cpp @@ -270,13 +270,9 @@ StatusWith<ClusterWriteWithoutShardKeyResponse> runTwoPhaseWriteProtocol(Operati auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); auto inlineExecutor = std::make_shared<executor::InlineExecutor>(); - auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor); - auto txn = - txn_api::SyncTransactionWithRetries(opCtx, - sleepInlineExecutor, - TransactionRouterResourceYielder::makeForLocalHandoff(), - inlineExecutor); + auto txn = txn_api::SyncTransactionWithRetries( + opCtx, executor, TransactionRouterResourceYielder::makeForLocalHandoff(), inlineExecutor); auto sharedBlock = std::make_shared<SharedBlock>(nss, cmdObj); auto swResult = txn.runNoThrow( |