diff options
Diffstat (limited to 'src/mongo/db/transaction_coordinator_service.cpp')
-rw-r--r-- | src/mongo/db/transaction_coordinator_service.cpp | 160 |
1 files changed, 66 insertions, 94 deletions
diff --git a/src/mongo/db/transaction_coordinator_service.cpp b/src/mongo/db/transaction_coordinator_service.cpp index 2cf14972c0b..3731a1853f5 100644 --- a/src/mongo/db/transaction_coordinator_service.cpp +++ b/src/mongo/db/transaction_coordinator_service.cpp @@ -51,42 +51,10 @@ namespace { const auto transactionCoordinatorServiceDecoration = ServiceContext::declareDecoration<TransactionCoordinatorService>(); -/** - * Constructs the default options for the thread pool used to run commit. - */ -ThreadPool::Options makeDefaultThreadPoolOptions() { - ThreadPool::Options options; - options.poolName = "TransactionCoordinatorService"; - options.minThreads = 0; - options.maxThreads = ThreadPool::Options::kUnlimited; - - // Ensure all threads have a client - options.onCreateThread = [](const std::string& threadName) { - Client::initThread(threadName.c_str()); - }; - return options; -} - } // namespace TransactionCoordinatorService::TransactionCoordinatorService() - : _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()), - _threadPool(std::make_unique<ThreadPool>(makeDefaultThreadPoolOptions())) {} - -void TransactionCoordinatorService::setThreadPoolForTest(std::unique_ptr<ThreadPool> pool) { - shutdown(); - _threadPool = std::move(pool); - startup(); -} - -void TransactionCoordinatorService::startup() { - _threadPool->startup(); -} - -void TransactionCoordinatorService::shutdown() { - _threadPool->shutdown(); - _threadPool->join(); -} + : _coordinatorCatalog(std::make_shared<TransactionCoordinatorCatalog>()) {} TransactionCoordinatorService* TransactionCoordinatorService::get(OperationContext* opCtx) { return get(opCtx->getServiceContext()); @@ -108,16 +76,16 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx, latestCoordinator->cancelIfCommitNotYetStarted(); } - auto networkExecutor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); - auto coordinator = std::make_shared<TransactionCoordinator>( - opCtx->getServiceContext(), networkExecutor, _threadPool.get(), lsid, txnNumber); + auto coordinator = + std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(), lsid, txnNumber); _coordinatorCatalog->insert(opCtx, lsid, txnNumber, coordinator); // Schedule a task in the future to cancel the commit coordination on the coordinator, so that // the coordinator does not remain in memory forever (in case the particpant list is never // received). - auto cbHandle = uassertStatusOK(networkExecutor->scheduleWorkAt( + const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + auto cbHandle = uassertStatusOK(executor->scheduleWorkAt( commitDeadline, [coordinatorWeakPtr = std::weak_ptr<TransactionCoordinator>(coordinator)]( const mongo::executor::TaskExecutor::CallbackArgs& cbArgs) mutable { @@ -176,66 +144,70 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx) { // stepup task. _coordinatorCatalog->enterStepUp(opCtx); - auto scheduleStatus = _threadPool->schedule([this]() { - try { - // The opCtx destructor handles unsetting itself from the Client - auto opCtxPtr = Client::getCurrent()->makeOperationContext(); - auto opCtx = opCtxPtr.get(); - - repl::ReplClientInfo::forClient(opCtx->getClient()).setLastOpToSystemLastOpTime(opCtx); - const auto lastOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - LOG(3) << "Going to wait for client's last OpTime " << lastOpTime - << " to become majority committed"; - WriteConcernResult unusedWCResult; - uassertStatusOK(waitForWriteConcern( - opCtx, - lastOpTime, - WriteConcernOptions{WriteConcernOptions::kInternalMajorityNoSnapshot, - WriteConcernOptions::SyncMode::UNSET, - WriteConcernOptions::kNoTimeout}, - &unusedWCResult)); - - auto coordinatorDocs = TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx); - LOG(0) << "Need to resume coordinating commit for " << coordinatorDocs.size() - << " transactions"; - - for (const auto& doc : coordinatorDocs) { - LOG(3) << "Going to resume coordinating commit for " << doc.toBSON(); - const auto lsid = *doc.getId().getSessionId(); - const auto txnNumber = *doc.getId().getTxnNumber(); - - auto networkExecutor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); - auto coordinator = - std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(), - networkExecutor, - _threadPool.get(), - lsid, - txnNumber); - _coordinatorCatalog->insert( - opCtx, lsid, txnNumber, coordinator, true /* forStepUp */); - coordinator->continueCommit(doc); - } - - _coordinatorCatalog->exitStepUp(); - - LOG(3) << "Incoming coordinateCommit requests now accepted"; - } catch (const DBException& e) { - LOG(3) << "Failed while executing thread to resume coordinating commit for pending " - "transactions " - << causedBy(e.toStatus()); - _coordinatorCatalog->exitStepUp(); - } - }); - - if (scheduleStatus.code() == ErrorCodes::ShutdownInProgress) { + const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); + auto status = + executor + ->scheduleWork([ this, service = opCtx->getServiceContext() ]( + const executor::TaskExecutor::CallbackArgs&) { + try { + // The opCtx destructor handles unsetting itself from the Client + ThreadClient threadClient("TransactionCoordinator-StepUp", service); + auto opCtxPtr = Client::getCurrent()->makeOperationContext(); + auto opCtx = opCtxPtr.get(); + + auto& replClientInfo = repl::ReplClientInfo::forClient(opCtx->getClient()); + replClientInfo.setLastOpToSystemLastOpTime(opCtx); + + const auto lastOpTime = replClientInfo.getLastOp(); + LOG(3) << "Going to wait for client's last OpTime " << lastOpTime + << " to become majority committed"; + WriteConcernResult unusedWCResult; + uassertStatusOK(waitForWriteConcern( + opCtx, + lastOpTime, + WriteConcernOptions{WriteConcernOptions::kInternalMajorityNoSnapshot, + WriteConcernOptions::SyncMode::UNSET, + WriteConcernOptions::kNoTimeout}, + &unusedWCResult)); + + auto coordinatorDocs = + TransactionCoordinatorDriver::readAllCoordinatorDocs(opCtx); + + LOG(0) << "Need to resume coordinating commit for " << coordinatorDocs.size() + << " transactions"; + + for (const auto& doc : coordinatorDocs) { + LOG(3) << "Going to resume coordinating commit for " << doc.toBSON(); + const auto lsid = *doc.getId().getSessionId(); + const auto txnNumber = *doc.getId().getTxnNumber(); + + auto coordinator = std::make_shared<TransactionCoordinator>( + opCtx->getServiceContext(), lsid, txnNumber); + _coordinatorCatalog->insert( + opCtx, lsid, txnNumber, coordinator, true /* forStepUp */); + coordinator->continueCommit(doc); + } + + _coordinatorCatalog->exitStepUp(); + + LOG(3) << "Incoming coordinateCommit requests now accepted"; + } catch (const DBException& e) { + LOG(3) << "Failed while executing thread to resume coordinating commit for " + "pending " + "transactions " + << causedBy(e.toStatus()); + _coordinatorCatalog->exitStepUp(); + } + }) + .getStatus(); + + // TODO (SERVER-38320): Reschedule the stepup task if the interruption was not due to stepdown. + if (status == ErrorCodes::ShutdownInProgress || ErrorCodes::isInterruption(status.code())) { return; } - fassert(51031, scheduleStatus.isOK()); + invariant(status); } -ServiceContext::ConstructorActionRegisterer transactionCoordinatorServiceRegisterer{ - "TransactionCoordinatorService", - [](ServiceContext* service) { TransactionCoordinatorService::get(service)->startup(); }, - [](ServiceContext* service) { TransactionCoordinatorService::get(service)->shutdown(); }}; +void TransactionCoordinatorService::onStepDown(OperationContext* opCtx) {} } // namespace mongo |