summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/transaction_coordinator_service.cpp
diff options
context:
space:
mode:
author Kaloian Manassiev <kaloian.manassiev@mongodb.com> 2019-02-12 16:56:00 -0500
committer Kaloian Manassiev <kaloian.manassiev@mongodb.com> 2019-02-22 07:19:19 -0500
commit 63220b7df242b26f83c7083394b463c4e4b3a5d8 (patch)
tree 203129f84d8afad520169a95fd03138af7b4107b /src/mongo/db/s/transaction_coordinator_service.cpp
parent cdc5a7e8fe27e66b4a5f862cf9624130fd5e4eeb (diff)
download mongo-63220b7df242b26f83c7083394b463c4e4b3a5d8.tar.gz
SERVER-38522/SERVER-38715 Make all TransactionCoordinator tasks interruptible
Diffstat (limited to 'src/mongo/db/s/transaction_coordinator_service.cpp')
-rw-r--r-- src/mongo/db/s/transaction_coordinator_service.cpp 64
1 files changed, 32 insertions, 32 deletions
diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp
index 8f1301873e4..e5b5e15e300 100644
--- a/src/mongo/db/s/transaction_coordinator_service.cpp
+++ b/src/mongo/db/s/transaction_coordinator_service.cpp
@@ -36,8 +36,6 @@
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/s/transaction_coordinator_document_gen.h"
#include "mongo/db/write_concern.h"
-#include "mongo/executor/task_executor.h"
-#include "mongo/executor/task_executor_pool.h"
#include "mongo/s/grid.h"
#include "mongo/util/log.h"
@@ -69,6 +67,7 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx,
Date_t commitDeadline) {
auto cas = _getCatalogAndScheduler(opCtx);
auto& catalog = cas->catalog;
+ auto& scheduler = cas->scheduler;
if (auto latestTxnNumAndCoordinator = catalog.getLatestOnSession(opCtx, lsid)) {
auto latestCoordinator = latestTxnNumAndCoordinator->second;
@@ -78,27 +77,14 @@ void TransactionCoordinatorService::createCoordinator(OperationContext* opCtx,
latestCoordinator->cancelIfCommitNotYetStarted();
}
- auto coordinator =
- std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(), lsid, txnNumber);
-
- catalog.insert(opCtx, lsid, txnNumber, coordinator);
-
- // Schedule a task in the future to cancel the commit coordination on the coordinator, so that
- // the coordinator does not remain in memory forever (in case the participant list is never
- // received).
- const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor();
- auto cbHandle = uassertStatusOK(executor->scheduleWorkAt(
- commitDeadline,
- [coordinatorWeakPtr = std::weak_ptr<TransactionCoordinator>(coordinator)](
- const mongo::executor::TaskExecutor::CallbackArgs& cbArgs) mutable {
- auto coordinator = coordinatorWeakPtr.lock();
- if (coordinator) {
- coordinator->cancelIfCommitNotYetStarted();
- }
- }));
-
- // TODO (SERVER-38715): Store the callback handle in the coordinator, so that the coordinator
- // can cancel the cancel task on receiving the participant list.
+ catalog.insert(opCtx,
+ lsid,
+ txnNumber,
+ std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(),
+ lsid,
+ txnNumber,
+ scheduler.makeChildScheduler(),
+ commitDeadline));
}
boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::coordinateCommit(
@@ -114,8 +100,7 @@ boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::coor
return boost::none;
}
- std::vector<ShardId> participants(participantList.begin(), participantList.end());
- auto decisionFuture = coordinator->runCommit(participants);
+ coordinator->runCommit(std::vector<ShardId>{participantList.begin(), participantList.end()});
return coordinator->onCompletion().then(
[coordinator] { return coordinator->getDecision().get(); });
@@ -123,7 +108,7 @@ boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::coor
// TODO (SERVER-37364): Re-enable the coordinator returning the decision as soon as the decision
// is made durable. Currently the coordinator waits to hear acks because participants in prepare
// reject requests with a higher transaction number, causing tests to fail.
- // return coordinator->runCommit(participants);
+ // return coordinator->getDecision();
}
boost::optional<Future<txn::CommitDecision>> TransactionCoordinatorService::recoverCommit(
@@ -181,6 +166,7 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx,
<< " transactions";
auto& catalog = catalogAndScheduler->catalog;
+ auto& scheduler = catalogAndScheduler->scheduler;
for (const auto& doc : coordinatorDocs) {
LOG(3) << "Going to resume coordinating commit for " << doc.toBSON();
@@ -188,8 +174,13 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx,
const auto lsid = *doc.getId().getSessionId();
const auto txnNumber = *doc.getId().getTxnNumber();
- auto coordinator = std::make_shared<TransactionCoordinator>(
- opCtx->getServiceContext(), lsid, txnNumber);
+ auto coordinator =
+ std::make_shared<TransactionCoordinator>(opCtx->getServiceContext(),
+ lsid,
+ txnNumber,
+ scheduler.makeChildScheduler(),
+ boost::none /* No deadline */);
+
catalog.insert(opCtx, lsid, txnNumber, coordinator, true /* forStepUp */);
coordinator->continueCommit(doc);
}
@@ -214,8 +205,7 @@ void TransactionCoordinatorService::onStepDown() {
_catalogAndSchedulerToCleanup = std::move(_catalogAndScheduler);
}
- _catalogAndSchedulerToCleanup->scheduler.shutdown(
- {ErrorCodes::InterruptedDueToStepDown, "Transaction coordinator service stepping down"});
+ _catalogAndSchedulerToCleanup->onStepDown();
}
void TransactionCoordinatorService::onShardingInitialization(OperationContext* opCtx,
@@ -253,9 +243,19 @@ void TransactionCoordinatorService::_joinPreviousRound() {
// Block until all coordinators scheduled the previous time the service was primary have
// drained. Because the scheduler was interrupted, it should be extremely rare for there to be
// any coordinators left, so if this actually causes blocking, it would most likely be a bug.
- _catalogAndSchedulerToCleanup->recoveryTaskCompleted->wait();
- _catalogAndSchedulerToCleanup->catalog.join();
+ _catalogAndSchedulerToCleanup->join();
_catalogAndSchedulerToCleanup.reset();
}
+void TransactionCoordinatorService::CatalogAndScheduler::onStepDown() {  // Step-down handling for one catalog/scheduler pair.
+ scheduler.shutdown({ErrorCodes::TransactionCoordinatorSteppingDown,  // Shut the scheduler down first, passing a TransactionCoordinatorSteppingDown status and the message below
+ "Transaction coordinator service stepping down"});  // NOTE(review): presumably the status that interrupted tasks observe - confirm against the scheduler's contract.
+ catalog.onStepDown();  // Then forward the step-down notification to the catalog.
+}
+
+void TransactionCoordinatorService::CatalogAndScheduler::join() {  // Waits on this pair's outstanding work: recoveryTaskCompleted first, then the catalog.
+ recoveryTaskCompleted->wait();  // Wait on recoveryTaskCompleted before touching the catalog.
+ catalog.join();  // Then join the catalog itself.
+}
+
} // namespace mongo