diff options
author | Matthew Saltz <matthew.saltz@mongodb.com> | 2019-01-29 11:34:54 -0500 |
---|---|---|
committer | Matthew Saltz <matthew.saltz@mongodb.com> | 2019-01-30 12:10:25 -0500 |
commit | d1771e696b6df883af70eedaaa0733548c573fec (patch) | |
tree | 551cc628f50ce01639d6df14dceef90e1bb82bc8 | |
parent | 5fbc833d3906e960554b3697672f34060d5e1d9e (diff) | |
download | mongo-d1771e696b6df883af70eedaaa0733548c573fec.tar.gz |
SERVER-39187 Signal coordinator's decision promise when the coordinator is canceled before beginning commit
8 files changed, 106 insertions, 8 deletions
diff --git a/jstests/sharding/transactions_recover_decision_from_local_participant.js b/jstests/sharding/transactions_recover_decision_from_local_participant.js index 88d660dd315..9afd40377b9 100644 --- a/jstests/sharding/transactions_recover_decision_from_local_participant.js +++ b/jstests/sharding/transactions_recover_decision_from_local_participant.js @@ -11,6 +11,9 @@ // The test modifies config.transactions, which must be done outside of a session. TestData.disableImplicitSessions = true; + // Reducing this from the resmoke default, which is several hours, so that tests that rely on a + // transaction coordinator being canceled after a timeout happen in a reasonable amount of time. + TestData.transactionLifetimeLimitSeconds = 60; let st = new ShardingTest({shards: 2, rs: {nodes: 2}, mongos: 2, other: {rsOptions: {verbose: 2}}}); @@ -41,7 +44,27 @@ writeConcern)); }; - const startNewTransactionThroughMongos = function() { + const startNewSingleShardTransactionThroughMongos = function() { + const updateDocumentOnShard0 = { + q: {x: -1}, + u: {"$set": {lastTxnNumber: txnNumber}}, + upsert: true + }; + + let res = assert.commandWorked(testDB.runCommand({ + update: 'user', + updates: [updateDocumentOnShard0], + lsid: lsid, + txnNumber: NumberLong(txnNumber), + autocommit: false, + startTransaction: true + })); + + assert.neq(null, res.recoveryToken); + return res.recoveryToken; + }; + + const startNewCrossShardTransactionThroughMongos = function() { const updateDocumentOnShard0 = { q: {x: -1}, u: {"$set": {lastTxnNumber: txnNumber}}, @@ -92,7 +115,7 @@ "coordinateCommit sent after coordinator finished coordinating an abort decision."); ++txnNumber; - let recoveryToken = startNewTransactionThroughMongos(); + let recoveryToken = startNewCrossShardTransactionThroughMongos(); assert.commandWorked(st.rs0.getPrimary().adminCommand({ abortTransaction: 1, lsid: lsid, @@ -115,7 +138,7 @@ "coordinateCommit sent after coordinator finished coordinating a commit decision."); ++txnNumber; - recoveryToken = startNewTransactionThroughMongos(); + recoveryToken = startNewCrossShardTransactionThroughMongos(); assert.commandWorked(testDB.adminCommand({ commitTransaction: 1, lsid: lsid, @@ -129,7 +152,7 @@ "coordinateCommit sent after coordinator finished coordinating a commit decision but coordinator node can't majority commit writes"); ++txnNumber; - recoveryToken = startNewTransactionThroughMongos(); + recoveryToken = startNewCrossShardTransactionThroughMongos(); assert.commandWorked(testDB.adminCommand({ commitTransaction: 1, lsid: lsid, @@ -181,7 +204,7 @@ jsTest.log( "coordinateCommit sent for higher transaction number than participant has seen."); ++txnNumber; - recoveryToken = startNewTransactionThroughMongos(); + recoveryToken = startNewCrossShardTransactionThroughMongos(); assert.commandFailedWithCode(runCoordinateCommit(txnNumber + 1, participantList), ErrorCodes.NoSuchTransaction); @@ -205,5 +228,48 @@ txnNumber = 0; runTest([]); + /** + * Test that commit recovery succeeds with a single-shard transaction that has already + * committed. + */ + (function() { + lsid = {id: UUID()}; + txnNumber = 0; + + // Start single-shard transaction so that coordinateCommit is not necessary for commit. + let recoveryToken = startNewSingleShardTransactionThroughMongos(); + + // Commit the transaction from the first mongos. + assert.commandWorked(testDB.adminCommand({ + commitTransaction: 1, + lsid: lsid, + txnNumber: NumberLong(txnNumber), + autocommit: false, + recoveryToken: recoveryToken + })); + + // Try to recover decision from other mongos. This should block until the coordinator is + // removed and then return the commit decision (which was commit). + assert.commandWorked(sendCommitViaOtherMongos(lsid, txnNumber, recoveryToken)); + }()); + + /** + * Test that commit recovery succeeds with a multi-shard transaction for which commit is never + * sent. + */ + (function() { + lsid = {id: UUID()}; + txnNumber = 0; + + // Start transaction and run CRUD ops on several shards. + let recoveryToken = startNewCrossShardTransactionThroughMongos(); + + // Try to recover decision from other mongos. This should block until the transaction + // coordinator is canceled after transactionLifetimeLimitSeconds, after which it should + // abort the local participant and return NoSuchTransaction. + assert.commandFailedWithCode(sendCommitViaOtherMongos(lsid, txnNumber, recoveryToken), + ErrorCodes.NoSuchTransaction); + })(); + st.stop(); })(); diff --git a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp index eaa2a9ba36a..d458006ffdb 100644 --- a/src/mongo/db/s/txn_two_phase_commit_cmds.cpp +++ b/src/mongo/db/s/txn_two_phase_commit_cmds.cpp @@ -244,6 +244,9 @@ public: // The commit coordination is still ongoing. Block waiting for the decision. auto commitDecision = commitDecisionFuture->get(opCtx); switch (commitDecision) { + case txn::CommitDecision::kCanceled: + // Continue on to recover the commit decision from disk. + break; case txn::CommitDecision::kAbort: uasserted(ErrorCodes::NoSuchTransaction, "Transaction was aborted"); case txn::CommitDecision::kCommit: diff --git a/src/mongo/db/transaction_coordinator.cpp b/src/mongo/db/transaction_coordinator.cpp index 463cdb879a8..1040735154e 100644 --- a/src/mongo/db/transaction_coordinator.cpp +++ b/src/mongo/db/transaction_coordinator.cpp @@ -176,6 +176,8 @@ Future<void> TransactionCoordinator::onCompletion() { void TransactionCoordinator::cancelIfCommitNotYetStarted() { stdx::unique_lock<stdx::mutex> lk(_mutex); if (_state == CoordinatorState::kInit) { + invariant(!_finalDecisionPromise.getFuture().isReady()); + _finalDecisionPromise.emplaceValue(txn::CommitDecision::kCanceled); _transitionToDone(std::move(lk)); } } @@ -195,6 +197,8 @@ Future<void> TransactionCoordinator::_sendDecisionToParticipants( case txn::CommitDecision::kAbort: _state = CoordinatorState::kAborting; return _driver.sendAbort(participantShards, _lsid, _txnNumber); + case txn::CommitDecision::kCanceled: + MONGO_UNREACHABLE; }; MONGO_UNREACHABLE; }; diff --git a/src/mongo/db/transaction_coordinator.h b/src/mongo/db/transaction_coordinator.h index 063e32e209a..2fb6bfa0cef 100644 --- a/src/mongo/db/transaction_coordinator.h +++ b/src/mongo/db/transaction_coordinator.h @@ -226,8 +226,9 @@ inline logger::LogstreamBuilder& operator<<(logger::LogstreamBuilder& stream, const txn::CommitDecision& decision) { // clang-format off switch (decision) { - case txn::CommitDecision::kCommit: stream.stream() << "kCommit"; break; - case txn::CommitDecision::kAbort: stream.stream() << "kAbort"; break; + case txn::CommitDecision::kCommit: stream.stream() << "kCommit"; break; + case txn::CommitDecision::kAbort: stream.stream() << "kAbort"; break; + case txn::CommitDecision::kCanceled: stream.stream() << "kCanceled"; break; }; // clang-format on return stream; diff --git a/src/mongo/db/transaction_coordinator_driver.cpp b/src/mongo/db/transaction_coordinator_driver.cpp index 9e78746bed0..1853053139c 100644 --- a/src/mongo/db/transaction_coordinator_driver.cpp +++ b/src/mongo/db/transaction_coordinator_driver.cpp @@ -280,6 +280,11 @@ Future<txn::PrepareVoteConsensus> TransactionCoordinatorDriver::sendPrepare( ? std::max(result.maxPrepareTimestamp, next.prepareTimestamp) : next.prepareTimestamp; break; + case PrepareVote::kCanceled: + // PrepareVote is just an alias for CommitDecision, so we need to include this + // branch as part of the switch statement, but CommitDecision::kCanceled will + // never be a valid response to prepare so this path is unreachable. + MONGO_UNREACHABLE; } return txn::ShouldStopIteration::kNo; diff --git a/src/mongo/db/transaction_coordinator_driver.h b/src/mongo/db/transaction_coordinator_driver.h index a92d97f8174..62354189320 100644 --- a/src/mongo/db/transaction_coordinator_driver.h +++ b/src/mongo/db/transaction_coordinator_driver.h @@ -46,6 +46,7 @@ namespace txn { enum class CommitDecision { kCommit, kAbort, + kCanceled, }; /** diff --git a/src/mongo/db/transaction_coordinator_futures_util_test.cpp b/src/mongo/db/transaction_coordinator_futures_util_test.cpp index 86754e7ef60..ecb4ed5ba66 100644 --- a/src/mongo/db/transaction_coordinator_futures_util_test.cpp +++ b/src/mongo/db/transaction_coordinator_futures_util_test.cpp @@ -300,7 +300,7 @@ TEST_F(DoWhileTest, LoopBodyExecutesAtLeastOnceWithBackoff) { TEST_F(DoWhileTest, LoopBodyExecutesManyIterationsWithoutBackoff) { AsyncWorkScheduler async(getServiceContext()); - int remainingLoops = 100'000; + int remainingLoops = 1000; auto future = doWhile(async, boost::none, [&remainingLoops](const StatusWith<int>& status) { diff --git a/src/mongo/db/transaction_coordinator_service_test.cpp b/src/mongo/db/transaction_coordinator_service_test.cpp index 05aaa06b759..28c0bcb9780 100644 --- a/src/mongo/db/transaction_coordinator_service_test.cpp +++ b/src/mongo/db/transaction_coordinator_service_test.cpp @@ -382,6 +382,24 @@ TEST_F(TransactionCoordinatorServiceTest, RecoverCommitJoinsOngoingCoordinationT static_cast<int>(commitDecisionFuture2.get())); } +TEST_F(TransactionCoordinatorServiceTest, + RecoverCommitWorksIfCommitNeverReceivedAndCoordinationCanceled) { + auto coordinatorService = TransactionCoordinatorService::get(operationContext()); + + coordinatorService->createCoordinator(operationContext(), _lsid, _txnNumber, kCommitDeadline); + + auto commitDecisionFuture = + *coordinatorService->recoverCommit(operationContext(), _lsid, _txnNumber); + + // Cancel previous coordinator by creating a new coordinator at a higher txn number. + coordinatorService->createCoordinator( + operationContext(), _lsid, _txnNumber + 1, kCommitDeadline); + + + ASSERT_EQ(static_cast<int>(commitDecisionFuture.get()), + static_cast<int>(txn::CommitDecision::kCanceled)); +} + TEST_F( TransactionCoordinatorServiceTest, CreateCoordinatorWithHigherTxnNumberThanExistingButNotYetCommittingTxnCancelsPreviousTxnAndSucceeds) { |