summaryrefslogtreecommitdiff
path: root/src/mongo/db/s/transaction_coordinator.cpp
diff options
context:
space:
mode:
authorRandolph Tan <randolph@10gen.com>2019-07-29 15:45:21 -0400
committerRandolph Tan <randolph@10gen.com>2019-07-31 16:51:43 -0400
commitf083be87fa096353b004e3f4ddd0503b77f7ef06 (patch)
tree8e9fcdd291e7f520aa690c8e9439b18b8111745d /src/mongo/db/s/transaction_coordinator.cpp
parent003db640a8b0523e130badc4b935624a7d000c60 (diff)
downloadmongo-f083be87fa096353b004e3f4ddd0503b77f7ef06.tar.gz
SERVER-42338 Make the hang failpoint callback always ran on the executor thread
Diffstat (limited to 'src/mongo/db/s/transaction_coordinator.cpp')
-rw-r--r--src/mongo/db/s/transaction_coordinator.cpp55
1 files changed, 35 insertions, 20 deletions
diff --git a/src/mongo/db/s/transaction_coordinator.cpp b/src/mongo/db/s/transaction_coordinator.cpp
index 76c51e89ca6..8dc918a6212 100644
--- a/src/mongo/db/s/transaction_coordinator.cpp
+++ b/src/mongo/db/s/transaction_coordinator.cpp
@@ -52,21 +52,36 @@ using TransactionCoordinatorDocument = txn::TransactionCoordinatorDocument;
MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForParticipantListWriteConcern);
MONGO_FAIL_POINT_DEFINE(hangBeforeWaitingForDecisionWriteConcern);
-void hangIfFailPointEnabled(ServiceContext* service,
- FailPoint& failpoint,
- const StringData& failPointName) {
+ExecutorFuture<void> waitForMajorityWithHangFailpoint(ServiceContext* service,
+ FailPoint& failpoint,
+ const std::string& failPointName,
+ repl::OpTime opTime) {
+ auto executor = Grid::get(service)->getExecutorPool()->getFixedExecutor();
+ auto waitForWC = [service, executor](repl::OpTime opTime) {
+ return WaitForMajorityService::get(service).waitUntilMajority(opTime).thenRunOn(executor);
+ };
+
MONGO_FAIL_POINT_BLOCK(failpoint, fp) {
LOG(0) << "Hit " << failPointName << " failpoint";
const BSONObj& data = fp.getData();
- if (!data["useUninterruptibleSleep"].eoo()) {
- MONGO_FAIL_POINT_PAUSE_WHILE_SET(failpoint);
- } else {
- ThreadClient tc(failPointName, service);
- auto opCtx = tc->makeOperationContext();
-
- MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx.get(), failpoint);
- }
+
+ // Run the hang failpoint asynchronously on a different thread to avoid self deadlocks.
+ return ExecutorFuture<void>(executor).then(
+ [service, &failpoint, failPointName, data, waitForWC, opTime] {
+ if (!data["useUninterruptibleSleep"].eoo()) {
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET(failpoint);
+ } else {
+ ThreadClient tc(failPointName, service);
+ auto opCtx = tc->makeOperationContext();
+
+ MONGO_FAIL_POINT_PAUSE_WHILE_SET_OR_INTERRUPTED(opCtx.get(), failpoint);
+ }
+
+ return waitForWC(std::move(opTime));
+ });
}
+
+ return waitForWC(std::move(opTime));
}
} // namespace
@@ -142,10 +157,11 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
*_sendPrepareScheduler, _lsid, _txnNumber, *_participants);
})
.then([this](repl::OpTime opTime) {
- hangIfFailPointEnabled(_serviceContext,
- hangBeforeWaitingForParticipantListWriteConcern,
- "hangBeforeWaitingForParticipantListWriteConcern");
- return WaitForMajorityService::get(_serviceContext).waitUntilMajority(opTime);
+ return waitForMajorityWithHangFailpoint(
+ _serviceContext,
+ hangBeforeWaitingForParticipantListWriteConcern,
+ "hangBeforeWaitingForParticipantListWriteConcern",
+ std::move(opTime));
})
.thenRunOn(Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor())
.then([this] {
@@ -221,12 +237,11 @@ TransactionCoordinator::TransactionCoordinator(ServiceContext* serviceContext,
return txn::persistDecision(*_scheduler, _lsid, _txnNumber, *_participants, *_decision);
})
.then([this](repl::OpTime opTime) {
- hangIfFailPointEnabled(_serviceContext,
- hangBeforeWaitingForDecisionWriteConcern,
- "hangBeforeWaitingForDecisionWriteConcern");
- return WaitForMajorityService::get(_serviceContext).waitUntilMajority(opTime);
+ return waitForMajorityWithHangFailpoint(_serviceContext,
+ hangBeforeWaitingForDecisionWriteConcern,
+ "hangBeforeWaitingForDecisionWriteConcern",
+ std::move(opTime));
})
- .thenRunOn(Grid::get(_serviceContext)->getExecutorPool()->getFixedExecutor())
.then([this] {
{
stdx::lock_guard<stdx::mutex> lg(_mutex);