summary refs log tree commit diff
path: root/src/mongo/db/transaction_coordinator_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/transaction_coordinator_service.cpp')
-rw-r--r-- src/mongo/db/transaction_coordinator_service.cpp | 160
1 files changed, 66 insertions, 94 deletions
diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp
index 2cf14972c0b..3731a1853f5 100644
--- a/src/mongo/db/transaction_coordinator_service.cpp
+++ b/src/mongo/db/transaction_coordinator_service.cpp
@@ -51,42 +51,10 @@ namespace {
const auto transactionCoordinatorServiceDecoration =
ServiceContext::declareDecoration<TransactionCoordinatorService>();
-/**
- * Constructs the default options for the thread pool used to run commit.
- */
-ThreadPool::Options makeDefaultThreadPoolOptions() {
- ThreadPool::Options options;
- options.poolName = "TransactionCoordinatorService";
- options.minThreads = 0;
- options.maxThreads = ThreadPool::Options::kUnlimited;
-
- // Ensure all threads have a client
- options.onCreateThread = [](const std::string& threadName) {
- Client::initThread(threadName.c_str());
- };
- return options;
-}
-
} // namespace
TransactionCoordinatorService::TransactionCoordinatorService()
- : _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()),
- _threadPool(std::make_unique<ThreadPool>(makeDefaultThreadPoolOptions())) {}
-
-void TransactionCoordinatorService::setThreadPoolForTest(std::unique_ptr<ThreadPool> pool) {
- shutdown();
- _threadPool = std::move(pool);
- startup();
-}
-
-void TransactionCoordinatorService::startup() {
- _threadPool->startup();
-}
-
-void TransactionCoordinatorService::shutdown() {
- _threadPool->shutdown();
- _threadPool->join();
-}
+ : _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()) {}
TransactionCoordinatorService* TransactionCoordinatorService::get(OperationContext* opCtx) {
return get(opCtx->getServiceContext());
@@ -108,16 +76,16 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx,
latestCoordinator->cancelIfCommitNotYetStarted();
}
- auto networkExecutor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
- auto coordinator = std::make_shared<TransactionCoordinator>(
- opCtx->getServiceContext(), networkExecutor, _threadPool.get(), lsid, txnNumber);
+ auto coordinator =
+ std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(), lsid, txnNumber);
_coordinatorCatalog->insert(opCtx, lsid, txnNumber, coordinator);
// Schedule a task in the future to cancel the commit coordination on the coordinator, so that
// the coordinator does not remain in memory forever (in case the participant list is never
// received).
- auto cbHandle = uassertStatusOK(networkExecutor->scheduleWorkAt(
+ const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+ auto cbHandle = uassertStatusOK(executor->scheduleWorkAt(
commitDeadline,
[coordinatorWeakPtr = std::weak_ptr<TransactionCoordinator>(coordinator)](
const mongo::executor::TaskExecutor::CallbackArgs& cbArgs) mutable {
@@ -176,66 +144,70 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx) {
// stepup task.
_coordinatorCatalog->enterStepUp(opCtx);
- auto scheduleStatus = _threadPool->schedule([this]() {
- try {
- // The opCtx destructor handles unsetting itself from the Client
- auto opCtxPtr = Client::getCurrent()->makeOperationContext();
- auto opCtx = opCtxPtr.get();
-
- repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx);
- const auto lastOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- LOG(3) << "Going to wait for client's last OpTime " << lastOpTime
- << " to become majority committed";
- WriteConcernResult unusedWCResult;
- uassertStatusOK(waitForWriteConcern(
- opCtx,
- lastOpTime,
- WriteConcernOptions{WriteConcernOptions::kInternalMajorityNoSnapshot,
- WriteConcernOptions::SyncMode::UNSET,
- WriteConcernOptions::kNoTimeout},
- &unusedWCResult));
-
- auto coordinatorDocs = TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx);
- LOG(0) << "Need to resume coordinating commit for " << coordinatorDocs.size()
- << " transactions";
-
- for (const auto& doc : coordinatorDocs) {
- LOG(3) << "Going to resume coordinating commit for " << doc.toBSON();
- const auto lsid = *doc.getId().getSessionId();
- const auto txnNumber = *doc.getId().getTxnNumber();
-
- auto networkExecutor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
- auto coordinator =
- std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(),
- networkExecutor,
- _threadPool.get(),
- lsid,
- txnNumber);
- _coordinatorCatalog->insert(
- opCtx, lsid, txnNumber, coordinator, true /* forStepUp */);
- coordinator->continueCommit(doc);
- }
-
- _coordinatorCatalog->exitStepUp();
-
- LOG(3) << "Incoming coordinateCommit requests now accepted";
- } catch (const DBException& e) {
- LOG(3) << "Failed while executing thread to resume coordinating commit for pending "
- "transactions "
- << causedBy(e.toStatus());
- _coordinatorCatalog->exitStepUp();
- }
- });
-
- if (scheduleStatus.code() == ErrorCodes::ShutdownInProgress) {
+ const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
+ auto status =
+ executor
+ ->scheduleWork([ this, service = opCtx->getServiceContext() ](
+ const executor::TaskExecutor::CallbackArgs&) {
+ try {
+ // The opCtx destructor handles unsetting itself from the Client
+ ThreadClient threadClient("TransactionCoordinator-StepUp", service);
+ auto opCtxPtr = Client::getCurrent()->makeOperationContext();
+ auto opCtx = opCtxPtr.get();
+
+ auto& replClientInfo = repl::ReplClientInfo::forClient(opCtx->getClient());
+ replClientInfo.setLastOpToSystemLastOpTime(opCtx);
+
+ const auto lastOpTime = replClientInfo.getLastOp();
+ LOG(3) << "Going to wait for client's last OpTime " << lastOpTime
+ << " to become majority committed";
+ WriteConcernResult unusedWCResult;
+ uassertStatusOK(waitForWriteConcern(
+ opCtx,
+ lastOpTime,
+ WriteConcernOptions{WriteConcernOptions::kInternalMajorityNoSnapshot,
+ WriteConcernOptions::SyncMode::UNSET,
+ WriteConcernOptions::kNoTimeout},
+ &unusedWCResult));
+
+ auto coordinatorDocs =
+ TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx);
+
+ LOG(0) << "Need to resume coordinating commit for " << coordinatorDocs.size()
+ << " transactions";
+
+ for (const auto& doc : coordinatorDocs) {
+ LOG(3) << "Going to resume coordinating commit for " << doc.toBSON();
+ const auto lsid = *doc.getId().getSessionId();
+ const auto txnNumber = *doc.getId().getTxnNumber();
+
+ auto coordinator = std::make_shared<TransactionCoordinator>(
+ opCtx->getServiceContext(), lsid, txnNumber);
+ _coordinatorCatalog->insert(
+ opCtx, lsid, txnNumber, coordinator, true /* forStepUp */);
+ coordinator->continueCommit(doc);
+ }
+
+ _coordinatorCatalog->exitStepUp();
+
+ LOG(3) << "Incoming coordinateCommit requests now accepted";
+ } catch (const DBException& e) {
+ LOG(3) << "Failed while executing thread to resume coordinating commit for "
+ "pending "
+ "transactions "
+ << causedBy(e.toStatus());
+ _coordinatorCatalog->exitStepUp();
+ }
+ })
+ .getStatus();
+
+ // TODO (SERVER-38320): Reschedule the stepup task if the interruption was not due to stepdown.
+ if (status == ErrorCodes::ShutdownInProgress || ErrorCodes::isInterruption(status.code())) {
return;
}
- fassert(51031, scheduleStatus.isOK());
+ invariant(status);
}
-ServiceContext::ConstructorActionRegisterer transactionCoordinatorServiceRegisterer{
- "TransactionCoordinatorService",
- [](ServiceContext* service) { TransactionCoordinatorService::get(service)->startup(); },
- [](ServiceContext* service) { TransactionCoordinatorService::get(service)->shutdown(); }};
+void TransactionCoordinatorService::onStepDown(OperationContext* opCtx) {}
} // namespace mongo