commit 0f9e2dc19f68c8db49539bbaf6542c1bde9025a0
tree   26932443afeeeeb60a0d3dc5bef67537fd3b8623
parent 1c164f931a406b6f091a4ee41c6726b2a56a154d
author    Josef Ahmad <josef.ahmad@mongodb.com>             2021-11-17 15:56:47 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com>  2021-12-06 07:09:51 +0000
SERVER-60682 Exempt transaction coordinators and journal flusher from acquiring storage tickets
Co-authored-by: Max Hirschhorn <max.hirschhorn@mongodb.com>
(cherry picked from commit 1bdff76322b144ef27060fe79324fe3cce4bb17a)
4 files changed, 124 insertions, 0 deletions
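The fix hinges on an RAII guard, SkipTicketAcquisitionForLock, which the two C++ hunks below construct on the operation context so that lock acquisitions in the enclosing scope skip the storage ticket, with the prior behavior restored on scope exit. As a rough sketch of that pattern only — the Locker stand-in and flag names here are invented for illustration and are not MongoDB's actual internals:

#include <cassert>

// Illustrative stand-in for per-operation lock state; the real MongoDB
// Locker interface is far richer. Only the flag the guard toggles is modeled.
struct Locker {
    bool shouldAcquireTicket = true;
};

// RAII guard: while an instance is alive, lock acquisitions by this operation
// skip the storage ticket pool; the previous setting is restored on
// destruction, even if the guarded code exits via an exception.
class SkipTicketAcquisitionGuard {
public:
    explicit SkipTicketAcquisitionGuard(Locker* locker)
        : _locker(locker), _previous(locker->shouldAcquireTicket) {
        _locker->shouldAcquireTicket = false;
    }
    ~SkipTicketAcquisitionGuard() {
        _locker->shouldAcquireTicket = _previous;
    }
    SkipTicketAcquisitionGuard(const SkipTicketAcquisitionGuard&) = delete;
    SkipTicketAcquisitionGuard& operator=(const SkipTicketAcquisitionGuard&) = delete;

private:
    Locker* const _locker;
    const bool _previous;
};

int main() {
    Locker locker;
    {
        SkipTicketAcquisitionGuard guard(&locker);
        assert(!locker.shouldAcquireTicket);  // exempt inside the scope
    }
    assert(locker.shouldAcquireTicket);  // restored once the guard is destroyed
}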
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml
index b586681cd03..27e86c55998 100644
--- a/etc/backports_required_for_multiversion_tests.yml
+++ b/etc/backports_required_for_multiversion_tests.yml
@@ -112,6 +112,8 @@ last-continuous:
     test_file: jstests/sharding/resharding_secondary_recovers_temp_ns_metadata.js
   - ticket: SERVER-54909
     test_file: jstests/replsets/replSetGetStatus_member_wall_times.js
+  - ticket: SERVER-60682
+    test_file: jstests/sharding/coordinate_txn_commit_with_tickets_exhausted.js

   # Tests that should only be excluded from particular suites should be listed under that suite.
   suites:
@@ -384,6 +386,8 @@ last-lts:
     test_file: jstests/sharding/resharding_secondary_recovers_temp_ns_metadata.js
   - ticket: SERVER-54909
     test_file: jstests/replsets/replSetGetStatus_member_wall_times.js
+  - ticket: SERVER-60682
+    test_file: jstests/sharding/coordinate_txn_commit_with_tickets_exhausted.js

   # Tests that should only be excluded from particular suites should be listed under that suite.
   suites:
diff --git a/jstests/sharding/coordinate_txn_commit_with_tickets_exhausted.js b/jstests/sharding/coordinate_txn_commit_with_tickets_exhausted.js
new file mode 100644
index 00000000000..ea474edf934
--- /dev/null
+++ b/jstests/sharding/coordinate_txn_commit_with_tickets_exhausted.js
@@ -0,0 +1,108 @@
+/**
+ * Validate SERVER-60682: the TransactionCoordinator won't starve for a storage ticket to
+ * persist its decision.
+ *
+ * @tags: [
+ *   uses_transactions,
+ *   uses_multi_shard_transaction,
+ *   uses_prepare_transaction,
+ * ]
+ */
+(function() {
+"use strict";
+
+load("jstests/libs/fail_point_util.js");
+load('jstests/libs/parallelTester.js');
+load("jstests/sharding/libs/create_sharded_collection_util.js");
+
+const kNumWriteTickets = 10;
+const st = new ShardingTest({
+    mongos: 1,
+    config: 1,
+    shards: 2,
+    rs: {nodes: 1},
+    rsOptions: {
+        setParameter: {
+            wiredTigerConcurrentWriteTransactions: kNumWriteTickets,
+
+            // Raise maxTransactionLockRequestTimeoutMillis to prevent the transactions in
+            // prepare conflict state from aborting early because they cannot acquire a write
+            // ticket. This is needed to reproduce a scenario where the number of transactions
+            // in prepare conflict state is greater than or equal to the number of available
+            // storage tickets.
+            maxTransactionLockRequestTimeoutMillis: 24 * 60 * 60 * 1000,
+
+            // Similarly, keep the transactions alive longer than the Evergreen test execution
+            // timeout so a hang is detected as a failure. While the test environment may
+            // already set a large enough default transactionLifetimeLimitSeconds, specify the
+            // lifetime explicitly to avoid relying on a potentially changing default.
+            transactionLifetimeLimitSeconds: 24 * 60 * 60,
+        }
+    }
+});
+
+const sourceCollection = st.s.getCollection("test.mycoll");
+CreateShardedCollectionUtil.shardCollectionWithChunks(sourceCollection, {key: 1}, [
+    {min: {key: MinKey}, max: {key: 0}, shard: st.shard0.shardName},
+    {min: {key: 0}, max: {key: MaxKey}, shard: st.shard1.shardName},
+]);
+
+// Insert a document into each shard.
+assert.commandWorked(sourceCollection.insert([{key: 200}, {key: -200}]));
+
+// Create a thread which leaves the TransactionCoordinator in a state where prepareTransaction
+// has been run on both participant shards and it is about to write the commit decision locally
+// to the config.transaction_coordinators collection.
+const preparedTxnThread = new Thread(function runTwoPhaseCommitTxn(host, dbName, collName) {
+    const conn = new Mongo(host);
+    const session = conn.startSession({causalConsistency: false});
+    const sessionCollection = session.getDatabase(dbName).getCollection(collName);
+
+    session.startTransaction();
+    assert.commandWorked(sessionCollection.update({key: 200}, {$inc: {counter: 1}}));
+    assert.commandWorked(sessionCollection.update({key: -200}, {$inc: {counter: 1}}));
+    assert.commandWorked(session.commitTransaction_forTesting());
+}, st.s.host, sourceCollection.getDB().getName(), sourceCollection.getName());
+
+const txnCoordinator = st.rs1.getPrimary();
+const hangBeforeWritingDecisionFp = configureFailPoint(txnCoordinator, "hangBeforeWritingDecision");
+
+preparedTxnThread.start();
+hangBeforeWritingDecisionFp.wait();
+
+// Create other threads which will block on a prepare conflict while still holding a write
+// ticket, to test that the TransactionCoordinator from preparedTxnThread can still complete.
+const prepareConflictThreads = [];
+for (let i = 0; i < kNumWriteTickets; ++i) {
+    const thread = new Thread(function hitPrepareConflictOnCoordinator(host, dbName, collName) {
+        const conn = new Mongo(host);
+        const session = conn.startSession({causalConsistency: false});
+        const sessionCollection = session.getDatabase(dbName).getCollection(collName);
+
+        session.startTransaction();
+        // Do a write to ensure the transaction takes a write ticket.
+        assert.commandWorked(sessionCollection.insert({key: 300}));
+        // Then do a read which will block until the prepare conflict resolves.
+        assert.eq({key: 200, counter: 1}, sessionCollection.findOne({key: 200}, {_id: 0}));
+        assert.commandWorked(session.commitTransaction_forTesting());
+    }, st.s.host, sourceCollection.getDB().getName(), sourceCollection.getName());
+
+    prepareConflictThreads.push(thread);
+    thread.start();
+}
+
+const currentOp = (pipeline = []) => st.admin.aggregate([{$currentOp: {}}, ...pipeline]).toArray();
+
+assert.soon(() => {
+    const ops = currentOp([{$match: {prepareReadConflicts: {$gt: 0}}}]);
+    return ops.length >= Math.min(prepareConflictThreads.length, kNumWriteTickets);
+}, () => `Failed to find prepare conflicts in $currentOp output: ${tojson(currentOp())}`);
+
+hangBeforeWritingDecisionFp.off();
+
+preparedTxnThread.join();
+for (let thread of prepareConflictThreads) {
+    thread.join();
+}
+
+st.stop();
+})();
diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
index df8c3ebdd6d..e7151875651 100644
--- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp
+++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp
@@ -36,6 +36,7 @@
 #include "mongo/db/catalog/collection_options.h"
 #include "mongo/db/catalog_raii.h"
 #include "mongo/db/concurrency/d_concurrency.h"
+#include "mongo/db/concurrency/lock_state.h"
 #include "mongo/db/concurrency/write_conflict_exception.h"
 #include "mongo/db/repl/optime.h"
 #include "mongo/db/repl/replication_coordinator.h"
@@ -498,6 +499,12 @@ ReplicationConsistencyMarkersImpl::refreshOplogTruncateAfterPointIfPrimary(
     }
     ON_BLOCK_EXIT([&] { opCtx->recoveryUnit()->setPrepareConflictBehavior(originalBehavior); });

+    // Exempt storage ticket acquisition in order to avoid starving upstream requests waiting
+    // for durability. SERVER-60682 is an example with more pending prepared transactions than
+    // storage tickets; the transaction coordinator could not persist the decision and had to
+    // unnecessarily wait for prepared transactions to expire to make forward progress.
+    SkipTicketAcquisitionForLock skipTicketAcquisition(opCtx);
+
     // The locks necessary to write to the oplog truncate after point's collection and read from the
     // oplog collection must be taken up front so that the mutex can also be taken around both
     // operations without causing deadlocks.
diff --git a/src/mongo/db/s/transaction_coordinator_util.cpp b/src/mongo/db/s/transaction_coordinator_util.cpp
index 9e07378295d..698b692f472 100644
--- a/src/mongo/db/s/transaction_coordinator_util.cpp
+++ b/src/mongo/db/s/transaction_coordinator_util.cpp
@@ -36,6 +36,7 @@
 #include "mongo/client/remote_command_retry_scheduler.h"
 #include "mongo/db/commands/txn_cmds_gen.h"
 #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h"
+#include "mongo/db/concurrency/lock_state.h"
 #include "mongo/db/curop.h"
 #include "mongo/db/dbdirectclient.h"
 #include "mongo/db/ops/write_ops.h"
@@ -411,6 +412,10 @@ Future<repl::OpTime> persistDecision(txn::AsyncWorkScheduler& scheduler,
     return scheduler.scheduleWork(
         [lsid, txnNumber, participants, decision](OperationContext* opCtx) {
             FlowControl::Bypass flowControlBypass(opCtx);
+            // Do not acquire a storage ticket, in order to avoid unnecessary serialization
+            // with other prepared transactions that are holding a storage ticket themselves;
+            // see SERVER-60682.
+            SkipTicketAcquisitionForLock skipTicketAcquisition(opCtx);
             getTransactionCoordinatorWorkerCurOpRepository()->set(
                 opCtx, lsid, txnNumber, CoordinatorAction::kWritingDecision);
             return persistDecisionBlocking(opCtx, lsid, txnNumber, participants, decision);
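To make the failure mode the test reproduces concrete: each prepare-conflicted transaction holds a write ticket while it blocks, and only the coordinator's decision write can unblock them, so if that write must also wait for a ticket the system deadlocks until transactionLifetimeLimitSeconds expires. The toy model below (not MongoDB code; a C++20 counting semaphore stands in for the WiredTiger write ticket pool) shows the circular wait and why exempting the coordinator breaks it:

#include <condition_variable>
#include <iostream>
#include <mutex>
#include <semaphore>
#include <thread>
#include <vector>

// kTickets "transactions" each take a ticket and then block on a "prepare
// conflict" that only the coordinator's persisted decision resolves.
constexpr int kTickets = 4;

std::counting_semaphore<kTickets> tickets(kTickets);
std::mutex mtx;
std::condition_variable cv;
bool decisionPersisted = false;

void prepareConflictedTxn() {
    tickets.acquire();  // hold a "storage ticket"...
    {
        std::unique_lock lk(mtx);
        // ...while blocked on the "prepare conflict".
        cv.wait(lk, [] { return decisionPersisted; });
    }
    tickets.release();
}

void coordinatorPersistDecision() {
    // The fix: the coordinator skips ticket acquisition here. Adding a
    // tickets.acquire() before this block reintroduces the deadlock once all
    // kTickets tickets are held by blocked transactions.
    {
        std::lock_guard lk(mtx);
        decisionPersisted = true;  // "persist the commit decision"
    }
    cv.notify_all();
}

int main() {
    std::vector<std::thread> txns;
    for (int i = 0; i < kTickets; ++i)
        txns.emplace_back(prepareConflictedTxn);

    std::thread coordinator(coordinatorPersistDecision);
    coordinator.join();
    for (auto& t : txns)
        t.join();
    std::cout << "all transactions completed\n";
}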