summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Saltz <matthew.saltz@mongodb.com>2019-01-29 11:34:54 -0500
committerMatthew Saltz <matthew.saltz@mongodb.com>2019-01-30 12:10:25 -0500
commitd1771e696b6df883af70eedaaa0733548c573fec (patch)
tree551cc628f50ce01639d6df14dceef90e1bb82bc8
parent5fbc833d3906e960554b3697672f34060d5e1d9e (diff)
downloadmongo-d1771e696b6df883af70eedaaa0733548c573fec.tar.gz
SERVER-39187 Signal coordinator's decision promise when the coordinator is canceled before beginning commit
-rw-r--r--jstests/sharding/transactions_recover_decision_from_local_participant.js76
-rw-r--r--src/mongo/db/s/txn_two_phase_commit_cmds.cpp3
-rw-r--r--src/mongo/db/transaction_coordinator.cpp4
-rw-r--r--src/mongo/db/transaction_coordinator.h5
-rw-r--r--src/mongo/db/transaction_coordinator_driver.cpp5
-rw-r--r--src/mongo/db/transaction_coordinator_driver.h1
-rw-r--r--src/mongo/db/transaction_coordinator_futures_util_test.cpp2
-rw-r--r--src/mongo/db/transaction_coordinator_service_test.cpp18
8 files changed, 106 insertions, 8 deletions
diff --git a/jstests/sharding/transactions_recover_decision_from_local_participant.js b/jstests/sharding/transactions_recover_decision_from_local_participant.js
index 88d660dd315..9afd40377b9 100644
--- a/jstests/sharding/transactions_recover_decision_from_local_participant.js
+++ b/jstests/sharding/transactions_recover_decision_from_local_participant.js
@@ -11,6 +11,9 @@
// The test modifies config.transactions, which must be done outside of a session.
TestData.disableImplicitSessions = true;
+ // Reducing this from the resmoke default, which is several hours, so that tests that rely on a
+ // transaction coordinator being canceled after a timeout happen in a reasonable amount of time.
+ TestData.transactionLifetimeLimitSeconds = 60;
let st =
new ShardingTest({shards: 2, rs: {nodes: 2}, mongos: 2, other: {rsOptions: {verbose: 2}}});
@@ -41,7 +44,27 @@
writeConcern));
};
- const startNewTransactionThroughMongos = function() {
+ const startNewSingleShardTransactionThroughMongos = function() {
+ const updateDocumentOnShard0 = {
+ q: {x: -1},
+ u: {"$set": {lastTxnNumber: txnNumber}},
+ upsert: true
+ };
+
+ let res = assert.commandWorked(testDB.runCommand({
+ update: 'user',
+ updates: [updateDocumentOnShard0],
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ autocommit: false,
+ startTransaction: true
+ }));
+
+ assert.neq(null, res.recoveryToken);
+ return res.recoveryToken;
+ };
+
+ const startNewCrossShardTransactionThroughMongos = function() {
const updateDocumentOnShard0 = {
q: {x: -1},
u: {"$set": {lastTxnNumber: txnNumber}},
@@ -92,7 +115,7 @@
"coordinateCommit sent after coordinator finished coordinating an abort decision.");
++txnNumber;
- let recoveryToken = startNewTransactionThroughMongos();
+ let recoveryToken = startNewCrossShardTransactionThroughMongos();
assert.commandWorked(st.rs0.getPrimary().adminCommand({
abortTransaction: 1,
lsid: lsid,
@@ -115,7 +138,7 @@
"coordinateCommit sent after coordinator finished coordinating a commit decision.");
++txnNumber;
- recoveryToken = startNewTransactionThroughMongos();
+ recoveryToken = startNewCrossShardTransactionThroughMongos();
assert.commandWorked(testDB.adminCommand({
commitTransaction: 1,
lsid: lsid,
@@ -129,7 +152,7 @@
"coordinateCommit sent after coordinator finished coordinating a commit decision but coordinator node can't majority commit writes");
++txnNumber;
- recoveryToken = startNewTransactionThroughMongos();
+ recoveryToken = startNewCrossShardTransactionThroughMongos();
assert.commandWorked(testDB.adminCommand({
commitTransaction: 1,
lsid: lsid,
@@ -181,7 +204,7 @@
jsTest.log(
"coordinateCommit sent for higher transaction number than participant has seen.");
++txnNumber;
- recoveryToken = startNewTransactionThroughMongos();
+ recoveryToken = startNewCrossShardTransactionThroughMongos();
assert.commandFailedWithCode(runCoordinateCommit(txnNumber + 1, participantList),
ErrorCodes.NoSuchTransaction);
@@ -205,5 +228,48 @@
txnNumber = 0;
runTest([]);
+ /**
+ * Test that commit recovery succeeds with a single-shard transaction that has already
+ * committed.
+ */
+ (function() {
+ lsid = {id: UUID()};
+ txnNumber = 0;
+
+ // Start single-shard transaction so that coordinateCommit is not necessary for commit.
+ let recoveryToken = startNewSingleShardTransactionThroughMongos();
+
+ // Commit the transaction from the first mongos.
+ assert.commandWorked(testDB.adminCommand({
+ commitTransaction: 1,
+ lsid: lsid,
+ txnNumber: NumberLong(txnNumber),
+ autocommit: false,
+ recoveryToken: recoveryToken
+ }));
+
+ // Try to recover decision from other mongos. This should block until the coordinator is
+ // removed and then return the commit decision (which was commit).
+ assert.commandWorked(sendCommitViaOtherMongos(lsid, txnNumber, recoveryToken));
+ }());
+
+ /**
+ * Test that commit recovery succeeds with a multi-shard transaction for which commit is never
+ * sent.
+ */
+ (function() {
+ lsid = {id: UUID()};
+ txnNumber = 0;
+
+ // Start transaction and run CRUD ops on several shards.
+ let recoveryToken = startNewCrossShardTransactionThroughMongos();
+
+ // Try to recover decision from other mongos. This should block until the transaction
+ // coordinator is canceled after transactionLifetimeLimitSeconds, after which it should
+ // abort the local participant and return NoSuchTransaction.
+ assert.commandFailedWithCode(sendCommitViaOtherMongos(lsid, txnNumber, recoveryToken),
+ ErrorCodes.NoSuchTransaction);
+ })();
+
st.stop();
})();
diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
index eaa2a9ba36a..d458006ffdb 100644
--- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
+++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp
@@ -244,6 +244,9 @@ public:
// The commit coordination is still ongoing. Block waiting for the decision.
auto commitDecision = commitDecisionFuture->get(opCtx);
switch (commitDecision) {
+ case txn::CommitDecision::kCanceled:
+ // Continue on to recover the commit decision from disk.
+ break;
case txn::CommitDecision::kAbort:
uasserted(ErrorCodes::NoSuchTransaction, "Transaction was aborted");
case txn::CommitDecision::kCommit:
diff --git a/src/mongo/db/transaction_coordinator.cpp b/src/mongo/db/transaction_coordinator.cpp
index 463cdb879a8..1040735154e 100644
--- a/src/mongo/db/transaction_coordinator.cpp
+++ b/src/mongo/db/transaction_coordinator.cpp
@@ -176,6 +176,8 @@ Future<void> TransactionCoordinator::onCompletion() {
void TransactionCoordinator::cancelIfCommitNotYetStarted() {
stdx::unique_lock<stdx::mutex> lk(_mutex);
if (_state == CoordinatorState::kInit) {
+ invariant(!_finalDecisionPromise.getFuture().isReady());
+ _finalDecisionPromise.emplaceValue(txn::CommitDecision::kCanceled);
_transitionToDone(std::move(lk));
}
}
@@ -195,6 +197,8 @@ Future<void> TransactionCoordinator::_sendDecisionToParticipants(
case txn::CommitDecision::kAbort:
_state = CoordinatorState::kAborting;
return _driver.sendAbort(participantShards, _lsid, _txnNumber);
+ case txn::CommitDecision::kCanceled:
+ MONGO_UNREACHABLE;
};
MONGO_UNREACHABLE;
};
diff --git a/src/mongo/db/transaction_coordinator.h b/src/mongo/db/transaction_coordinator.h
index 063e32e209a..2fb6bfa0cef 100644
--- a/src/mongo/db/transaction_coordinator.h
+++ b/src/mongo/db/transaction_coordinator.h
@@ -226,8 +226,9 @@ inline logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream,
const txn::CommitDecision& decision) {
// clang-format off
switch (decision) {
- case txn::CommitDecision::kCommit: stream.stream() << "kCommit"; break;
- case txn::CommitDecision::kAbort: stream.stream() << "kAbort"; break;
+ case txn::CommitDecision::kCommit: stream.stream() << "kCommit"; break;
+ case txn::CommitDecision::kAbort: stream.stream() << "kAbort"; break;
+ case txn::CommitDecision::kCanceled: stream.stream() << "kCanceled"; break;
};
// clang-format on
return stream;
diff --git a/src/mongo/db/transaction_coordinator_driver.cpp b/src/mongo/db/transaction_coordinator_driver.cpp
index 9e78746bed0..1853053139c 100644
--- a/src/mongo/db/transaction_coordinator_driver.cpp
+++ b/src/mongo/db/transaction_coordinator_driver.cpp
@@ -280,6 +280,11 @@ Future<txn::PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare(
? std::max(result.maxPrepareTimestamp, next.prepareTimestamp)
: next.prepareTimestamp;
break;
+ case PrepareVote::kCanceled:
+ // PrepareVote is just an alias for CommitDecision, so we need to include this
+ // branch as part of the switch statement, but CommitDecision::kCanceled will
+ // never be a valid response to prepare so this path is unreachable.
+ MONGO_UNREACHABLE;
}
return txn::ShouldStopIteration::kNo;
diff --git a/src/mongo/db/transaction_coordinator_driver.h b/src/mongo/db/transaction_coordinator_driver.h
index a92d97f8174..62354189320 100644
--- a/src/mongo/db/transaction_coordinator_driver.h
+++ b/src/mongo/db/transaction_coordinator_driver.h
@@ -46,6 +46,7 @@ namespace txn {
enum class CommitDecision {
kCommit,
kAbort,
+ kCanceled,
};
/**
diff --git a/src/mongo/db/transaction_coordinator_futures_util_test.cpp b/src/mongo/db/transaction_coordinator_futures_util_test.cpp
index 86754e7ef60..ecb4ed5ba66 100644
--- a/src/mongo/db/transaction_coordinator_futures_util_test.cpp
+++ b/src/mongo/db/transaction_coordinator_futures_util_test.cpp
@@ -300,7 +300,7 @@ TEST_F(DoWhileTest, LoopBodyExecutesAtLeastOnceWithBackoff) {
TEST_F(DoWhileTest, LoopBodyExecutesManyIterationsWithoutBackoff) {
AsyncWorkScheduler async(getServiceContext());
- int remainingLoops = 100'000;
+ int remainingLoops = 1000;
auto future = doWhile(async,
boost::none,
[&remainingLoops](const StatusWith<int>& status) {
diff --git a/src/mongo/db/transaction_coordinator_service_test.cpp b/src/mongo/db/transaction_coordinator_service_test.cpp
index 05aaa06b759..28c0bcb9780 100644
--- a/src/mongo/db/transaction_coordinator_service_test.cpp
+++ b/src/mongo/db/transaction_coordinator_service_test.cpp
@@ -382,6 +382,24 @@ TEST_F(TransactionCoordinatorServiceTest, RecoverCommitJoinsOngoingCoordinationT
static_cast<int>(commitDecisionFuture2.get()));
}
+TEST_F(TransactionCoordinatorServiceTest,
+ RecoverCommitWorksIfCommitNeverReceivedAndCoordinationCanceled) {
+ auto coordinatorService = TransactionCoordinatorService::get(operationContext());
+
+ coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline);
+
+ auto commitDecisionFuture =
+ *coordinatorService->recoverCommit(operationContext(), _lsid, _txnNumber);
+
+ // Cancel previous coordinator by creating a new coordinator at a higher txn number.
+ coordinatorService->createCoordinator(
+ operationContext(), _lsid, _txnNumber + 1, kCommitDeadline);
+
+
+ ASSERT_EQ(static_cast<int>(commitDecisionFuture.get()),
+ static_cast<int>(txn::CommitDecision::kCanceled));
+}
+
TEST_F(
TransactionCoordinatorServiceTest,
CreateCoordinatorWithHigherTxnNumberThanExistingButNotYetCommittingTxnCancelsPreviousTxnAndSucceeds) {