summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSuganthi Mani <suganthi.mani@mongodb.com>2019-07-12 16:11:52 -0400
committerSuganthi Mani <suganthi.mani@mongodb.com>2019-07-21 22:40:51 -0400
commita5d4ab967af9cbba17e6aa5afadca35927bd74c1 (patch)
treeeab6ec805312a30b0a7e0d0392f40a8d0c421b1a
parent0fddb3efeb0677b6be81fa3615a55d1f2888ed38 (diff)
downloadmongo-a5d4ab967af9cbba17e6aa5afadca35927bd74c1.tar.gz
SERVER-41980 Prepared transactions should not acquire ticket on primary.
-rw-r--r--buildscripts/resmokeconfig/suites/replica_sets_auth.yml1
-rw-r--r--jstests/replsets/transactions_committed_with_tickets_exhausted.js96
-rw-r--r--src/mongo/db/concurrency/d_concurrency_test.cpp2
-rw-r--r--src/mongo/db/concurrency/locker.h12
-rw-r--r--src/mongo/db/ftdc/collector.cpp2
-rw-r--r--src/mongo/db/transaction_participant.cpp35
-rw-r--r--src/mongo/db/transaction_participant.h28
7 files changed, 155 insertions, 21 deletions
diff --git a/buildscripts/resmokeconfig/suites/replica_sets_auth.yml b/buildscripts/resmokeconfig/suites/replica_sets_auth.yml
index 3306d5347ae..03f48dd72b5 100644
--- a/buildscripts/resmokeconfig/suites/replica_sets_auth.yml
+++ b/buildscripts/resmokeconfig/suites/replica_sets_auth.yml
@@ -14,6 +14,7 @@ selector:
# Also skip tests that require a ScopedThread, because ScopedThreads don't inherit credentials.
- jstests/replsets/interrupted_batch_insert.js
- jstests/replsets/transactions_reaped_with_tickets_exhausted.js
+ - jstests/replsets/transactions_committed_with_tickets_exhausted.js
executor:
config:
diff --git a/jstests/replsets/transactions_committed_with_tickets_exhausted.js b/jstests/replsets/transactions_committed_with_tickets_exhausted.js
new file mode 100644
index 00000000000..9fe978a38d6
--- /dev/null
+++ b/jstests/replsets/transactions_committed_with_tickets_exhausted.js
@@ -0,0 +1,96 @@
+/**
+ * Test ensures that exhausting the number of write tickets in the system does not prevent
+ * transactions from being committed.
+ *
+ * @tags: [uses_transactions, uses_prepare_transaction]
+ */
+(function() {
+ "use strict";
+
+ load("jstests/libs/parallelTester.js"); // for ScopedThread
+ load("jstests/core/txns/libs/prepare_helpers.js");
+
+ // We set the number of write tickets to be a small value in order to avoid needing to spawn a
+ // large number of threads to exhaust all of the available ones.
+ const kNumWriteTickets = 5;
+
+ const rst = new ReplSetTest({
+ nodes: 1,
+ nodeOptions: {
+ setParameter: {
+ wiredTigerConcurrentWriteTransactions: kNumWriteTickets,
+
+ // Setting a transaction lifetime of 20 seconds works fine locally because the
+ // threads which attempt to run the drop command are spawned quickly enough. This
+ // might not be the case for Evergreen hosts and may need to be tuned accordingly.
+ transactionLifetimeLimitSeconds: 20,
+ }
+ }
+ });
+ rst.startSet();
+ rst.initiate();
+
+ const primary = rst.getPrimary();
+ const db = primary.getDB("test");
+
+ const session = primary.startSession({causalConsistency: false});
+ const sessionDb = session.getDatabase("test");
+
+ assert.commandWorked(db.runCommand({create: "mycoll"}));
+
+ jsTestLog("Starting transaction");
+ session.startTransaction();
+ assert.commandWorked(sessionDb.mycoll.insert({}));
+
+ jsTestLog("Preparing transaction");
+ let prepareTimestamp = PrepareHelpers.prepareTransaction(session);
+
+ const threads = [];
+
+ for (let i = 0; i < kNumWriteTickets; ++i) {
+ const thread = new ScopedThread(function(host) {
+ try {
+ const conn = new Mongo(host);
+ const db = conn.getDB("test");
+
+ // Dropping a collection requires a database X lock and therefore blocks behind the
+ // transaction committing or aborting.
+ db.mycoll.drop();
+
+ return {ok: 1};
+ } catch (e) {
+ return {ok: 0, error: e.toString(), stack: e.stack};
+ }
+ }, primary.host);
+
+ threads.push(thread);
+ thread.start();
+ }
+
+ // We wait until all of the drop commands are waiting for a lock to know that we've exhausted
+ // all of the available write tickets.
+ assert.soon(
+ () => {
+ const ops = db.currentOp({"command.drop": "mycoll", waitingForLock: true});
+ return ops.inprog.length === kNumWriteTickets;
+ },
+ () => {
+ return `Didn't find ${kNumWriteTickets} drop commands running: ` +
+ tojson(db.currentOp());
+ });
+
+ // Should be able to successfully commit the transaction with the write tickets exhausted.
+ jsTestLog("Committing transaction");
+ assert.commandWorked(PrepareHelpers.commitTransaction(session, prepareTimestamp));
+
+ jsTestLog("Waiting for drop command to join");
+ for (let thread of threads) {
+ thread.join();
+ }
+
+ for (let thread of threads) {
+ assert.commandWorked(thread.returnData());
+ }
+
+ rst.stopSet();
+})();
diff --git a/src/mongo/db/concurrency/d_concurrency_test.cpp b/src/mongo/db/concurrency/d_concurrency_test.cpp
index cb310b55eb9..1a0b49a9e82 100644
--- a/src/mongo/db/concurrency/d_concurrency_test.cpp
+++ b/src/mongo/db/concurrency/d_concurrency_test.cpp
@@ -1476,7 +1476,7 @@ TEST_F(DConcurrencyTestFixture, NoThrottlingWhenNotAcquiringTickets) {
UseGlobalThrottling throttle(opctx1, 1);
// Prevent the enforcement of ticket throttling.
- opctx1->lockState()->setShouldAcquireTicket(false);
+ opctx1->lockState()->skipAcquireTicket();
// Both locks should be acquired immediately because there is no throttling.
Lock::GlobalRead R1(opctx1, Date_t::now(), Lock::InterruptBehavior::kThrow);
diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h
index 47996f9b62a..746b65fa40d 100644
--- a/src/mongo/db/concurrency/locker.h
+++ b/src/mongo/db/concurrency/locker.h
@@ -499,13 +499,15 @@ public:
}
/**
- * If set to false, this opts out of the ticket mechanism. This should be used sparingly
- * for special purpose threads, such as FTDC.
+ * This will opt out of the ticket mechanism. This should be used sparingly for special purpose
+ * threads, such as FTDC and committing or aborting prepared transactions.
*/
- void setShouldAcquireTicket(bool newValue) {
- invariant(!isLocked() || isNoop());
- _shouldAcquireTicket = newValue;
+ void skipAcquireTicket() {
+ // Should not hold or wait for the ticket.
+ invariant(isNoop() || getClientState() == Locker::ClientState::kInactive);
+ _shouldAcquireTicket = false;
}
+
bool shouldAcquireTicket() const {
return _shouldAcquireTicket;
}
diff --git a/src/mongo/db/ftdc/collector.cpp b/src/mongo/db/ftdc/collector.cpp
index 2462ee34690..37dd68b136e 100644
--- a/src/mongo/db/ftdc/collector.cpp
+++ b/src/mongo/db/ftdc/collector.cpp
@@ -68,7 +68,7 @@ std::tuple<BSONObj, Date_t> FTDCCollectorCollection::collect(Client* client) {
// batches that are taking a long time.
auto opCtx = client->makeOperationContext();
ShouldNotConflictWithSecondaryBatchApplicationBlock shouldNotConflictBlock(opCtx->lockState());
- opCtx->lockState()->setShouldAcquireTicket(false);
+ opCtx->lockState()->skipAcquireTicket();
// Explicitly start future read transactions without a timestamp.
opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 54544c4d6e6..c9b54f19cb1 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -869,7 +869,7 @@ void TransactionParticipant::Participant::resetRetryableWriteState(OperationCont
}
void TransactionParticipant::Participant::_releaseTransactionResourcesToOpCtx(
- OperationContext* opCtx, MaxLockTimeout maxLockTimeout) {
+ OperationContext* opCtx, MaxLockTimeout maxLockTimeout, AcquireTicket acquireTicket) {
// Transaction resources already exist for this transaction. Transfer them from the
// stash to the operation context.
//
@@ -910,8 +910,14 @@ void TransactionParticipant::Participant::_releaseTransactionResourcesToOpCtx(
}
}
+ if (acquireTicket == AcquireTicket::kSkip) {
+ stashLocker->skipAcquireTicket();
+ }
+
tempTxnResourceStash->release(opCtx);
releaseOnError.dismiss();
+
+ invariant(opCtx->lockState()->shouldAcquireTicket() || o().txnState.isPrepared());
}
void TransactionParticipant::Participant::unstashTransactionResources(OperationContext* opCtx,
@@ -928,13 +934,26 @@ void TransactionParticipant::Participant::unstashTransactionResources(OperationC
_checkIsCommandValidWithTxnState(*opCtx->getTxnNumber(), cmdName);
if (o().txnResourceStash) {
MaxLockTimeout maxLockTimeout;
- // Max lock timeout must not be set on secondaries, since secondary oplog application cannot
- // fail. And, primaries should respect the transaction lock timeout, since it can prevent
- // the transaction from making progress.
- maxLockTimeout =
- opCtx->writesAreReplicated() ? MaxLockTimeout::kAllowed : MaxLockTimeout::kNotAllowed;
+ // Default is we should acquire ticket.
+ AcquireTicket acquireTicket{AcquireTicket::kNoSkip};
+
+ if (opCtx->writesAreReplicated()) {
+ // Primaries should respect the transaction lock timeout, since it can prevent
+ // the transaction from making progress.
+ maxLockTimeout = MaxLockTimeout::kAllowed;
+ // Prepared transactions should not acquire ticket. Else, it can deadlock with other
+ // non-transactional operations that have exhausted the write tickets and are blocked on
+ // them due to prepare or lock conflict.
+ if (o().txnState.isPrepared()) {
+ acquireTicket = AcquireTicket::kSkip;
+ }
+ } else {
+ // Max lock timeout must not be set on secondaries, since secondary oplog application
+ // cannot fail.
+ maxLockTimeout = MaxLockTimeout::kNotAllowed;
+ }
- _releaseTransactionResourcesToOpCtx(opCtx, maxLockTimeout);
+ _releaseTransactionResourcesToOpCtx(opCtx, maxLockTimeout, acquireTicket);
stdx::lock_guard<Client> lg(*opCtx->getClient());
o(lg).transactionMetricsObserver.onUnstash(ServerTransactionsMetrics::get(opCtx),
opCtx->getServiceContext()->getTickSource());
@@ -1013,7 +1032,7 @@ void TransactionParticipant::Participant::refreshLocksForPreparedTransaction(
// Lock and Ticket reacquisition of a prepared transaction should not fail for
// state transitions (step up/step down).
- _releaseTransactionResourcesToOpCtx(opCtx, MaxLockTimeout::kNotAllowed);
+ _releaseTransactionResourcesToOpCtx(opCtx, MaxLockTimeout::kNotAllowed, AcquireTicket::kNoSkip);
// Snapshot transactions don't conflict with PBWM lock on both primary and secondary.
invariant(!opCtx->lockState()->shouldConflictWithSecondaryBatchApplication());
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 0478045dc11..5f104936b50 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -359,6 +359,8 @@ public:
public:
// Indicates whether the future lock requests should have timeouts.
enum class MaxLockTimeout { kNotAllowed, kAllowed };
+ // Indicates whether we should opt out of the ticket mechanism.
+ enum class AcquireTicket { kNoSkip, kSkip };
explicit Participant(OperationContext* opCtx);
explicit Participant(const SessionToKill& session);
@@ -771,14 +773,28 @@ public:
* - MaxLockTimeout::kAllowed will set the timeout as
* MaxTransactionLockRequestTimeoutMillis.
*
- * ------------------------------------------------------------------
- * | | PRIMARY | SECONDARY | STATE TRANSITION |
- * |----------------|------------|---------------|------------------|
- * |maxLockTimeout | kAllowed | kNotAllowed | kNotAllowed |
- * ------------------------------------------------------------------
+ * acquireTicket will determine we should acquire ticket on unstashing the transaction
+ * resources.
+ * - AcquireTicket::kSkip will not acquire ticket.
+ * - AcquireTicket::kNoSkip will retain the default behavior which is to acquire ticket.
+ *
+ * Below is the expected behavior.
+ * ----------------------------------------------------------------------------
+ * | | | | |
+ * | | PRIMARY | SECONDARY | STATE TRANSITION |
+ * | | | | |
+ * |----------------|----------------------|---------------|------------------|
+ * | |Unprepared | Prepared | | |
+ * | | Txn | Txn | | |
+ * | |----------------------| | |
+ * |acquireTicket | kNoSkip | kSkip | kNoSkip | kNoSkip |
+ * |----------------|----------------------|---------------|------------------|
+ * |maxLockTimeout | kAllowed | kNotAllowed | kNotAllowed |
+ * ----------------------------------------------------------------------------
*/
void _releaseTransactionResourcesToOpCtx(OperationContext* opCtx,
- MaxLockTimeout maxLockTimeout);
+ MaxLockTimeout maxLockTimeout,
+ AcquireTicket acquireTicket);
TransactionParticipant::PrivateState& p() {
return _tp->_p;