summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJosef Ahmad <josef.ahmad@mongodb.com>2021-11-17 15:56:47 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-12-07 08:11:38 +0000
commit79599d1ea413cfc331d8b48ac617dec08bdcba0f (patch)
treed9de902119072a5501cca7da3791b8495a8b9e44
parenta1607ca021bcdd91f525e210a541741dfb9e337a (diff)
downloadmongo-79599d1ea413cfc331d8b48ac617dec08bdcba0f.tar.gz
SERVER-60682 Exempt transaction coordinators and journal flusher from acquiring storage tickets
Co-authored-by: Max Hirschhorn max.hirschhorn@mongodb.com (cherry picked from commit 1bdff76322b144ef27060fe79324fe3cce4bb17a)
-rw-r--r--etc/backports_required_for_multiversion_tests.yml4
-rw-r--r--jstests/sharding/coordinate_txn_commit_with_tickets_exhausted.js109
-rw-r--r--src/mongo/db/repl/replication_consistency_markers_impl.cpp7
-rw-r--r--src/mongo/db/s/transaction_coordinator_util.cpp5
4 files changed, 125 insertions, 0 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index a96d0b79e2a..79c033c8b69 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -70,6 +70,8 @@ last-continuous:
test_file: jstests/replsets/sync_source_selection_ignores_minvalid_after_rollback.js
- ticket: SERVER-59721
test_file: jstests/sharding/resharding_secondary_recovers_temp_ns_metadata.js
+ - ticket: SERVER-60682
+ test_file: jstests/sharding/coordinate_txn_commit_with_tickets_exhausted.js
# Tests that should only be excluded from particular suites should be listed under that suite.
suites:
@@ -292,6 +294,8 @@ last-lts:
test_file: jstests/replsets/sync_source_selection_ignores_minvalid_after_rollback.js
- ticket: SERVER-59721
test_file: jstests/sharding/resharding_secondary_recovers_temp_ns_metadata.js
+ - ticket: SERVER-60682
+ test_file: jstests/sharding/coordinate_txn_commit_with_tickets_exhausted.js
# Tests that should only be excluded from particular suites should be listed under that suite.
suites:
diff --git a/jstests/sharding/coordinate_txn_commit_with_tickets_exhausted.js b/jstests/sharding/coordinate_txn_commit_with_tickets_exhausted.js
new file mode 100644
index 00000000000..5263d14b85a
--- /dev/null
+++ b/jstests/sharding/coordinate_txn_commit_with_tickets_exhausted.js
@@ -0,0 +1,109 @@
+/**
+ * Validate SERVER-60682: TransactionCoordinator won't starve for a storage ticket to
+ * persist its decision.
+ *
+ * @tags: [
+ * requires_find_command,
+ * uses_transactions,
+ * uses_multi_shard_transaction,
+ * uses_prepare_transaction,
+ * ]
+ */
+
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load('jstests/libs/parallelTester.js');
+load("jstests/sharding/libs/create_sharded_collection_util.js");
+
+const kNumWriteTickets = 10;
+const st = new ShardingTest({
+ mongos: 1,
+ config: 1,
+ shards: 2,
+ rs: {nodes: 1},
+ rsOptions: {
+ setParameter: {
+ wiredTigerConcurrentWriteTransactions: kNumWriteTickets,
+ // Raise maxTransactionLockRequestTimeoutMillis to prevent the transactions in prepare
+ // conflict state from aborting early due to being unable to acquire a write ticket.
+ // This is needed because we want to reproduce a scenario where the number of
+ // transactions in prepare conflict state is greater or equal to the available storage
+ // tickets.
+ maxTransactionLockRequestTimeoutMillis: 24 * 60 * 60 * 1000,
+ // Similarly, we need to keep transactions alive longer than the Evergreen test
+ // execution timeout so as to be able to detect failure.
+ // While the test environment may already set a large enough default
+ // transactionLifetimeLimitSeconds, we nevertheless specify the lifetime to avoid
+ // relying on a potentially changing default.
+ transactionLifetimeLimitSeconds: 24 * 60 * 60,
+ }
+ }
+});
+
+const sourceCollection = st.s.getCollection("test.mycoll");
+CreateShardedCollectionUtil.shardCollectionWithChunks(sourceCollection, {key: 1}, [
+ {min: {key: MinKey}, max: {key: 0}, shard: st.shard0.shardName},
+ {min: {key: 0}, max: {key: MaxKey}, shard: st.shard1.shardName},
+]);
+
+// Insert a document into each shard.
+assert.commandWorked(sourceCollection.insert([{key: 200}, {key: -200}]));
+
+// Create a thread which leaves the TransactionCoordinator in a state where prepareTransaction has
+// been run on both participant shards and it is about to write the commit decision locally to the
+// config.transaction_coordinators collection.
+const preparedTxnThread = new Thread(function runTwoPhaseCommitTxn(host, dbName, collName) {
+ const conn = new Mongo(host);
+ const session = conn.startSession({causalConsistency: false});
+ const sessionCollection = session.getDatabase(dbName).getCollection(collName);
+
+ session.startTransaction();
+ assert.commandWorked(sessionCollection.update({key: 200}, {$inc: {counter: 1}}));
+ assert.commandWorked(sessionCollection.update({key: -200}, {$inc: {counter: 1}}));
+ assert.commandWorked(session.commitTransaction_forTesting());
+}, st.s.host, sourceCollection.getDB().getName(), sourceCollection.getName());
+
+const txnCoordinator = st.rs1.getPrimary();
+const hangBeforeWritingDecisionFp = configureFailPoint(txnCoordinator, "hangBeforeWritingDecision");
+
+preparedTxnThread.start();
+hangBeforeWritingDecisionFp.wait();
+
+// Create other threads which will block on a prepare conflict while still holding a write ticket to
+// test that the TransactionCoordinator from preparedTxnThread can still complete.
+const prepareConflictThreads = [];
+for (let i = 0; i < kNumWriteTickets; ++i) {
+ const thread = new Thread(function hitPrepareConflictOnCoordinator(host, dbName, collName) {
+ const conn = new Mongo(host);
+ const session = conn.startSession({causalConsistency: false});
+ const sessionCollection = session.getDatabase(dbName).getCollection(collName);
+
+ session.startTransaction();
+ // Do a write to ensure the transaction takes a write ticket.
+ assert.commandWorked(sessionCollection.insert({key: 300}));
+ // Then do a read which will block until the prepare conflict resolves.
+ assert.eq({key: 200, counter: 1}, sessionCollection.findOne({key: 200}, {_id: 0}));
+ assert.commandWorked(session.commitTransaction_forTesting());
+ }, st.s.host, sourceCollection.getDB().getName(), sourceCollection.getName());
+ prepareConflictThreads.push(thread);
+ thread.start();
+}
+
+const currentOp = (pipeline = []) => st.admin.aggregate([{$currentOp: {}}, ...pipeline]).toArray();
+
+assert.soon(() => {
+ const ops = currentOp([{$match: {prepareReadConflicts: {$gt: 0}}}]);
+ return ops.length >= Math.min(prepareConflictThreads.length, kNumWriteTickets);
+}, () => `Failed to find prepare conflicts in $currentOp output: ${tojson(currentOp())}`);
+
+hangBeforeWritingDecisionFp.off();
+
+preparedTxnThread.join();
+for (let thread of prepareConflictThreads) {
+ thread.join();
+}
+
+st.stop();
+})();
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
index df8c3ebdd6d..e7151875651 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/catalog/collection_options.h"
#include "mongo/db/catalog_raii.h"
#include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/repl/replication_coordinator.h"
@@ -498,6 +499,12 @@ ReplicationConsistencyMarkersImpl::refreshOplogTruncateAfterPointIfPrimary(
}
ON_BLOCK_EXIT([&] { opCtx->recoveryUnit()->setPrepareConflictBehavior(originalBehavior); });
+ // Exempt storage ticket acquisition in order to avoid starving upstream requests waiting
+ // for durability. SERVER-60682 is an example with more pending prepared transactions than
+ // storage tickets; the transaction coordinator could not persist the decision and
+ // had to unnecessarily wait for prepared transactions to expire to make forward progress.
+ SkipTicketAcquisitionForLock skipTicketAcquisition(opCtx);
+
// The locks necessary to write to the oplog truncate after point's collection and read from the
// oplog collection must be taken up front so that the mutex can also be taken around both
// operations without causing deadlocks.
diff --git a/src/mongo/db/s/transaction_coordinator_util.cpp b/src/mongo/db/s/transaction_coordinator_util.cpp
index 0bc0d019c6c..8fa377734ef 100644
--- a/src/mongo/db/s/transaction_coordinator_util.cpp
+++ b/src/mongo/db/s/transaction_coordinator_util.cpp
@@ -36,6 +36,7 @@
#include "mongo/client/remote_command_retry_scheduler.h"
#include "mongo/db/commands/txn_cmds_gen.h"
#include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
+#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/curop.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/ops/write_ops.h"
@@ -411,6 +412,10 @@ Future<repl::OpTime> persistDecision(txn::AsyncWorkScheduler& scheduler,
return scheduler.scheduleWork(
[lsid, txnNumber, participants, decision](OperationContext* opCtx) {
FlowControl::Bypass flowControlBypass(opCtx);
+ // Do not acquire a storage ticket in order to avoid unnecessary serialization
+ // with other prepared transactions that are holding a storage ticket
+ // themselves; see SERVER-60682.
+ SkipTicketAcquisitionForLock skipTicketAcquisition(opCtx);
getTransactionCoordinatorWorkerCurOpRepository()->set(
opCtx, lsid, txnNumber, CoordinatorAction::kWritingDecision);
return persistDecisionBlocking(opCtx, lsid, txnNumber, participants, decision);