diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-02-12 16:56:00 -0500 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-02-22 07:19:19 -0500 |
commit | 63220b7df242b26f83c7083394b463c4e4b3a5d8 (patch) | |
tree | 203129f84d8afad520169a95fd03138af7b4107b /src/mongo/db/s/transaction_coordinator_service.cpp | |
parent | cdc5a7e8fe27e66b4a5f862cf9624130fd5e4eeb (diff) | |
download | mongo-63220b7df242b26f83c7083394b463c4e4b3a5d8.tar.gz |
SERVER-38522/SERVER-38715 Make all TransactionCoordinator tasks interruptible
Diffstat (limited to 'src/mongo/db/s/transaction_coordinator_service.cpp')
-rw-r--r-- | src/mongo/db/s/transaction_coordinator_service.cpp | 64 |
1 files changed, 32 insertions, 32 deletions
diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp index 8f1301873e4..e5b5e15e300 100644 --- a/src/mongo/db/s/transaction_coordinator_service.cpp +++ b/src/mongo/db/s/transaction_coordinator_service.cpp @@ -36,8 +36,6 @@ #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/s/transaction_coordinator_document_gen.h" #include "mongo/db/write_concern.h" -#include "mongo/executor/task_executor.h" -#include "mongo/executor/task_executor_pool.h" #include "mongo/s/grid.h" #include "mongo/util/log.h" @@ -69,6 +67,7 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx, Date_t commitDeadline) { auto cas = _getCatalogAndScheduler(opCtx); auto& catalog = cas->catalog; + auto& scheduler = cas->scheduler; if (auto latestTxnNumAndCoordinator = catalog.getLatestOnSession(opCtx, lsid)) { auto latestCoordinator = latestTxnNumAndCoordinator->second; @@ -78,27 +77,14 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx, latestCoordinator->cancelIfCommitNotYetStarted(); } - auto coordinator = - std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(), lsid, txnNumber); - - catalog.insert(opCtx, lsid, txnNumber, coordinator); - - // Schedule a task in the future to cancel the commit coordination on the coordinator, so that - // the coordinator does not remain in memory forever (in case the particpant list is never - // received). - const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); - auto cbHandle = uassertStatusOK(executor->scheduleWorkAt( - commitDeadline, - [coordinatorWeakPtr = std::weak_ptr<TransactionCoordinator>(coordinator)]( - const mongo::executor::TaskExecutor::CallbackArgs& cbArgs) mutable { - auto coordinator = coordinatorWeakPtr.lock(); - if (coordinator) { - coordinator->cancelIfCommitNotYetStarted(); - } - })); - - // TODO (SERVER-38715): Store the callback handle in the coordinator, so that the coordinator - // can cancel the cancel task on receiving the participant list. + catalog.insert(opCtx, + lsid, + txnNumber, + std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(), + lsid, + txnNumber, + scheduler.makeChildScheduler(), + commitDeadline)); } boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::coordinateCommit( @@ -114,8 +100,7 @@ boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::coor return boost::none; } - std::vector<ShardId> participants(participantList.begin(), participantList.end()); - auto decisionFuture = coordinator->runCommit(participants); + coordinator->runCommit(std::vector<ShardId>{participantList.begin(), participantList.end()}); return coordinator->onCompletion().then( [coordinator] { return coordinator->getDecision().get(); }); @@ -123,7 +108,7 @@ boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::coor // TODO (SERVER-37364): Re-enable the coordinator returning the decision as soon as the decision // is made durable. Currently the coordinator waits to hear acks because participants in prepare // reject requests with a higher transaction number, causing tests to fail. - // return coordinator->runCommit(participants); + // return coordinator->getDecision(); } boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::recoverCommit( @@ -181,6 +166,7 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx, << " transactions"; auto& catalog = catalogAndScheduler->catalog; + auto& scheduler = catalogAndScheduler->scheduler; for (const auto& doc : coordinatorDocs) { LOG(3) << "Going to resume coordinating commit for " << doc.toBSON(); @@ -188,8 +174,13 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx, const auto lsid = *doc.getId().getSessionId(); const auto txnNumber = *doc.getId().getTxnNumber(); - auto coordinator = std::make_shared<TransactionCoordinator>( - opCtx->getServiceContext(), lsid, txnNumber); + auto coordinator = + std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(), + lsid, + txnNumber, + scheduler.makeChildScheduler(), + boost::none /* No deadline */); + catalog.insert(opCtx, lsid, txnNumber, coordinator, true /* forStepUp */); coordinator->continueCommit(doc); } @@ -214,8 +205,7 @@ void TransactionCoordinatorService::onStepDown() { _catalogAndSchedulerToCleanup = std::move(_catalogAndScheduler); } - _catalogAndSchedulerToCleanup->scheduler.shutdown( - {ErrorCodes::InterruptedDueToStepDown, "Transaction coordinator service stepping down"}); + _catalogAndSchedulerToCleanup->onStepDown(); } void TransactionCoordinatorService::onShardingInitialization(OperationContext* opCtx, @@ -253,9 +243,19 @@ void TransactionCoordinatorService::_joinPreviousRound() { // Block until all coordinators scheduled the previous time the service was primary to have // drained. Because the scheduler was interrupted, it should be extremely rare for there to be // any coordinators left, so if this actually causes blocking, it would most likely be a bug. - _catalogAndSchedulerToCleanup->recoveryTaskCompleted->wait(); - _catalogAndSchedulerToCleanup->catalog.join(); + _catalogAndSchedulerToCleanup->join(); _catalogAndSchedulerToCleanup.reset(); } +void TransactionCoordinatorService::CatalogAndScheduler::onStepDown() { + scheduler.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown, + "Transaction coordinator service stepping down"}); + catalog.onStepDown(); +} + +void TransactionCoordinatorService::CatalogAndScheduler::join() { + recoveryTaskCompleted->wait(); + catalog.join(); +} + } // namespace mongo |