diff options
author | Suganthi Mani <suganthi.mani@mongodb.com> | 2019-07-12 16:11:52 -0400 |
---|---|---|
committer | Suganthi Mani <suganthi.mani@mongodb.com> | 2019-07-21 22:40:51 -0400 |
commit | a5d4ab967af9cbba17e6aa5afadca35927bd74c1 (patch) | |
tree | eab6ec805312a30b0a7e0d0392f40a8d0c421b1a | |
parent | 0fddb3efeb0677b6be81fa3615a55d1f2888ed38 (diff) | |
download | mongo-a5d4ab967af9cbba17e6aa5afadca35927bd74c1.tar.gz |
SERVER-41980 Prepared transactions should not acquire ticket on primary.
-rw-r--r-- | buildscripts/resmokeconfig/suites/replica_sets_auth.yml | 1 | ||||
-rw-r--r-- | jstests/replsets/transactions_committed_with_tickets_exhausted.js | 96 | ||||
-rw-r--r-- | src/mongo/db/concurrency/d_concurrency_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/concurrency/locker.h | 12 | ||||
-rw-r--r-- | src/mongo/db/ftdc/collector.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 35 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 28 |
7 files changed, 155 insertions, 21 deletions
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_auth.yml b/buildscripts/resmokeconfig/suites/replica_sets_auth.yml index 3306d5347ae..03f48dd72b5 100644 --- a/buildscripts/resmokeconfig/suites/replica_sets_auth.yml +++ b/buildscripts/resmokeconfig/suites/replica_sets_auth.yml @@ -14,6 +14,7 @@ selector: # Also skip tests that require a ScopedThread, because ScopedThreads don't inherit credentials. - jstests/replsets/interrupted_batch_insert.js - jstests/replsets/transactions_reaped_with_tickets_exhausted.js + - jstests/replsets/transactions_committed_with_tickets_exhausted.js executor: config: diff --git a/jstests/replsets/transactions_committed_with_tickets_exhausted.js b/jstests/replsets/transactions_committed_with_tickets_exhausted.js new file mode 100644 index 00000000000..9fe978a38d6 --- /dev/null +++ b/jstests/replsets/transactions_committed_with_tickets_exhausted.js @@ -0,0 +1,96 @@ +/** + * Test ensures that exhausting the number of write tickets in the system does not prevent + * transactions from being committed. + * + * @tags: [uses_transactions, uses_prepare_transaction] + */ +(function() { + "use strict"; + + load("jstests/libs/parallelTester.js"); // for ScopedThread + load("jstests/core/txns/libs/prepare_helpers.js"); + + // We set the number of write tickets to be a small value in order to avoid needing to spawn a + // large number of threads to exhaust all of the available ones. + const kNumWriteTickets = 5; + + const rst = new ReplSetTest({ + nodes: 1, + nodeOptions: { + setParameter: { + wiredTigerConcurrentWriteTransactions: kNumWriteTickets, + + // Setting a transaction lifetime of 20 seconds works fine locally because the + // threads which attempt to run the drop command are spawned quickly enough. This + // might not be the case for Evergreen hosts and may need to be tuned accordingly. + transactionLifetimeLimitSeconds: 20, + } + } + }); + rst.startSet(); + rst.initiate(); + + const primary = rst.getPrimary(); + const db = primary.getDB("test"); + + const session = primary.startSession({causalConsistency: false}); + const sessionDb = session.getDatabase("test"); + + assert.commandWorked(db.runCommand({create: "mycoll"})); + + jsTestLog("Starting transaction"); + session.startTransaction(); + assert.commandWorked(sessionDb.mycoll.insert({})); + + jsTestLog("Preparing transaction"); + let prepareTimestamp = PrepareHelpers.prepareTransaction(session); + + const threads = []; + + for (let i = 0; i < kNumWriteTickets; ++i) { + const thread = new ScopedThread(function(host) { + try { + const conn = new Mongo(host); + const db = conn.getDB("test"); + + // Dropping a collection requires a database X lock and therefore blocks behind the + // transaction committing or aborting. + db.mycoll.drop(); + + return {ok: 1}; + } catch (e) { + return {ok: 0, error: e.toString(), stack: e.stack}; + } + }, primary.host); + + threads.push(thread); + thread.start(); + } + + // We wait until all of the drop commands are waiting for a lock to know that we've exhausted + // all of the available write tickets. + assert.soon( + () => { + const ops = db.currentOp({"command.drop": "mycoll", waitingForLock: true}); + return ops.inprog.length === kNumWriteTickets; + }, + () => { + return `Didn't find ${kNumWriteTickets} drop commands running: ` + + tojson(db.currentOp()); + }); + + // Should be able to successfully commit the transaction with the write tickets exhausted. + jsTestLog("Committing transaction"); + assert.commandWorked(PrepareHelpers.commitTransaction(session, prepareTimestamp)); + + jsTestLog("Waiting for drop command to join"); + for (let thread of threads) { + thread.join(); + } + + for (let thread of threads) { + assert.commandWorked(thread.returnData()); + } + + rst.stopSet(); +})(); diff --git a/src/mongo/db/concurrency/d_concurrency_test.cpp b/src/mongo/db/concurrency/d_concurrency_test.cpp index cb310b55eb9..1a0b49a9e82 100644 --- a/src/mongo/db/concurrency/d_concurrency_test.cpp +++ b/src/mongo/db/concurrency/d_concurrency_test.cpp @@ -1476,7 +1476,7 @@ TEST_F(DConcurrencyTestFixture, NoThrottlingWhenNotAcquiringTickets) { UseGlobalThrottling throttle(opctx1, 1); // Prevent the enforcement of ticket throttling. - opctx1->lockState()->setShouldAcquireTicket(false); + opctx1->lockState()->skipAcquireTicket(); // Both locks should be acquired immediately because there is no throttling. Lock::GlobalRead R1(opctx1, Date_t::now(), Lock::InterruptBehavior::kThrow); diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h index 47996f9b62a..746b65fa40d 100644 --- a/src/mongo/db/concurrency/locker.h +++ b/src/mongo/db/concurrency/locker.h @@ -499,13 +499,15 @@ public: } /** - * If set to false, this opts out of the ticket mechanism. This should be used sparingly - * for special purpose threads, such as FTDC. + * This will opt out of the ticket mechanism. This should be used sparingly for special purpose + * threads, such as FTDC and committing or aborting prepared transactions. */ - void setShouldAcquireTicket(bool newValue) { - invariant(!isLocked() || isNoop()); - _shouldAcquireTicket = newValue; + void skipAcquireTicket() { + // Should not hold or wait for the ticket. + invariant(isNoop() || getClientState() == Locker::ClientState::kInactive); + _shouldAcquireTicket = false; } + bool shouldAcquireTicket() const { return _shouldAcquireTicket; } diff --git a/src/mongo/db/ftdc/collector.cpp b/src/mongo/db/ftdc/collector.cpp index 2462ee34690..37dd68b136e 100644 --- a/src/mongo/db/ftdc/collector.cpp +++ b/src/mongo/db/ftdc/collector.cpp @@ -68,7 +68,7 @@ std::tuple<BSONObj, Date_t> FTDCCollectorCollection::collect(Client* client) { // batches that are taking a long time. auto opCtx = client->makeOperationContext(); ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(opCtx->lockState()); - opCtx->lockState()->setShouldAcquireTicket(false); + opCtx->lockState()->skipAcquireTicket(); // Explicitly start future read transactions without a timestamp. opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 54544c4d6e6..c9b54f19cb1 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -869,7 +869,7 @@ void TransactionParticipant::Participant::resetRetryableWriteState(OperationCont } void TransactionParticipant::Participant::_releaseTransactionResourcesToOpCtx( - OperationContext* opCtx, MaxLockTimeout maxLockTimeout) { + OperationContext* opCtx, MaxLockTimeout maxLockTimeout, AcquireTicket acquireTicket) { // Transaction resources already exist for this transaction. Transfer them from the // stash to the operation context. // @@ -910,8 +910,14 @@ void TransactionParticipant::Participant::_releaseTransactionResourcesToOpCtx( } } + if (acquireTicket == AcquireTicket::kSkip) { + stashLocker->skipAcquireTicket(); + } + tempTxnResourceStash->release(opCtx); releaseOnError.dismiss(); + + invariant(opCtx->lockState()->shouldAcquireTicket() || o().txnState.isPrepared()); } void TransactionParticipant::Participant::unstashTransactionResources(OperationContext* opCtx, @@ -928,13 +934,26 @@ void TransactionParticipant::Participant::unstashTransactionResources(OperationC _checkIsCommandValidWithTxnState(*opCtx->getTxnNumber(), cmdName); if (o().txnResourceStash) { MaxLockTimeout maxLockTimeout; - // Max lock timeout must not be set on secondaries, since secondary oplog application cannot - // fail. And, primaries should respect the transaction lock timeout, since it can prevent - // the transaction from making progress. - maxLockTimeout = - opCtx->writesAreReplicated() ? MaxLockTimeout::kAllowed : MaxLockTimeout::kNotAllowed; + // Default is we should acquire ticket. + AcquireTicket acquireTicket{AcquireTicket::kNoSkip}; + + if (opCtx->writesAreReplicated()) { + // Primaries should respect the transaction lock timeout, since it can prevent + // the transaction from making progress. + maxLockTimeout = MaxLockTimeout::kAllowed; + // Prepared transactions should not acquire ticket. Else, it can deadlock with other + // non-transactional operations that have exhausted the write tickets and are blocked on + // them due to prepare or lock conflict. + if (o().txnState.isPrepared()) { + acquireTicket = AcquireTicket::kSkip; + } + } else { + // Max lock timeout must not be set on secondaries, since secondary oplog application + // cannot fail. + maxLockTimeout = MaxLockTimeout::kNotAllowed; + } - _releaseTransactionResourcesToOpCtx(opCtx, maxLockTimeout); + _releaseTransactionResourcesToOpCtx(opCtx, maxLockTimeout, acquireTicket); stdx::lock_guard<Client> lg(*opCtx->getClient()); o(lg).transactionMetricsObserver.onUnstash(ServerTransactionsMetrics::get(opCtx), opCtx->getServiceContext()->getTickSource()); @@ -1013,7 +1032,7 @@ void TransactionParticipant::Participant::refreshLocksForPreparedTransaction( // Lock and Ticket reacquisition of a prepared transaction should not fail for // state transitions (step up/step down). - _releaseTransactionResourcesToOpCtx(opCtx, MaxLockTimeout::kNotAllowed); + _releaseTransactionResourcesToOpCtx(opCtx, MaxLockTimeout::kNotAllowed, AcquireTicket::kNoSkip); // Snapshot transactions don't conflict with PBWM lock on both primary and secondary. invariant(!opCtx->lockState()->shouldConflictWithSecondaryBatchApplication()); diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 0478045dc11..5f104936b50 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -359,6 +359,8 @@ public: public: // Indicates whether the future lock requests should have timeouts. enum class MaxLockTimeout { kNotAllowed, kAllowed }; + // Indicates whether we should opt out of the ticket mechanism. + enum class AcquireTicket { kNoSkip, kSkip }; explicit Participant(OperationContext* opCtx); explicit Participant(const SessionToKill& session); @@ -771,14 +773,28 @@ public: * - MaxLockTimeout::kAllowed will set the timeout as * MaxTransactionLockRequestTimeoutMillis. * - * ------------------------------------------------------------------ - * | | PRIMARY | SECONDARY | STATE TRANSITION | - * |----------------|------------|---------------|------------------| - * |maxLockTimeout | kAllowed | kNotAllowed | kNotAllowed | - * ------------------------------------------------------------------ + * acquireTicket will determine we should acquire ticket on unstashing the transaction + * resources. + * - AcquireTicket::kSkip will not acquire ticket. + * - AcquireTicket::kNoSkip will retain the default behavior which is to acquire ticket. + * + * Below is the expected behavior. + * ---------------------------------------------------------------------------- + * | | | | | + * | | PRIMARY | SECONDARY | STATE TRANSITION | + * | | | | | + * |----------------|----------------------|---------------|------------------| + * | |Unprepared | Prepared | | | + * | | Txn | Txn | | | + * | |----------------------| | | + * |acquireTicket | kNoSkip | kSkip | kNoSkip | kNoSkip | + * |----------------|----------------------|---------------|------------------| + * |maxLockTimeout | kAllowed | kNotAllowed | kNotAllowed | + * ---------------------------------------------------------------------------- */ void _releaseTransactionResourcesToOpCtx(OperationContext* opCtx, - MaxLockTimeout maxLockTimeout); + MaxLockTimeout maxLockTimeout, + AcquireTicket acquireTicket); TransactionParticipant::PrivateState& p() { return _tp->_p; |