summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2023-04-24 20:32:29 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-05-04 16:35:50 +0000
commit2fafe2adeffb424d16bf14ef4686caf645adfba5 (patch)
tree9cec7377b1a37290850dd9a67de1d19923a2f809
parent6adceba617ddf90c2d739ccf7b383f4a3b5f189b (diff)
downloadmongo-2fafe2adeffb424d16bf14ef4686caf645adfba5.tar.gz
SERVER-75676 Run transaction API cleanup tasks on out of line executor
-rw-r--r--src/mongo/db/commands/internal_transactions_test_command_d.cpp6
-rw-r--r--src/mongo/db/fle_crud.cpp4
-rw-r--r--src/mongo/db/fle_crud_mongod.cpp3
-rw-r--r--src/mongo/db/s/config/configsvr_commit_index_command.cpp4
-rw-r--r--src/mongo/db/s/config/configsvr_drop_index_catalog_command.cpp3
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager.cpp10
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp15
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp6
-rw-r--r--src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp8
-rw-r--r--src/mongo/db/s/global_index/global_index_inserter.cpp4
-rw-r--r--src/mongo/db/s/sharding_ddl_util.cpp4
-rw-r--r--src/mongo/db/transaction/transaction_api.cpp55
-rw-r--r--src/mongo/db/transaction/transaction_api.h27
-rw-r--r--src/mongo/db/transaction/transaction_api_test.cpp72
-rw-r--r--src/mongo/idl/cluster_server_parameter_refresher.cpp4
-rw-r--r--src/mongo/s/commands/cluster_find_and_modify_cmd.cpp8
-rw-r--r--src/mongo/s/commands/cluster_write_cmd.cpp11
-rw-r--r--src/mongo/s/commands/internal_transactions_test_command_s.cpp4
-rw-r--r--src/mongo/s/write_ops/write_without_shard_key_util.cpp8
19 files changed, 106 insertions, 150 deletions
diff --git a/src/mongo/db/commands/internal_transactions_test_command_d.cpp b/src/mongo/db/commands/internal_transactions_test_command_d.cpp
index 6f7a4ad8bab..963a309b8ef 100644
--- a/src/mongo/db/commands/internal_transactions_test_command_d.cpp
+++ b/src/mongo/db/commands/internal_transactions_test_command_d.cpp
@@ -43,12 +43,12 @@ public:
StringData commandName,
bool useClusterClient) {
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
// If a sharded mongod is acting as a mongos, it will need special routing behaviors.
if (useClusterClient) {
+ auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
return txn_api::SyncTransactionWithRetries(
opCtx,
- sleepInlineExecutor,
+ executor,
TransactionParticipantResourceYielder::make(commandName),
inlineExecutor,
std::make_unique<txn_api::details::SEPTransactionClient>(
@@ -61,7 +61,7 @@ public:
return txn_api::SyncTransactionWithRetries(
opCtx,
- sleepInlineExecutor,
+ executor,
TransactionParticipantResourceYielder::make(commandName),
inlineExecutor);
}
diff --git a/src/mongo/db/fle_crud.cpp b/src/mongo/db/fle_crud.cpp
index 7c41c7374d7..06610a08590 100644
--- a/src/mongo/db/fle_crud.cpp
+++ b/src/mongo/db/fle_crud.cpp
@@ -240,12 +240,12 @@ std::vector<std::vector<FLEEdgeCountInfo>> toEdgeCounts(
std::shared_ptr<txn_api::SyncTransactionWithRetries> getTransactionWithRetriesForMongoS(
OperationContext* opCtx) {
+ auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto fleInlineCrudExecutor = std::make_shared<executor::InlineExecutor>();
return std::make_shared<txn_api::SyncTransactionWithRetries>(
opCtx,
- fleInlineCrudExecutor->getSleepableExecutor(
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor()),
+ executor,
TransactionRouterResourceYielder::makeForLocalHandoff(),
fleInlineCrudExecutor);
}
diff --git a/src/mongo/db/fle_crud_mongod.cpp b/src/mongo/db/fle_crud_mongod.cpp
index a13e25c6685..1aea4755bc5 100644
--- a/src/mongo/db/fle_crud_mongod.cpp
+++ b/src/mongo/db/fle_crud_mongod.cpp
@@ -336,11 +336,10 @@ std::shared_ptr<txn_api::SyncTransactionWithRetries> getTransactionWithRetriesFo
OperationContext* opCtx) {
auto fleInlineCrudExecutor = std::make_shared<executor::InlineExecutor>();
- auto inlineSleepExecutor = fleInlineCrudExecutor->getSleepableExecutor(_fleCrudExecutor);
return std::make_shared<txn_api::SyncTransactionWithRetries>(
opCtx,
- inlineSleepExecutor,
+ _fleCrudExecutor,
std::make_unique<FLEMongoDResourceYielder>(),
fleInlineCrudExecutor);
}
diff --git a/src/mongo/db/s/config/configsvr_commit_index_command.cpp b/src/mongo/db/s/config/configsvr_commit_index_command.cpp
index 4d39704890c..d85bbb576df 100644
--- a/src/mongo/db/s/config/configsvr_commit_index_command.cpp
+++ b/src/mongo/db/s/config/configsvr_commit_index_command.cpp
@@ -94,11 +94,9 @@ void commitIndexInTransaction(OperationContext* opCtx,
}()});
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
-
txn_api::SyncTransactionWithRetries txn(
opCtx,
- sleepInlineExecutor,
+ executor,
TransactionParticipantResourceYielder::make("commitIndexCatalogEntry"),
inlineExecutor);
diff --git a/src/mongo/db/s/config/configsvr_drop_index_catalog_command.cpp b/src/mongo/db/s/config/configsvr_drop_index_catalog_command.cpp
index 7875a4e8178..4528c761e61 100644
--- a/src/mongo/db/s/config/configsvr_drop_index_catalog_command.cpp
+++ b/src/mongo/db/s/config/configsvr_drop_index_catalog_command.cpp
@@ -86,11 +86,10 @@ void dropIndexInTransaction(OperationContext* opCtx,
}()});
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
txn_api::SyncTransactionWithRetries txn(
opCtx,
- sleepInlineExecutor,
+ executor,
TransactionParticipantResourceYielder::make("dropIndexCatalogEntry"),
inlineExecutor);
diff --git a/src/mongo/db/s/config/sharding_catalog_manager.cpp b/src/mongo/db/s/config/sharding_catalog_manager.cpp
index 6482c797c0f..efd96236e44 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager.cpp
@@ -557,11 +557,10 @@ void setInitializationTimeOnPlacementHistory(
ScopeGuard resetWriteConcerGuard([opCtx, &originalWC] { opCtx->setWriteConcern(originalWC); });
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());
+ auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
txn_api::SyncTransactionWithRetries txn(
- opCtx, sleepInlineExecutor, nullptr /* resourceYielder */, inlineExecutor);
+ opCtx, executor, nullptr /* resourceYielder */, inlineExecutor);
txn.run(opCtx, transactionChain);
LOGV2(7068807,
@@ -1113,11 +1112,10 @@ void ShardingCatalogManager::withTransactionAPI(OperationContext* opCtx,
const NamespaceString& namespaceForInitialFind,
txn_api::Callback callback) {
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());
+ auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto txn = txn_api::SyncTransactionWithRetries(
- opCtx, sleepInlineExecutor, nullptr /* resourceYielder */, inlineExecutor);
+ opCtx, executor, nullptr /* resourceYielder */, inlineExecutor);
txn.run(opCtx,
[innerCallback = std::move(callback),
namespaceForInitialFind](const txn_api::TransactionClient& txnClient,
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
index 19806d4d8ec..23846d11a1a 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp
@@ -458,9 +458,8 @@ void mergeAllChunksOnShardInTransaction(OperationContext* opCtx,
auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
- txn_api::SyncTransactionWithRetries txn(opCtx, sleepInlineExecutor, nullptr, inlineExecutor);
+ txn_api::SyncTransactionWithRetries txn(opCtx, executor, nullptr, inlineExecutor);
txn.run(opCtx, updateChunksFn);
}
@@ -676,9 +675,8 @@ ShardingCatalogManager::_splitChunkInTransaction(OperationContext* opCtx,
auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
- txn_api::SyncTransactionWithRetries txn(opCtx, sleepInlineExecutor, nullptr, inlineExecutor);
+ txn_api::SyncTransactionWithRetries txn(opCtx, executor, nullptr, inlineExecutor);
// TODO: SERVER-72431 Make split chunk commit idempotent, with that we won't need anymore the
// transaction precondition and we will be able to remove the try/catch on the transaction run
@@ -904,9 +902,8 @@ void ShardingCatalogManager::_mergeChunksInTransaction(
auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
- txn_api::SyncTransactionWithRetries txn(opCtx, sleepInlineExecutor, nullptr, inlineExecutor);
+ txn_api::SyncTransactionWithRetries txn(opCtx, executor, nullptr, inlineExecutor);
txn.run(opCtx, updateChunksFn);
}
@@ -2261,11 +2258,10 @@ void ShardingCatalogManager::setAllowMigrationsAndBumpOneChunk(
};
auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
txn_api::SyncTransactionWithRetries txn(
opCtx,
- sleepInlineExecutor,
+ executor,
TransactionParticipantResourceYielder::make("setAllowMigrationsAndBumpOneChunk"),
inlineExecutor);
@@ -2490,9 +2486,8 @@ void ShardingCatalogManager::_commitChunkMigrationInTransaction(
auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
- txn_api::SyncTransactionWithRetries txn(opCtx, sleepInlineExecutor, nullptr, inlineExecutor);
+ txn_api::SyncTransactionWithRetries txn(opCtx, executor, nullptr, inlineExecutor);
txn.run(opCtx, transactionChain);
}
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp
index 3f1753b5dcd..2f77383c745 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_database_operations.cpp
@@ -271,10 +271,9 @@ DatabaseType ShardingCatalogManager::createDatabase(
auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
txn_api::SyncTransactionWithRetries txn(
- opCtx, sleepInlineExecutor, nullptr /*resourceYielder*/, inlineExecutor);
+ opCtx, executor, nullptr /*resourceYielder*/, inlineExecutor);
txn.run(opCtx, transactionChain);
hangBeforeNotifyingCreateDatabaseCommitted.pauseWhileSet();
@@ -411,10 +410,9 @@ void ShardingCatalogManager::commitMovePrimary(OperationContext* opCtx,
auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
txn_api::SyncTransactionWithRetries txn(opCtx,
- sleepInlineExecutor,
+ executor,
nullptr, /*resourceYielder*/
inlineExecutor);
txn.run(opCtx, transactionChain);
diff --git a/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp
index 01702c95259..d964c31d4c6 100644
--- a/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp
+++ b/src/mongo/db/s/config/sharding_catalog_manager_shard_operations.cpp
@@ -1508,9 +1508,8 @@ void ShardingCatalogManager::_addShardInTransaction(
auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
- txn_api::SyncTransactionWithRetries txn(opCtx, sleepInlineExecutor, nullptr, inlineExecutor);
+ txn_api::SyncTransactionWithRetries txn(opCtx, executor, nullptr, inlineExecutor);
txn.run(opCtx, transactionChain);
hangBeforeNotifyingaddShardCommitted.pauseWhileSet();
@@ -1571,11 +1570,10 @@ void ShardingCatalogManager::_removeShardInTransaction(OperationContext* opCtx,
.semi();
};
+ auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());
- txn_api::SyncTransactionWithRetries txn(opCtx, sleepInlineExecutor, nullptr, inlineExecutor);
+ txn_api::SyncTransactionWithRetries txn(opCtx, executor, nullptr, inlineExecutor);
txn.run(opCtx, removeShardFn);
}
diff --git a/src/mongo/db/s/global_index/global_index_inserter.cpp b/src/mongo/db/s/global_index/global_index_inserter.cpp
index ec9d4883d63..673810360cd 100644
--- a/src/mongo/db/s/global_index/global_index_inserter.cpp
+++ b/src/mongo/db/s/global_index/global_index_inserter.cpp
@@ -103,9 +103,7 @@ void GlobalIndexInserter::processDoc(OperationContext* opCtx,
};
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(_executor);
-
- txn_api::SyncTransactionWithRetries txn(opCtx, sleepInlineExecutor, nullptr, inlineExecutor);
+ txn_api::SyncTransactionWithRetries txn(opCtx, _executor, nullptr, inlineExecutor);
txn.run(opCtx, insertToGlobalIndexFn);
}
diff --git a/src/mongo/db/s/sharding_ddl_util.cpp b/src/mongo/db/s/sharding_ddl_util.cpp
index 03b011f1f73..835728a8da1 100644
--- a/src/mongo/db/s/sharding_ddl_util.cpp
+++ b/src/mongo/db/s/sharding_ddl_util.cpp
@@ -896,7 +896,6 @@ void runTransactionOnShardingCatalog(OperationContext* opCtx,
}();
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
// Instantiate the right custom TXN client to ensure that the queries to the config DB will be
// routed to the CSRS.
@@ -909,6 +908,7 @@ void runTransactionOnShardingCatalog(OperationContext* opCtx,
return nullptr;
}
+ auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
return std::make_unique<txn_api::details::SEPTransactionClient>(
newOpCtx,
inlineExecutor,
@@ -925,7 +925,7 @@ void runTransactionOnShardingCatalog(OperationContext* opCtx,
newOpCtx->setWriteConcern(writeConcern);
txn_api::SyncTransactionWithRetries txn(newOpCtx,
- sleepInlineExecutor,
+ executor,
nullptr /*resourceYielder*/,
inlineExecutor,
std::move(customTxnClient));
diff --git a/src/mongo/db/transaction/transaction_api.cpp b/src/mongo/db/transaction/transaction_api.cpp
index 6550f900611..17e50980c99 100644
--- a/src/mongo/db/transaction/transaction_api.cpp
+++ b/src/mongo/db/transaction/transaction_api.cpp
@@ -86,13 +86,14 @@ void runFutureInline(executor::InlineExecutor* inlineExecutor, Notification<void
SyncTransactionWithRetries::SyncTransactionWithRetries(
OperationContext* opCtx,
- std::shared_ptr<executor::InlineExecutor::SleepableExecutor> sleepableExecutor,
+ std::shared_ptr<executor::TaskExecutor> sleepAndCleanupExecutor,
std::unique_ptr<ResourceYielder> resourceYielder,
std::shared_ptr<executor::InlineExecutor> inlineExecutor,
std::unique_ptr<TransactionClient> txnClient)
: _resourceYielder(std::move(resourceYielder)),
_inlineExecutor(inlineExecutor),
- _sleepExec(sleepableExecutor),
+ _sleepExec(inlineExecutor->getSleepableExecutor(sleepAndCleanupExecutor)),
+ _cleanupExecutor(sleepAndCleanupExecutor),
_txn(std::make_shared<details::TransactionWithRetries>(
opCtx,
_sleepExec,
@@ -101,9 +102,8 @@ SyncTransactionWithRetries::SyncTransactionWithRetries(
: std::make_unique<details::SEPTransactionClient>(
opCtx,
inlineExecutor,
- sleepableExecutor,
+ _sleepExec,
std::make_unique<details::DefaultSEPTransactionClientBehaviors>()))) {
-
// Callers should always provide a yielder when using the API with a session checked out,
// otherwise commands run by the API won't be able to check out that session.
invariant(!OperationContextSession::get(opCtx) || _resourceYielder);
@@ -123,7 +123,6 @@ StatusWith<CommitResult> SyncTransactionWithRetries::runNoThrow(OperationContext
.tapAll([&](auto&&) { mayReturn.set(); })
.semi();
-
runFutureInline(_inlineExecutor.get(), mayReturn);
auto txnResult = txnFuture.getNoThrow(opCtx);
@@ -131,28 +130,29 @@ StatusWith<CommitResult> SyncTransactionWithRetries::runNoThrow(OperationContext
// Cancel the source to guarantee the transaction will terminate if our opCtx was interrupted.
_source.cancel();
- // Wait for transaction to complete before returning so variables referenced by its callback are
- // guaranteed to be in scope even if the API caller's opCtx was interrupted.
- txnFuture.wait();
-
// Post transaction processing, which must also happen inline.
OperationTimeTracker::get(opCtx)->updateOperationTime(_txn->getOperationTime());
repl::ReplClientInfo::forClient(opCtx->getClient())
.setLastProxyWriteTimestampForward(_txn->getOperationTime().asTimestamp());
- // Run cleanup tasks after the caller has finished waiting so the caller can't be blocked.
- // Attempt to wait for cleanup so it appears synchronous for most callers, but allow
- // interruptions so we return immediately if the opCtx has been cancelled.
- //
- // Also schedule after getting the transaction's operation time so the best effort abort can't
- // unnecessarily advance it.
- Notification<void> mayReturnFromCleanup;
- auto cleanUpFuture = _txn->cleanUpIfNecessary().unsafeToInlineFuture().tapAll(
- [&](auto&&) { mayReturnFromCleanup.set(); });
-
- runFutureInline(_inlineExecutor.get(), mayReturnFromCleanup);
-
- cleanUpFuture.getNoThrow(opCtx).ignore();
+ if (_txn->needsCleanup()) {
+ // Schedule cleanup on an out of line executor so it runs even if the transaction was
+ // cancelled. Attempt to wait for cleanup so it appears synchronous for most callers, but
+ // allow interruptions so we return immediately if the opCtx has been cancelled.
+ //
+ // Also schedule after getting the transaction's operation time so the best effort abort
+ // can't unnecessarily advance it.
+ ExecutorFuture<void>(_cleanupExecutor)
+ .then([txn = _txn, inlineExecutor = _inlineExecutor]() mutable {
+ Notification<void> mayReturnFromCleanup;
+ auto cleanUpFuture = txn->cleanUp().unsafeToInlineFuture().tapAll(
+ [&](auto&&) { mayReturnFromCleanup.set(); });
+ runFutureInline(inlineExecutor.get(), mayReturnFromCleanup);
+ return cleanUpFuture;
+ })
+ .getNoThrow(opCtx)
+ .ignore();
+ }
auto unyieldStatus = _resourceYielder ? _resourceYielder->unyieldNoThrow(opCtx) : Status::OK();
@@ -509,7 +509,6 @@ SemiFuture<BatchedCommandResponse> SEPTransactionClient::runCRUDOp(
BatchedCommandResponse SEPTransactionClient::runCRUDOpSync(const BatchedCommandRequest& cmd,
std::vector<StmtId> stmtIds) const {
-
Notification<void> mayReturn;
auto result =
@@ -832,10 +831,12 @@ Transaction::ErrorHandlingStep Transaction::handleError(const StatusWith<CommitR
return ErrorHandlingStep::kAbortAndDoNotRetry;
}
-SemiFuture<void> TransactionWithRetries::cleanUpIfNecessary() {
- if (!_internalTxn->needsCleanup()) {
- return SemiFuture<void>(Status::OK());
- }
+bool TransactionWithRetries::needsCleanup() {
+ return _internalTxn->needsCleanup();
+}
+
+SemiFuture<void> TransactionWithRetries::cleanUp() {
+ tassert(7567600, "Unnecessarily cleaning up transaction", _internalTxn->needsCleanup());
return _bestEffortAbort()
// Safe to inline because the continuation only holds state.
diff --git a/src/mongo/db/transaction/transaction_api.h b/src/mongo/db/transaction/transaction_api.h
index d754ab7390f..323a4af98f9 100644
--- a/src/mongo/db/transaction/transaction_api.h
+++ b/src/mongo/db/transaction/transaction_api.h
@@ -178,13 +178,16 @@ public:
*
* Optionally accepts a custom TransactionClient and will default to a client that runs commands
* against the local service entry point.
+ *
+ * Will run all tasks synchronously on the caller's thread via the InlineExecutor. Will sleep
+ * between retries and schedule any necessary cleanup (e.g. abortTransaction commands) using the
+ * sleepAndCleanupExecutor.
*/
- SyncTransactionWithRetries(
- OperationContext* opCtx,
- std::shared_ptr<executor::InlineExecutor::SleepableExecutor> sleepableExecutor,
- std::unique_ptr<ResourceYielder> resourceYielder,
- std::shared_ptr<executor::InlineExecutor> executor,
- std::unique_ptr<TransactionClient> txnClient = nullptr);
+ SyncTransactionWithRetries(OperationContext* opCtx,
+ std::shared_ptr<executor::TaskExecutor> sleepAndCleanupExecutor,
+ std::unique_ptr<ResourceYielder> resourceYielder,
+ std::shared_ptr<executor::InlineExecutor> executor,
+ std::unique_ptr<TransactionClient> txnClient = nullptr);
/**
* Returns a bundle with the commit command status and write concern error, if any. Any error
* prior to receiving a response from commit (e.g. an interruption or a user assertion in the
@@ -215,6 +218,7 @@ private:
std::unique_ptr<ResourceYielder> _resourceYielder;
std::shared_ptr<executor::InlineExecutor> _inlineExecutor;
std::shared_ptr<executor::InlineExecutor::SleepableExecutor> _sleepExec;
+ std::shared_ptr<executor::TaskExecutor> _cleanupExecutor;
std::shared_ptr<details::TransactionWithRetries> _txn;
};
@@ -607,10 +611,15 @@ public:
}
/**
- * If the transaction needs to be cleaned up, i.e. aborted, this will schedule the necessary
- * work. Callers can wait for cleanup by waiting on the returned future.
+ * Returns if the transaction needs to be cleaned up, i.e. aborted.
+ */
+ bool needsCleanup();
+
+ /**
+ * Schedules the necessary work to clean up the transacton, assuming it needs cleanup. Callers
+ * can wait for cleanup by waiting on the returned future.
*/
- SemiFuture<void> cleanUpIfNecessary();
+ SemiFuture<void> cleanUp();
private:
// Helper methods for running a transaction.
diff --git a/src/mongo/db/transaction/transaction_api_test.cpp b/src/mongo/db/transaction/transaction_api_test.cpp
index b14b6918644..b9290d0cd86 100644
--- a/src/mongo/db/transaction/transaction_api_test.cpp
+++ b/src/mongo/db/transaction/transaction_api_test.cpp
@@ -388,14 +388,12 @@ protected:
_executor->startup();
_inlineExecutor = std::make_shared<executor::InlineExecutor>();
- _sleepInlineExecutor = _inlineExecutor->getSleepableExecutor(_executor);
-
auto mockClient = std::make_unique<txn_api::details::MockTransactionClient>(
- opCtx(), _inlineExecutor, _sleepInlineExecutor, nullptr);
+ opCtx(), _inlineExecutor, _inlineExecutor->getSleepableExecutor(_executor), nullptr);
_mockClient = mockClient.get();
_txnWithRetries =
std::make_unique<txn_api::SyncTransactionWithRetries>(opCtx(),
- _sleepInlineExecutor,
+ _executor,
nullptr /* resourceYielder */,
_inlineExecutor,
std::move(mockClient));
@@ -435,14 +433,15 @@ protected:
return *_txnWithRetries;
}
- void resetTxnWithRetries(
- std::unique_ptr<MockResourceYielder> resourceYielder = nullptr,
- std::shared_ptr<executor::InlineExecutor::SleepableExecutor> executor = nullptr) {
- auto executorToUse = _sleepInlineExecutor;
-
+ void resetTxnWithRetries(std::unique_ptr<MockResourceYielder> resourceYielder = nullptr,
+ std::shared_ptr<executor::TaskExecutor> executor = nullptr) {
+ auto executorToUse = executor ? executor : _executor;
auto mockClient = std::make_unique<txn_api::details::MockTransactionClient>(
- opCtx(), _inlineExecutor, executorToUse, nullptr);
+ opCtx(),
+ _inlineExecutor,
+ _inlineExecutor->getSleepableExecutor(executorToUse),
+ nullptr);
_mockClient = mockClient.get();
if (resourceYielder) {
_resourceYielder = resourceYielder.get();
@@ -468,7 +467,7 @@ protected:
waitForAllEarlierTasksToComplete();
_txnWithRetries = nullptr;
_txnWithRetries = std::make_unique<txn_api::SyncTransactionWithRetries>(
- opCtx(), _sleepInlineExecutor, nullptr, _inlineExecutor, std::move(txnClient));
+ opCtx(), _executor, nullptr, _inlineExecutor, std::move(txnClient));
}
void expectSentAbort(TxnNumber txnNumber, BSONObj writeConcern) {
@@ -486,7 +485,6 @@ private:
std::shared_ptr<executor::ThreadPoolTaskExecutor> _executor;
std::shared_ptr<executor::InlineExecutor> _inlineExecutor;
- std::shared_ptr<executor::InlineExecutor::SleepableExecutor> _sleepInlineExecutor;
txn_api::details::MockTransactionClient* _mockClient{nullptr};
MockResourceYielder* _resourceYielder{nullptr};
std::unique_ptr<txn_api::SyncTransactionWithRetries> _txnWithRetries;
@@ -1377,48 +1375,34 @@ TEST_F(TxnAPITest, HandlesExceptionWhileUnyielding) {
ASSERT_EQ(resourceYielder()->timesUnyielded(), 1);
}
-// TODO SERVER-75553 - with inline executors, this test runs in the wrong order
-#if 0
-TEST_F(TxnAPITest, UnyieldsAfterCancellation) {
+TEST_F(TxnAPITest, TransactionErrorTakesPrecedenceOverUnyieldError) {
resetTxnWithRetries(std::make_unique<MockResourceYielder>());
- unittest::Barrier txnApiStarted(2);
- unittest::Barrier opCtxKilled(2);
-
- auto killerThread = stdx::thread([&txnApiStarted, &opCtxKilled, opCtx = opCtx()] {
- txnApiStarted.countDownAndWait();
- opCtx->markKilled();
- opCtxKilled.countDownAndWait();
- });
-
auto swResult = txnWithRetries().runNoThrow(
opCtx(), [&](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) {
mockClient()->setNextCommandResponse(kOKInsertResponse);
- auto insertRes = txnClient
- .runCommand(DatabaseName::createDatabaseName_forTest(boost::none, "user"_sd),
- BSON("insert"
- << "foo"
- << "documents" << BSON_ARRAY(BSON("x" << 1))))
- .get();
+ auto insertRes =
+ txnClient
+ .runCommand(DatabaseName::createDatabaseName_forTest(boost::none, "user"_sd),
+ BSON("insert"
+ << "foo"
+ << "documents" << BSON_ARRAY(BSON("x" << 1))))
+ .get();
- resourceYielder()->throwInUnyield(ErrorCodes::InternalError);
+ resourceYielder()->throwInUnyield(ErrorCodes::Interrupted);
- txnApiStarted.countDownAndWait();
- opCtxKilled.countDownAndWait();
+ uasserted(ErrorCodes::InternalError, "Mock error");
return SemiFuture<void>::makeReady();
});
- // The transaction should fail with an Interrupted error from killing the opCtx using the
- // API instead of the ResourceYielder error from within the API callback.
- ASSERT_EQ(swResult.getStatus(), ErrorCodes::Interrupted);
+ // The transaction should fail with the error the transaction failed with instead of the
+ // ResourceYielder error.
+ ASSERT_EQ(swResult.getStatus(), ErrorCodes::InternalError);
// Yield before starting and corresponding unyield.
ASSERT_EQ(resourceYielder()->timesYielded(), 1);
ASSERT_EQ(resourceYielder()->timesUnyielded(), 1);
-
- killerThread.join();
}
-#endif
TEST_F(TxnAPITest, ClientSession_UsesNonRetryableInternalSession) {
opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest());
@@ -2343,8 +2327,6 @@ TEST_F(TxnAPITest, FailoverAndShutdownErrorsAreFatalForLocalTransactionWCError)
runTest(true, Status(ErrorCodes::HostUnreachable, "mock retriable error"));
}
-// TODO SERVER-75553 - test needs to be aborted and assumes multiple-threads
-#if 0
TEST_F(TxnAPITest, DoesNotWaitForBestEffortAbortIfCancelled) {
// Start the transaction with an insert.
mockClient()->setNextCommandResponse(kOKInsertResponse);
@@ -2394,8 +2376,7 @@ TEST_F(TxnAPITest, WaitsForBestEffortAbortOnNonTransientErrorIfNotCancelled) {
std::make_unique<ThreadPool>(std::move(options)),
executor::makeNetworkInterface("TxnAPITestNetwork"));
executor->startup();
- auto exec1 = executor::InlineExecutor().getSleepableExecutor(executor);
- resetTxnWithRetries(nullptr /* resourceYielder */, exec1);
+ resetTxnWithRetries(nullptr /* resourceYielder */, executor);
// Start the transaction with an insert.
mockClient()->setNextCommandResponse(kOKInsertResponse);
@@ -2454,9 +2435,7 @@ TEST_F(TxnAPITest, WaitsForBestEffortAbortOnTransientError) {
std::make_unique<ThreadPool>(std::move(options)),
executor::makeNetworkInterface("TxnAPITestNetwork"));
executor->startup();
- auto exec1 = executor::InlineExecutor().getSleepableExecutor(executor);
-
- resetTxnWithRetries(nullptr /* resourceYielder */, exec1);
+ resetTxnWithRetries(nullptr /* resourceYielder */, executor);
// Start the transaction with an insert.
mockClient()->setNextCommandResponse(kOKInsertResponse);
@@ -2514,7 +2493,6 @@ TEST_F(TxnAPITest, WaitsForBestEffortAbortOnTransientError) {
executor->shutdown();
executor->join();
}
-#endif
} // namespace
} // namespace mongo
diff --git a/src/mongo/idl/cluster_server_parameter_refresher.cpp b/src/mongo/idl/cluster_server_parameter_refresher.cpp
index efce752280b..13cc743c477 100644
--- a/src/mongo/idl/cluster_server_parameter_refresher.cpp
+++ b/src/mongo/idl/cluster_server_parameter_refresher.cpp
@@ -149,9 +149,7 @@ getFCVAndClusterParametersFromConfigServer() {
auto executor = Grid::get(opCtx.get())->getExecutorPool()->getFixedExecutor();
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
- txn_api::SyncTransactionWithRetries txn(
- opCtx.get(), sleepInlineExecutor, nullptr, inlineExecutor);
+ txn_api::SyncTransactionWithRetries txn(opCtx.get(), executor, nullptr, inlineExecutor);
txn.run(opCtx.get(), doFetch);
return {*fcv, *allDocs};
}
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index d8b3c820341..7dfec971f08 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -157,11 +157,10 @@ void handleWouldChangeOwningShardErrorNonTransaction(
const write_ops::FindAndModifyCommandRequest& request,
BSONObjBuilder* result) {
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());
+ auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto txn = txn_api::SyncTransactionWithRetries(
- opCtx, sleepInlineExecutor, nullptr /* resourceYielder */, inlineExecutor);
+ opCtx, executor, nullptr /* resourceYielder */, inlineExecutor);
// Shared state for the transaction API use below.
struct SharedBlock {
@@ -258,11 +257,10 @@ void handleWouldChangeOwningShardErrorTransaction(
try {
auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
auto txn = txn_api::SyncTransactionWithRetries(
opCtx,
- sleepInlineExecutor,
+ executor,
TransactionRouterResourceYielder::makeForLocalHandoff(),
inlineExecutor);
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index 169fcc0ca74..147f8ad0924 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -163,10 +163,9 @@ void handleWouldChangeOwningShardErrorNonTransaction(OperationContext* opCtx,
auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
auto txn = txn_api::SyncTransactionWithRetries(
- opCtx, sleepInlineExecutor, nullptr /* resourceYielder */, inlineExecutor);
+ opCtx, executor, nullptr /* resourceYielder */, inlineExecutor);
// Shared state for the transaction API use below.
struct SharedBlock {
@@ -243,12 +242,8 @@ UpdateShardKeyResult handleWouldChangeOwningShardErrorTransaction(
auto sharedBlock = std::make_shared<SharedBlock>(changeInfo, request->getNS());
auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
- auto txn =
- txn_api::SyncTransactionWithRetries(opCtx,
- sleepInlineExecutor,
- TransactionRouterResourceYielder::makeForLocalHandoff(),
- inlineExecutor);
+ auto txn = txn_api::SyncTransactionWithRetries(
+ opCtx, executor, TransactionRouterResourceYielder::makeForLocalHandoff(), inlineExecutor);
try {
txn.run(opCtx,
diff --git a/src/mongo/s/commands/internal_transactions_test_command_s.cpp b/src/mongo/s/commands/internal_transactions_test_command_s.cpp
index 5ea97bc1367..ac0fed2914a 100644
--- a/src/mongo/s/commands/internal_transactions_test_command_s.cpp
+++ b/src/mongo/s/commands/internal_transactions_test_command_s.cpp
@@ -43,12 +43,10 @@ public:
StringData commandName,
bool useClusterClient) {
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor());
return txn_api::SyncTransactionWithRetries(
opCtx,
- sleepInlineExecutor,
+ executor,
TransactionRouterResourceYielder::makeForLocalHandoff(),
inlineExecutor);
}
diff --git a/src/mongo/s/write_ops/write_without_shard_key_util.cpp b/src/mongo/s/write_ops/write_without_shard_key_util.cpp
index 9e8366cfeb8..ba2bca7c03b 100644
--- a/src/mongo/s/write_ops/write_without_shard_key_util.cpp
+++ b/src/mongo/s/write_ops/write_without_shard_key_util.cpp
@@ -270,13 +270,9 @@ StatusWith<ClusterWriteWithoutShardKeyResponse> runTwoPhaseWriteProtocol(Operati
auto& executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
auto inlineExecutor = std::make_shared<executor::InlineExecutor>();
- auto sleepInlineExecutor = inlineExecutor->getSleepableExecutor(executor);
- auto txn =
- txn_api::SyncTransactionWithRetries(opCtx,
- sleepInlineExecutor,
- TransactionRouterResourceYielder::makeForLocalHandoff(),
- inlineExecutor);
+ auto txn = txn_api::SyncTransactionWithRetries(
+ opCtx, executor, TransactionRouterResourceYielder::makeForLocalHandoff(), inlineExecutor);
auto sharedBlock = std::make_shared<SharedBlock>(nss, cmdObj);
auto swResult = txn.runNoThrow(