diff options
author | Josef Ahmad <josef.ahmad@mongodb.com> | 2021-11-17 15:56:47 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-12-10 17:09:27 +0000 |
commit | 02d950aee92600ec4613912dc1c84118e8c961c0 (patch) | |
tree | b4e06c513027cc4ca7d90684f48013058c83774e | |
parent | 80b2c2a100cb4925f02a603333cec26803187536 (diff) | |
download | mongo-02d950aee92600ec4613912dc1c84118e8c961c0.tar.gz |
SERVER-60682 Exempt transaction coordinators and journal flusher from acquiring storage tickets
Co-authored-by: Max Hirschhorn <max.hirschhorn@mongodb.com>
(cherry picked from commit 1bdff76322b144ef27060fe79324fe3cce4bb17a)
4 files changed, 123 insertions, 0 deletions
diff --git a/etc/backports_required_for_multiversion_tests.yml b/etc/backports_required_for_multiversion_tests.yml index 7ab910681a8..5c73cacb254 100644 --- a/etc/backports_required_for_multiversion_tests.yml +++ b/etc/backports_required_for_multiversion_tests.yml @@ -181,6 +181,8 @@ all: test_file: jstests/replsets/replSetGetStatus_member_wall_times.js - ticket: SERVER-53335 test_file: jstests/sharding/collation_shard_targeting_hashed_shard_key.js + - ticket: SERVER-60682 + test_file: jstests/sharding/coordinate_txn_commit_with_tickets_exhausted.js suites: diff --git a/jstests/sharding/coordinate_txn_commit_with_tickets_exhausted.js b/jstests/sharding/coordinate_txn_commit_with_tickets_exhausted.js new file mode 100644 index 00000000000..5263d14b85a --- /dev/null +++ b/jstests/sharding/coordinate_txn_commit_with_tickets_exhausted.js @@ -0,0 +1,109 @@ +/** + * Validate SERVER-60682: TransactionCoordinator won't starve for a storage ticket to + * persist its decision. + * + * @tags: [ + * requires_find_command, + * uses_transactions, + * uses_multi_shard_transaction, + * uses_prepare_transaction, + * ] + */ + +(function() { +"use strict"; + +load("jstests/libs/fail_point_util.js"); +load('jstests/libs/parallelTester.js'); +load("jstests/sharding/libs/create_sharded_collection_util.js"); + +const kNumWriteTickets = 10; +const st = new ShardingTest({ + mongos: 1, + config: 1, + shards: 2, + rs: {nodes: 1}, + rsOptions: { + setParameter: { + wiredTigerConcurrentWriteTransactions: kNumWriteTickets, + // Raise maxTransactionLockRequestTimeoutMillis to prevent the transactions in prepare + // conflict state from aborting early due to being unable to acquire a write ticket. + // This is needed because we want to reproduce a scenario where the number of + // transactions in prepare conflict state is greater or equal to the available storage + // tickets. 
+ maxTransactionLockRequestTimeoutMillis: 24 * 60 * 60 * 1000, + // Similarly, we need to keep transactions alive longer than the Evergreen test + // execution timeout so as to be able to detect failure. + // While the test environment may already set a large enough default + // transactionLifetimeLimitSeconds, we nevertheless specify the lifetime to avoid + // relying on a potentially changing default. + transactionLifetimeLimitSeconds: 24 * 60 * 60, + } + } +}); + +const sourceCollection = st.s.getCollection("test.mycoll"); +CreateShardedCollectionUtil.shardCollectionWithChunks(sourceCollection, {key: 1}, [ + {min: {key: MinKey}, max: {key: 0}, shard: st.shard0.shardName}, + {min: {key: 0}, max: {key: MaxKey}, shard: st.shard1.shardName}, +]); + +// Insert a document into each shard. +assert.commandWorked(sourceCollection.insert([{key: 200}, {key: -200}])); + +// Create a thread which leaves the TransactionCoordinator in a state where prepareTransaction has +// been run on both participant shards and it is about to write the commit decision locally to the +// config.transaction_coordinators collection. 
+const preparedTxnThread = new Thread(function runTwoPhaseCommitTxn(host, dbName, collName) { + const conn = new Mongo(host); + const session = conn.startSession({causalConsistency: false}); + const sessionCollection = session.getDatabase(dbName).getCollection(collName); + + session.startTransaction(); + assert.commandWorked(sessionCollection.update({key: 200}, {$inc: {counter: 1}})); + assert.commandWorked(sessionCollection.update({key: -200}, {$inc: {counter: 1}})); + assert.commandWorked(session.commitTransaction_forTesting()); +}, st.s.host, sourceCollection.getDB().getName(), sourceCollection.getName()); + +const txnCoordinator = st.rs1.getPrimary(); +const hangBeforeWritingDecisionFp = configureFailPoint(txnCoordinator, "hangBeforeWritingDecision"); + +preparedTxnThread.start(); +hangBeforeWritingDecisionFp.wait(); + +// Create other threads which will block on a prepare conflict while still holding a write ticket to +// test that the TransactionCoordinator from preparedTxnThread can still complete. +const prepareConflictThreads = []; +for (let i = 0; i < kNumWriteTickets; ++i) { + const thread = new Thread(function hitPrepareConflictOnCoordinator(host, dbName, collName) { + const conn = new Mongo(host); + const session = conn.startSession({causalConsistency: false}); + const sessionCollection = session.getDatabase(dbName).getCollection(collName); + + session.startTransaction(); + // Do a write to ensure the transaction takes a write ticket. + assert.commandWorked(sessionCollection.insert({key: 300})); + // Then do a read which will block until the prepare conflict resolves. 
+ assert.eq({key: 200, counter: 1}, sessionCollection.findOne({key: 200}, {_id: 0})); + assert.commandWorked(session.commitTransaction_forTesting()); + }, st.s.host, sourceCollection.getDB().getName(), sourceCollection.getName()); + prepareConflictThreads.push(thread); + thread.start(); +} + +const currentOp = (pipeline = []) => st.admin.aggregate([{$currentOp: {}}, ...pipeline]).toArray(); + +assert.soon(() => { + const ops = currentOp([{$match: {prepareReadConflicts: {$gt: 0}}}]); + return ops.length >= Math.min(prepareConflictThreads.length, kNumWriteTickets); +}, () => `Failed to find prepare conflicts in $currentOp output: ${tojson(currentOp())}`); + +hangBeforeWritingDecisionFp.off(); + +preparedTxnThread.join(); +for (let thread of prepareConflictThreads) { + thread.join(); +} + +st.stop(); +})(); diff --git a/src/mongo/db/repl/replication_consistency_markers_impl.cpp b/src/mongo/db/repl/replication_consistency_markers_impl.cpp index df8c3ebdd6d..e7151875651 100644 --- a/src/mongo/db/repl/replication_consistency_markers_impl.cpp +++ b/src/mongo/db/repl/replication_consistency_markers_impl.cpp @@ -36,6 +36,7 @@ #include "mongo/db/catalog/collection_options.h" #include "mongo/db/catalog_raii.h" #include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/concurrency/lock_state.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/repl/optime.h" #include "mongo/db/repl/replication_coordinator.h" @@ -498,6 +499,12 @@ ReplicationConsistencyMarkersImpl::refreshOplogTruncateAfterPointIfPrimary( } ON_BLOCK_EXIT([&] { opCtx->recoveryUnit()->setPrepareConflictBehavior(originalBehavior); }); + // Exempt storage ticket acquisition in order to avoid starving upstream requests waiting + // for durability. 
SERVER-60682 is an example with more pending prepared transactions than + // storage tickets; the transaction coordinator could not persist the decision and + // had to unnecessarily wait for prepared transactions to expire to make forward progress. + SkipTicketAcquisitionForLock skipTicketAcquisition(opCtx); + // The locks necessary to write to the oplog truncate after point's collection and read from the // oplog collection must be taken up front so that the mutex can also be taken around both // operations without causing deadlocks. diff --git a/src/mongo/db/s/transaction_coordinator_util.cpp b/src/mongo/db/s/transaction_coordinator_util.cpp index 5253f1c17e2..731654258d6 100644 --- a/src/mongo/db/s/transaction_coordinator_util.cpp +++ b/src/mongo/db/s/transaction_coordinator_util.cpp @@ -36,6 +36,7 @@ #include "mongo/client/remote_command_retry_scheduler.h" #include "mongo/db/commands/txn_cmds_gen.h" #include "mongo/db/commands/txn_two_phase_commit_cmds_gen.h" +#include "mongo/db/concurrency/lock_state.h" #include "mongo/db/curop.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/ops/write_ops.h" @@ -398,6 +399,10 @@ Future<repl::OpTime> persistDecision(txn::AsyncWorkScheduler& scheduler, return scheduler.scheduleWork( [lsid, txnNumber, participants, decision](OperationContext* opCtx) { FlowControl::Bypass flowControlBypass(opCtx); + // Do not acquire a storage ticket in order to avoid unnecessary serialization + // with other prepared transactions that are holding a storage ticket + // themselves; see SERVER-60682. + SkipTicketAcquisitionForLock skipTicketAcquisition(opCtx); getTransactionCoordinatorWorkerCurOpRepository()->set( opCtx, lsid, txnNumber, CoordinatorAction::kWritingDecision); return persistDecisionBlocking(opCtx, lsid, txnNumber, participants, decision); |