diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2022-04-28 18:40:14 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-04-29 17:21:25 +0000 |
commit | ca7108a1b09f50100c6909c2ca24869dbe1118e7 (patch) | |
tree | e38d3e0e133948182014632952fa937cb70c492a | |
parent | 6259f8d9fd125a6441336d79323c55dfa7680a76 (diff) | |
download | mongo-ca7108a1b09f50100c6909c2ca24869dbe1118e7.tar.gz |
SERVER-65640 Add concurrency workload that performs resharding while there are active internal transactions started using test command
(cherry picked from commit e4bb8a12180f0263c28b3da60eb7e7537faab202)
-rw-r--r-- | buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_balancer.yml | 6 | ||||
-rw-r--r-- | jstests/concurrency/fsm_workloads/internal_transactions_move_chunk.js | 1 | ||||
-rw-r--r-- | jstests/concurrency/fsm_workloads/internal_transactions_resharding.js | 166 | ||||
-rw-r--r-- | jstests/concurrency/fsm_workloads/internal_transactions_sharded.js | 1 | ||||
-rw-r--r-- | jstests/concurrency/fsm_workloads/internal_transactions_unsharded.js | 35 | ||||
-rw-r--r-- | jstests/sharding/internal_txns/transaction_api_distributed_from_shard.js (renamed from jstests/sharding/transaction_api_distributed_from_shard.js) | 16 | ||||
-rw-r--r-- | src/mongo/s/commands/internal_transactions_test_command.h | 5 |
7 files changed, 209 insertions, 21 deletions
diff --git a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_balancer.yml b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_balancer.yml index c497eefe4ab..84d7c942484 100644 --- a/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_balancer.yml +++ b/buildscripts/resmokeconfig/suites/concurrency_sharded_multi_stmt_txn_with_balancer.yml @@ -130,6 +130,12 @@ selector: # for dbCheck. TODO (SERVER-63951): Remove this exclusion. - jstests/concurrency/fsm_workloads/create_collection_and_view.js + # This suite runs the RunDBCheckInBackground hook and the dbCheck command generates oplog entries + # but those oplog entries are not supported by resharding. + # TODO (SERVER-66011): Enable internal_transactions_resharding.js in the + # concurrency_sharded_multi_stmt_txn_with_balancer suite + - jstests/concurrency/fsm_workloads/internal_transactions_resharding.js + exclude_with_any_tags: - assumes_balancer_off diff --git a/jstests/concurrency/fsm_workloads/internal_transactions_move_chunk.js b/jstests/concurrency/fsm_workloads/internal_transactions_move_chunk.js index 1e9c86a0fbe..a545cdf01f3 100644 --- a/jstests/concurrency/fsm_workloads/internal_transactions_move_chunk.js +++ b/jstests/concurrency/fsm_workloads/internal_transactions_move_chunk.js @@ -8,7 +8,6 @@ * @tags: [ * requires_fcv_60, * requires_sharding, - * requires_persistence, * uses_transactions * ] */ diff --git a/jstests/concurrency/fsm_workloads/internal_transactions_resharding.js b/jstests/concurrency/fsm_workloads/internal_transactions_resharding.js new file mode 100644 index 00000000000..3c95ce81985 --- /dev/null +++ b/jstests/concurrency/fsm_workloads/internal_transactions_resharding.js @@ -0,0 +1,166 @@ +'use strict'; + +/** + * Runs insert, update, delete and findAndModify commands against a sharded collection inside + * single-shard and cross-shard internal transactions using all client session configurations, and + * occasionally 
reshards the collection. Only runs on sharded clusters. + * + * @tags: [ + * requires_fcv_60, + * requires_sharding, + * uses_transactions + * ] + */ +load('jstests/concurrency/fsm_libs/extend_workload.js'); +load('jstests/concurrency/fsm_workloads/internal_transactions_sharded.js'); +load('jstests/libs/fail_point_util.js'); + +var $config = extendWorkload($config, function($config, $super) { + // reshardingMinimumOperationDurationMillis is set to 30 seconds when there are stepdowns. + // So in order to limit the overall time for the test, we limit the number of resharding + // operations to maxReshardingExecutions. + const maxReshardingExecutions = TestData.runningWithShardStepdowns ? 4 : $config.iterations; + + const customShardKeyFieldName = "customShardKey"; + $config.data.shardKeys = []; + $config.data.currentShardKeyIndex = -1; + $config.data.reshardingCount = 0; + + $config.data.getQueryForDocument = function getQueryForDocument(collection, doc) { + // The query for a write command against a sharded collection must contain the shard key. 
+ const defaultShardKeyFieldName = this.shardKeyField[collection.getName()]; + return { + _id: doc._id, + tid: this.tid, + [defaultShardKeyFieldName]: doc[defaultShardKeyFieldName], + [customShardKeyFieldName]: doc[customShardKeyFieldName] + }; + }; + + $config.data.generateRandomDocument = function generateRandomDocument(collection) { + const defaultShardKeyFieldName = this.shardKeyField[collection.getName()]; + return { + _id: UUID(), + tid: this.tid, + [defaultShardKeyFieldName]: + this.generateRandomInt(this.partition.lower, this.partition.upper - 1), + [customShardKeyFieldName]: + this.generateRandomInt(this.partition.lower, this.partition.upper - 1), + counter: 0 + }; + }; + + $config.data.isAcceptableRetryError = function isAcceptableRetryError(res) { + // This workload does in-place resharding so a retry that is sent + // reshardingMinimumOperationDurationMillis after resharding completes is expected to fail + // with IncompleteTransactionHistory. + return (res.code == ErrorCodes.IncompleteTransactionHistory) && + res.errmsg.includes("Incomplete history detected for transaction"); + }; + + $config.states.init = function init(db, collName, connCache) { + $super.states.init.apply(this, arguments); + this.shardKeys.push({[this.defaultShardKeyField]: 1}); + this.shardKeys.push({[customShardKeyFieldName]: 1}); + this.currentShardKeyIndex = 0; + }; + + $config.states.reshardCollection = function reshardCollection(db, collName, connCache) { + const collection = db.getCollection(collName); + const ns = collection.getFullName(); + + if (this.tid === 0 && (this.reshardingCount <= maxReshardingExecutions)) { + const newShardKeyIndex = (this.currentShardKeyIndex + 1) % this.shardKeys.length; + const newShardKey = this.shardKeys[newShardKeyIndex]; + const reshardCollectionCmdObj = { + reshardCollection: ns, + key: newShardKey, + }; + + print(`Started resharding collection ${ns}: ${tojson({newShardKey})}`); + if (TestData.runningWithShardStepdowns) { + 
assert.commandWorkedOrFailedWithCode(db.adminCommand(reshardCollectionCmdObj), + [ErrorCodes.SnapshotUnavailable]); + } else { + assert.commandWorked(db.adminCommand(reshardCollectionCmdObj)); + } + print(`Finished resharding collection ${ns}: ${tojson({newShardKey})}`); + + // If resharding fails with SnapshotUnavailable, then this will be incorrect. But + // it's fine since reshardCollection will succeed if the new shard key matches the + // existing one. + this.currentShardKeyIndex = newShardKeyIndex; + this.reshardingCount += 1; + + db.printShardingStatus(); + + connCache.mongos.forEach(mongos => { + if (this.generateRandomBool()) { + // Without explicitly refreshing mongoses, retries of retryable write statements + // would always be routed to the donor shards. Non-deterministically refreshing + // enables us to have test coverage for retrying against both the donor and + // recipient shards. + assert.commandWorked(mongos.adminCommand({flushRouterConfig: 1})); + } + }); + } + }; + + $config.transitions = { + init: { + internalTransactionForInsert: 0.25, + internalTransactionForUpdate: 0.25, + internalTransactionForDelete: 0.25, + internalTransactionForFindAndModify: 0.25, + }, + reshardCollection: { + internalTransactionForInsert: 0.2, + internalTransactionForUpdate: 0.2, + internalTransactionForDelete: 0.2, + internalTransactionForFindAndModify: 0.2, + verifyDocuments: 0.2 + }, + internalTransactionForInsert: { + reshardCollection: 0.2, + internalTransactionForInsert: 0.15, + internalTransactionForUpdate: 0.15, + internalTransactionForDelete: 0.15, + internalTransactionForFindAndModify: 0.15, + verifyDocuments: 0.2 + }, + internalTransactionForUpdate: { + reshardCollection: 0.2, + internalTransactionForInsert: 0.15, + internalTransactionForUpdate: 0.15, + internalTransactionForDelete: 0.15, + internalTransactionForFindAndModify: 0.15, + verifyDocuments: 0.2 + }, + internalTransactionForDelete: { + reshardCollection: 0.2, + internalTransactionForInsert: 0.15, + 
internalTransactionForUpdate: 0.15, + internalTransactionForDelete: 0.15, + internalTransactionForFindAndModify: 0.15, + verifyDocuments: 0.2 + }, + internalTransactionForFindAndModify: { + reshardCollection: 0.2, + internalTransactionForInsert: 0.15, + internalTransactionForUpdate: 0.15, + internalTransactionForDelete: 0.15, + internalTransactionForFindAndModify: 0.15, + verifyDocuments: 0.2 + }, + verifyDocuments: { + reshardCollection: 0.2, + internalTransactionForInsert: 0.15, + internalTransactionForUpdate: 0.15, + internalTransactionForDelete: 0.15, + internalTransactionForFindAndModify: 0.15, + verifyDocuments: 0.2 + } + }; + + return $config; +}); diff --git a/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js b/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js index f6a749e9a63..856f7ecfccd 100644 --- a/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js +++ b/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js @@ -8,7 +8,6 @@ * @tags: [ * requires_fcv_60, * requires_sharding, - * requires_persistence, * uses_transactions * ] */ diff --git a/jstests/concurrency/fsm_workloads/internal_transactions_unsharded.js b/jstests/concurrency/fsm_workloads/internal_transactions_unsharded.js index fb683653018..447d56a6bdb 100644 --- a/jstests/concurrency/fsm_workloads/internal_transactions_unsharded.js +++ b/jstests/concurrency/fsm_workloads/internal_transactions_unsharded.js @@ -10,7 +10,6 @@ * * @tags: [ * requires_fcv_60, - * requires_persistence, * uses_transactions, * assumes_unsharded_collection * ] @@ -200,11 +199,24 @@ var $config = extendWorkload($config, function($config, $super) { }; /** + * Returns true if 'res' contains an acceptable retry error for a retryable write command. + */ + $config.data.isAcceptableRetryError = function isAcceptableRetryError(res) { + // This workload does not involve data placement changes so retries should always succeed. 
+ // Workloads that extend this workload should override this method accordingly. + return false; + }; + + /** * Runs the given the write command 'writeCmdObj' inside an internal transaction using the given * client 'executionCtxType'. */ $config.data.runInternalTransaction = function runInternalTransaction( db, collection, executionCtxType, writeCmdObj, checkResponseFunc, checkDocsFunc) { + // The testInternalTransactions command below runs with the session setting defined by + // 'executionCtxType'. + fsm.forceRunningOutsideTransaction(this); + if (executionCtxType == executionContextTypes.kClientRetryableWrite) { writeCmdObj.stmtId = NumberInt(1); } @@ -228,9 +240,23 @@ var $config = extendWorkload($config, function($config, $super) { tojsononeline(testInternalTxnCmdObj)}: ${tojsononeline({executionCtxType})}`); let runFunc = () => { - const res = assert.commandWorked(db.adminCommand(testInternalTxnCmdObj)); - print(`Response: ${tojsononeline(res)}`); - res.responses.forEach(response => assert.commandWorked(response)); + let res; + try { + res = db.adminCommand(testInternalTxnCmdObj); + print(`Response: ${tojsononeline(res)}`); + assert.commandWorked(res); + } catch (e) { + if ((executionCtxType == executionContextTypes.kClientRetryableWrite) && + this.isAcceptableRetryError(res)) { + print("Ignoring retry error for retryable write: " + tojsononeline(res)); + return; + } + throw e; + } + + res.responses.forEach(innerRes => { + assert.commandWorked(innerRes); + }); if (executionCtxType == executionContextTypes.kClientRetryableWrite) { // If the command was retried, 'responses' would only contain the response for // 'writeCmdObj'. 
@@ -238,7 +264,6 @@ var $config = extendWorkload($config, function($config, $super) { } else { assert.eq(res.responses.length, 2); } - const writeCmdRes = res.responses[0]; checkResponseFunc(writeCmdRes); if (res.responses.length == 2) { diff --git a/jstests/sharding/transaction_api_distributed_from_shard.js b/jstests/sharding/internal_txns/transaction_api_distributed_from_shard.js index 3c5a98a1784..4c33c5e4967 100644 --- a/jstests/sharding/transaction_api_distributed_from_shard.js +++ b/jstests/sharding/internal_txns/transaction_api_distributed_from_shard.js @@ -70,17 +70,11 @@ function runTestFailure() { // Insert initial data. assert.commandWorked(st.s.getCollection(kNs).insert([{_id: 1}])); - const res = assert.commandWorked(shard0Primary.adminCommand( - {testInternalTransactions: 1, commandInfos: commands, useClusterClient: true})); - // The clusterCount is rejected without being run, so expect one fewer response. - assert.eq(res.responses.length, commands.length - 1, tojson(res)); - - assert.commandWorked(res.responses[0], tojson(res)); - assert.eq(res.responses[0], {n: 2, ok: 1}, tojson(res)); - - assert.commandWorked(res.responses[1], tojson(res)); - assert.sameMembers( - res.responses[1].cursor.firstBatch, [{_id: 1}, {_id: 2}, {_id: 3}], tojson(res)); + const res = assert.commandFailedWithCode( + shard0Primary.adminCommand( + {testInternalTransactions: 1, commandInfos: commands, useClusterClient: true}), + 6349501); + assert(!res.hasOwnProperty("responses")); // Verify the API didn't insert any documents. 
assert.sameMembers(st.s.getCollection(kNs).find().toArray(), [{_id: 1}]); diff --git a/src/mongo/s/commands/internal_transactions_test_command.h b/src/mongo/s/commands/internal_transactions_test_command.h index b2501d64e31..c5bf8b61951 100644 --- a/src/mongo/s/commands/internal_transactions_test_command.h +++ b/src/mongo/s/commands/internal_transactions_test_command.h @@ -74,9 +74,7 @@ public: Base::request().kCommandName, Base::request().getUseClusterClient()); - // Swallow errors and let clients inspect the responses array to determine success / - // failure. - (void)txn.runNoThrow( + txn.run( opCtx, [sharedBlock](const txn_api::TransactionClient& txnClient, ExecutorPtr txnExec) { sharedBlock->responses.clear(); @@ -123,6 +121,7 @@ public: } return SemiFuture<void>::makeReady(); }); + return TestInternalTransactionsCommandReply(std::move(sharedBlock->responses)); }; |