summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorMatthew Saltz <matthew.saltz@mongodb.com>2019-01-29 11:34:54 -0500
committerMatthew Saltz <matthew.saltz@mongodb.com>2019-01-30 12:10:25 -0500
commitd1771e696b6df883af70eedaaa0733548c573fec (patch)
tree551cc628f50ce01639d6df14dceef90e1bb82bc8 /src/mongo
parent5fbc833d3906e960554b3697672f34060d5e1d9e (diff)
downloadmongo-d1771e696b6df883af70eedaaa0733548c573fec.tar.gz
SERVER-39187 Signal coordinator's decision promise when the coordinator is canceled before beginning commit
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/s/txn_two_phase_commit_cmds.cpp3
-rw-r--r--src/mongo/db/transaction_coordinator.cpp4
-rw-r--r--src/mongo/db/transaction_coordinator.h5
-rw-r--r--src/mongo/db/transaction_coordinator_driver.cpp5
-rw-r--r--src/mongo/db/transaction_coordinator_driver.h1
-rw-r--r--src/mongo/db/transaction_coordinator_futures_util_test.cpp2
-rw-r--r--src/mongo/db/transaction_coordinator_service_test.cpp18
7 files changed, 35 insertions, 3 deletions
diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
index eaa2a9ba36a..d458006ffdb 100644
--- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
+++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
@@ -244,6 +244,9 @@ public:
// The commit coordination is still ongoing. Block waiting for the decision.
auto commitDecision = commitDecisionFuture->get(opCtx);
switch (commitDecision) {
+ case txn::CommitDecision::kCanceled:
+ // Continue on to recover the commit decision from disk.
+ break;
case txn::CommitDecision::kAbort:
uasserted(ErrorCodes::NoSuchTransaction, "Transaction was aborted");
case txn::CommitDecision::kCommit:
diff --git a/src/mongo/db/transaction_coordinator.cpp b/src/mongo/db/transaction_coordinator.cpp
index 463cdb879a8..1040735154e 100644
--- a/src/mongo/db/transaction_coordinator.cpp
+++ b/src/mongo/db/transaction_coordinator.cpp
@@ -176,6 +176,8 @@ Future<void> TransactionCoordinator::onCompletion() {
void TransactionCoordinator::cancelIfCommitNotYetStarted() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_state == CoordinatorState::kInit) {
+ invariant(!_finalDecisionPromise.getFuture().isReady());
+ _finalDecisionPromise.emplaceValue(txn::CommitDecision::kCanceled);
_transitionToDone(std::move(lk));
}
}
@@ -195,6 +197,8 @@ Future<void> TransactionCoordinator::_sendDecisionToParticipants(
case txn::CommitDecision::kAbort:
_state = CoordinatorState::kAborting;
return _driver.sendAbort(participantShards, _lsid, _txnNumber);
+ case txn::CommitDecision::kCanceled:
+ MONGO_UNREACHABLE;
};
MONGO_UNREACHABLE;
};
diff --git a/src/mongo/db/transaction_coordinator.h b/src/mongo/db/transaction_coordinator.h
index 063e32e209a..2fb6bfa0cef 100644
--- a/src/mongo/db/transaction_coordinator.h
+++ b/src/mongo/db/transaction_coordinator.h
@@ -226,8 +226,9 @@ inline logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
const txn::CommitDecision& decision) {
// clang-format off
switch (decision) {
- case txn::CommitDecision::kCommit: stream.stream() << "kCommit"; break;
- case txn::CommitDecision::kAbort: stream.stream() << "kAbort"; break;
+ case txn::CommitDecision::kCommit: stream.stream() << "kCommit"; break;
+ case txn::CommitDecision::kAbort: stream.stream() << "kAbort"; break;
+ case txn::CommitDecision::kCanceled: stream.stream() << "kCanceled"; break;
};
// clang-format on
return stream;
diff --git a/src/mongo/db/transaction_coordinator_driver.cpp b/src/mongo/db/transaction_coordinator_driver.cpp
index 9e78746bed0..1853053139c 100644
--- a/src/mongo/db/transaction_coordinator_driver.cpp
+++ b/src/mongo/db/transaction_coordinator_driver.cpp
@@ -280,6 +280,11 @@ Future<txn::PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare(
? std::max(result.maxPrepareTimestamp, next.prepareTimestamp)
: next.prepareTimestamp;
break;
+ case PrepareVote::kCanceled:
+ // PrepareVote is just an alias for CommitDecision, so we need to include this
+ // branch as part of the switch statement, but CommitDecision::kCanceled will
+ // never be a valid response to prepare so this path is unreachable.
+ MONGO_UNREACHABLE;
}
return txn::ShouldStopIteration::kNo;
diff --git a/src/mongo/db/transaction_coordinator_driver.h b/src/mongo/db/transaction_coordinator_driver.h
index a92d97f8174..62354189320 100644
--- a/src/mongo/db/transaction_coordinator_driver.h
+++ b/src/mongo/db/transaction_coordinator_driver.h
@@ -46,6 +46,7 @@ namespace txn {
enum class CommitDecision {
kCommit,
kAbort,
+ kCanceled,
};
/**
diff --git a/src/mongo/db/transaction_coordinator_futures_util_test.cpp b/src/mongo/db/transaction_coordinator_futures_util_test.cpp
index 86754e7ef60..ecb4ed5ba66 100644
--- a/src/mongo/db/transaction_coordinator_futures_util_test.cpp
+++ b/src/mongo/db/transaction_coordinator_futures_util_test.cpp
@@ -300,7 +300,7 @@ TEST_F(DoWhileTest, LoopBodyExecutesAtLeastOnceWithBackoff) {
TEST_F(DoWhileTest, LoopBodyExecutesManyIterationsWithoutBackoff) {
AsyncWorkScheduler async(getServiceContext());
- int remainingLoops = 100'000;
+ int remainingLoops = 1000;
auto future = doWhile(async,
boost::none,
[&remainingLoops](const StatusWith<int>& status) {
diff --git a/src/mongo/db/transaction_coordinator_service_test.cpp b/src/mongo/db/transaction_coordinator_service_test.cpp
index 05aaa06b759..28c0bcb9780 100644
--- a/src/mongo/db/transaction_coordinator_service_test.cpp
+++ b/src/mongo/db/transaction_coordinator_service_test.cpp
@@ -382,6 +382,24 @@ TEST_F(TransactionCoordinatorServiceTest, RecoverCommitJoinsOngoingCoordinationT
static_cast<int>(commitDecisionFuture2.get()));
}
+TEST_F(TransactionCoordinatorServiceTest,
+ RecoverCommitWorksIfCommitNeverReceivedAndCoordinationCanceled) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+
+ auto commitDecisionFuture =
+ *coordinatorService->recoverCommit(operationContext(), _lsid, _txnNumber);
+
+ // Cancel previous coordinator by creating a new coordinator at a higher txn number.
+ coordinatorService->createCoordinator(
+ operationContext(), _lsid, _txnNumber + 1, kCommitDeadline);
+
+
+ ASSERT_EQ(static_cast<int>(commitDecisionFuture.get()),
+ static_cast<int>(txn::CommitDecision::kCanceled));
+}
+
TEST_F(
TransactionCoordinatorServiceTest,
CreateCoordinatorWithHigherTxnNumberThanExistingButNotYetCommittingTxnCancelsPreviousTxnAndSucceeds) {