4 files changed, 417 insertions, 4 deletions
diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_base.js b/jstests/concurrency/fsm_workloads/random_moveChunk_base.js
new file mode 100644
index 00000000000..a630d83093c
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads/random_moveChunk_base.js
@@ -0,0 +1,132 @@
+'use strict';
+
+/**
+ * Shards a collection by 'skey' and creates one chunk per thread, filling each chunk with
+ * documents, and assigning each document to a random thread. Meant to be extended by workloads that
+ * test operations with concurrent moveChunks. Assumes each thread has an id from [0, threadCount).
+ *
+ * @tags: [requires_sharding, assumes_balancer_off, assumes_autosplit_off];
+ */
+load('jstests/concurrency/fsm_libs/extend_workload.js');
+load('jstests/concurrency/fsm_workloads/sharded_base_partitioned.js');
+
+var $config = extendWorkload($config, function($config, $super) {
+
+    $config.threadCount = 1;
+    $config.iterations = 1;
+
+    $config.data.shardKey = {skey: 1};
+    $config.data.shardKeyField = 'skey';
+
+    // Which skey and _id values are owned by this thread (they are equal by default), populated in
+    // init().
+    $config.data.ownedIds = [];
+
+    // Depending on the operations performed by each workload, it might be expected that a random
+    // moveChunk may fail with an error code other than those expected by the helper.
+    $config.data.isMoveChunkErrorAcceptable = (err) => false;
+
+    /**
+     * Returns the _id of a random document owned by this thread.
+     */
+    $config.data.getIdForThread = function getIdForThread() {
+        assertAlways.neq(0, this.ownedIds.length);
+        return this.ownedIds[Random.randInt(this.ownedIds.length)];
+    };
+
+    /**
+     * Picks a random chunk and moves it to a random new shard. The migration is retried on
+     * acceptable errors, e.g. ConflictingOperationInProgress, and is not guaranteed to succeed.
+     */
+    $config.states.moveChunk = function moveChunk(db, collName, connCache) {
+        // Choose a random chunk in our partition to move.
+        const chunk = this.getRandomChunkInPartition(ChunkHelper.getPrimary(connCache.config));
+        const fromShard = chunk.shard;
+
+        // Choose a random shard to move the chunk to.
+        const shardNames = Object.keys(connCache.shards);
+        const destinationShards = shardNames.filter(function(shard) {
+            if (shard !== fromShard) {
+                return shard;
+            }
+        });
+        const toShard = destinationShards[Random.randInt(destinationShards.length)];
+
+        // Use chunk_helper.js's moveChunk wrapper to tolerate acceptable failures and to use a
+        // limited number of retries with exponential backoff.
+        const bounds = [
+            {[this.shardKeyField]: chunk.min[this.shardKeyField]},
+            {[this.shardKeyField]: chunk.max[this.shardKeyField]}
+        ];
+        const waitForDelete = Random.rand() < 0.5;
+        try {
+            ChunkHelper.moveChunk(db, collName, bounds, toShard, waitForDelete);
+        } catch (e) {
+            // Failed moveChunks are thrown by the moveChunk helper with the response included as a
+            // JSON string in the error's message.
+            if (this.isMoveChunkErrorAcceptable(e)) {
+                print("Ignoring acceptable moveChunk error: " + tojson(e));
+                return;
+            }
+
+            throw e;
+        }
+    };
+
+    /**
+     * Loads this thread's partition and the _ids of owned documents into memory.
+     */
+    $config.states.init = function init(db, collName, connCache) {
+        // Load this thread's partition.
+        const ns = db[collName].getFullName();
+        this.partition = this.makePartition(ns, this.tid, this.partitionSize);
+
+        // Search the collection to find the _ids of docs assigned to this thread.
+        const docsOwnedByThread = db[collName].find({tid: this.tid}).toArray();
+        assert.neq(0, docsOwnedByThread.length);
+        docsOwnedByThread.forEach(doc => {
+            this.ownedIds.push(doc._id);
+        });
+    };
+
+    /**
+     * Sets up the collection so each thread's partition is a single chunk, with partitionSize
+     * documents within it, randomly assigning each document to a thread, ensuring at least one
+     * document is given to each one.
+     */
+    $config.setup = function setup(db, collName, cluster) {
+        const ns = db[collName].getFullName();
+
+        for (let tid = 0; tid < this.threadCount; ++tid) {
+            // Find the thread's partition.
+            const partition = this.makePartition(ns, tid, this.partitionSize);
+            let bulk = db[collName].initializeUnorderedBulkOp();
+
+            let choseThisThread = false;
+            for (let i = partition.lower; i < partition.upper; ++i) {
+                // Randomly assign threads, but ensure each thread is given at least one document.
+                let chosenThread = Random.randInt(this.threadCount);
+
+                choseThisThread = choseThisThread || chosenThread === tid;
+                if (i === partition.upper - 1 && !choseThisThread) {
+                    chosenThread = tid;
+                }
+
+                // Give each document the same shard key and _id value, but a different tid.
+                bulk.insert({_id: i, skey: i, tid: chosenThread});
+            }
+            assertAlways.writeOK(bulk.execute());
+
+            // Create a chunk with boundaries matching the partition's. The low chunk's lower bound
+            // is minKey, so a split is not necessary.
+            if (!partition.isLowChunk) {
+                assertAlways.commandWorked(
+                    db.adminCommand({split: ns, middle: {skey: partition.lower}}));
+            }
+        }
+    };
+
+    $config.transitions = {init: {moveChunk: 1}, moveChunk: {moveChunk: 1}};
+
+    return $config;
+});
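
The base file above only defines init and moveChunk and is meant to be extended. As an illustrative sketch only (the file name "random_moveChunk_find_by_id.js" and the findById state are hypothetical and not part of this change), a derived workload could plug an additional state into the inherited transitions like this:

    'use strict';

    // Hypothetical example: a workload that reads owned documents while chunks move.
    load('jstests/concurrency/fsm_libs/extend_workload.js');
    load('jstests/concurrency/fsm_workloads/random_moveChunk_base.js');

    var $config = extendWorkload($config, function($config, $super) {
        $config.threadCount = 5;
        $config.iterations = 50;
        $config.data.partitionSize = 100;

        // New state: look up a random document owned by this thread by exact _id.
        $config.states.findById = function findById(db, collName, connCache) {
            const idToFind = this.getIdForThread();
            const doc = db[collName].findOne({_id: idToFind});
            assertWhenOwnColl.neq(null, doc, 'expected to find owned doc with _id ' + idToFind);
        };

        // Interleave the new state with the inherited init and moveChunk states.
        $config.transitions = {
            init: {moveChunk: 0.3, findById: 0.7},
            moveChunk: {moveChunk: 0.3, findById: 0.7},
            findById: {moveChunk: 0.3, findById: 0.7},
        };

        return $config;
    });
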
diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_broadcast_delete_transaction.js b/jstests/concurrency/fsm_workloads/random_moveChunk_broadcast_delete_transaction.js
new file mode 100644
index 00000000000..941118fd0bf
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads/random_moveChunk_broadcast_delete_transaction.js
@@ -0,0 +1,165 @@
+'use strict';
+
+/**
+ * Performs deletes in transactions without the shard key while chunks are being moved. This
+ * includes multi=true deletes and multi=false deletes with exact _id queries.
+ *
+ * @tags: [requires_sharding, assumes_balancer_off, assumes_autosplit_off,
+ * requires_non_retryable_writes, uses_transactions];
+ */
+load('jstests/concurrency/fsm_libs/extend_workload.js');
+load('jstests/concurrency/fsm_workloads/random_moveChunk_base.js');
+load('jstests/concurrency/fsm_workload_helpers/auto_retry_transaction.js');
+
+var $config = extendWorkload($config, function($config, $super) {
+
+    $config.threadCount = 5;
+    $config.iterations = 50;
+
+    // Number of documents per partition. Note that there is one chunk per partition and one
+    // partition per thread.
+    $config.data.partitionSize = 100;
+
+    // In-memory representation of the documents owned by this thread. Used to verify the expected
+    // documents are deleted in the collection.
+    $config.data.expectedDocuments = [];
+
+    // The number of "groups" each document within those assigned to a thread can belong to. Entire
+    // groups will be deleted at once by the multiDelete state function, so this is effectively the
+    // number of times that stage can be meaningfully run per thread.
+    $config.data.numGroupsWithinThread = $config.data.partitionSize / 5;
+    $config.data.nextGroupId = 0;
+
+    // A moveChunk may fail with a WriteConflict when clearing orphans on the destination shard if
+    // any of them are concurrently written to by a broadcast transaction operation. The error
+    // message may be different depending on where the failure occurs.
+    // TODO SERVER-39141: Remove once the range deleter retries on write conflict exceptions.
+    $config.data.isMoveChunkErrorAcceptable = (err) => {
+        return err.message && (err.message.indexOf("WriteConflict") > -1 ||
+                               err.message.indexOf("CommandFailed") > -1);
+    };
+
+    /**
+     * Returns the next groupId for the multiDelete state function to use.
+     */
+    $config.data.getNextGroupIdForDelete = function getNextGroupIdForDelete() {
+        const nextId = this.nextGroupId;
+        this.nextGroupId = (this.nextGroupId + 1) % this.numGroupsWithinThread;
+        return nextId;
+    };
+
+    /**
+     * Returns the _id of a random document owned by this thread to be deleted by an exact _id
+     * query. Should only be called if this thread hasn't deleted every document assigned to it.
+     */
+    $config.data.getIdForDelete = function getIdForDelete() {
+        assertAlways.neq(0, this.expectedDocuments.length);
+        return this.expectedDocuments[Random.randInt(this.expectedDocuments.length)]._id;
+    };
+
+    /**
+     * Sends a multi=false delete with an exact match on _id for a random id, which should be sent
+     * to all shards.
+     */
+    $config.states.exactIdDelete = function exactIdDelete(db, collName, connCache) {
+        // If no documents remain in our partition, there is nothing to do.
+        if (!this.expectedDocuments.length) {
+            print('This thread owns no more documents, skipping exactIdDelete stage');
+            return;
+        }
+
+        const idToDelete = this.getIdForDelete();
+
+        const collection = this.session.getDatabase(db.getName()).getCollection(collName);
+        withTxnAndAutoRetry(this.session, () => {
+            assertWhenOwnColl.writeOK(collection.remove({_id: idToDelete}, {multi: false}));
+        });
+
+        // Remove the deleted document from the in-memory representation.
+        this.expectedDocuments = this.expectedDocuments.filter(obj => {
+            return obj._id !== idToDelete;
+        });
+    };
+
+    /**
+     * Sends a multi=true delete without the shard key that targets all documents assigned to this
+     * thread, which should be sent to all shards.
+     */
+    $config.states.multiDelete = function multiDelete(db, collName, connCache) {
+        // If no documents remain in our partition, there is nothing to do.
+        if (!this.expectedDocuments.length) {
+            print('This thread owns no more documents, skipping multiDelete stage');
+            return;
+        }
+
+        // Delete a group of documents within those assigned to this thread.
+        const groupIdToDelete = this.getNextGroupIdForDelete();
+
+        const collection = this.session.getDatabase(db.getName()).getCollection(collName);
+        withTxnAndAutoRetry(this.session, () => {
+            assertWhenOwnColl.writeOK(
+                collection.remove({tid: this.tid, groupId: groupIdToDelete}, {multi: true}));
+        });
+
+        // Remove the deleted documents from the in-memory representation.
+        this.expectedDocuments = this.expectedDocuments.filter(obj => {
+            return obj.groupId !== groupIdToDelete;
+        });
+    };
+
+    /**
+     * Asserts only the expected documents for this thread are present in the collection.
+     */
+    $config.states.verifyDocuments = function verifyDocuments(db, collName, connCache) {
+        const docs = db[collName].find({tid: this.tid}).toArray();
+        assertWhenOwnColl.eq(this.expectedDocuments.length,
+                             docs.length,
+                             'unexpected number of documents, docs: ' + tojson(docs));
+
+        // Verify only the documents we haven't tried to delete were found.
+        const expectedDocIds = new Set(this.expectedDocuments.map(doc => doc._id));
+        docs.forEach(doc => {
+            assertWhenOwnColl(expectedDocIds.has(doc._id),
+                              'expected document to be deleted, doc: ' + tojson(doc));
+            expectedDocIds.delete(doc._id);
+        });
+
+        // All expected document ids should have been found in the collection.
+        assertWhenOwnColl.eq(
+            0,
+            expectedDocIds.size,
+            'did not find all expected documents, _ids not found: ' + tojson(expectedDocIds));
+    };
+
+    /**
+     * Sets up the base workload, starts a session, gives each document assigned to this thread a
+     * group id for multi=true deletes, and loads each document into memory.
+     */
+    $config.states.init = function init(db, collName, connCache) {
+        $super.states.init.apply(this, arguments);
+
+        this.session = db.getMongo().startSession({causalConsistency: false});
+
+        // Assign each document owned by this thread to a different "group" so they can be multi
+        // deleted by group later.
+        let nextGroupId = 0;
+        db[collName].find({tid: this.tid}).forEach(doc => {
+            assert.writeOK(db[collName].update({_id: doc._id}, {$set: {groupId: nextGroupId}}));
+            nextGroupId = (nextGroupId + 1) % this.numGroupsWithinThread;
+        });
+
+        // Store the updated documents in-memory so the test can verify the expected ones are
+        // deleted.
+        this.expectedDocuments = db[collName].find({tid: this.tid}).toArray();
+    };
+
+    $config.transitions = {
+        init: {moveChunk: 0.2, exactIdDelete: 0.4, multiDelete: 0.4},
+        moveChunk: {moveChunk: 0.2, exactIdDelete: 0.3, multiDelete: 0.3, verifyDocuments: 0.2},
+        exactIdDelete: {moveChunk: 0.2, exactIdDelete: 0.3, multiDelete: 0.3, verifyDocuments: 0.2},
+        multiDelete: {moveChunk: 0.2, exactIdDelete: 0.3, multiDelete: 0.3, verifyDocuments: 0.2},
+        verifyDocuments: {moveChunk: 0.2, exactIdDelete: 0.4, multiDelete: 0.4},
+    };
+
+    return $config;
+});
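
Both transaction workloads funnel their writes through the withTxnAndAutoRetry helper loaded above. The following standalone sketch (illustrative only, not part of the patch; it assumes a mongo shell where auto_retry_transaction.js is loaded, 'db' points at the test database, and a sharded collection named 'coll' exists) shows the shape of that pattern: the callback holds every statement of one logical operation so the whole transaction can be aborted and re-run if it hits a transient error such as a write conflict with a migration.

    load('jstests/concurrency/fsm_workload_helpers/auto_retry_transaction.js');

    // Transactions require an explicit session; collections must come from the session.
    const session = db.getMongo().startSession({causalConsistency: false});
    const sessionColl = session.getDatabase(db.getName()).getCollection('coll');

    withTxnAndAutoRetry(session, () => {
        // Neither statement carries the shard key, so mongos broadcasts them to all shards
        // and they run as a single cross-shard transaction.
        assert.writeOK(sessionColl.update({tid: 0}, {$inc: {counter: 1}}, {multi: true}));
        assert.writeOK(sessionColl.remove({tid: 0, groupId: 3}, {multi: false}));
    });
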
diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_broadcast_update_transaction.js b/jstests/concurrency/fsm_workloads/random_moveChunk_broadcast_update_transaction.js
new file mode 100644
index 00000000000..fc2757bd379
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads/random_moveChunk_broadcast_update_transaction.js
@@ -0,0 +1,107 @@
+'use strict';
+
+/**
+ * Performs updates in transactions without the shard key while chunks are being moved. This
+ * includes multi=true updates and multi=false updates with exact _id queries.
+ *
+ * @tags: [requires_sharding, assumes_balancer_off, assumes_autosplit_off,
+ * requires_non_retryable_writes, uses_transactions];
+ */
+load('jstests/concurrency/fsm_libs/extend_workload.js');
+load('jstests/concurrency/fsm_workloads/random_moveChunk_base.js');
+load('jstests/concurrency/fsm_workload_helpers/auto_retry_transaction.js');
+
+var $config = extendWorkload($config, function($config, $super) {
+
+    $config.threadCount = 5;
+    $config.iterations = 50;
+
+    // Number of documents per partition. Note that there is one chunk per partition and one
+    // partition per thread.
+    $config.data.partitionSize = 100;
+
+    // The counter values associated with each owned id. Used to verify updates aren't double
+    // applied.
+    $config.data.expectedCounters = {};
+
+    // A moveChunk may fail with a WriteConflict when clearing orphans on the destination shard if
+    // any of them are concurrently written to by a broadcast transaction operation. The error
+    // message may be different depending on where the failure occurs.
+    // TODO SERVER-39141: Remove once the range deleter retries on write conflict exceptions.
+    $config.data.isMoveChunkErrorAcceptable = (err) => {
+        return err.message && (err.message.indexOf("WriteConflict") > -1 ||
+                               err.message.indexOf("CommandFailed") > -1);
+    };
+
+    /**
+     * Sends a multi=false update with an exact match on _id for a random document assigned to this
+     * thread, which should be sent to all shards.
+     */
+    $config.states.exactIdUpdate = function exactIdUpdate(db, collName, connCache) {
+        const idToUpdate = this.getIdForThread();
+
+        const collection = this.session.getDatabase(db.getName()).getCollection(collName);
+        withTxnAndAutoRetry(this.session, () => {
+            assertWhenOwnColl.writeOK(
+                collection.update({_id: idToUpdate}, {$inc: {counter: 1}}, {multi: false}));
+        });
+
+        // Update the expected counter for the targeted id.
+        this.expectedCounters[idToUpdate] += 1;
+    };
+
+    /**
+     * Sends a multi=true update without the shard key that targets all documents assigned to this
+     * thread, which should be sent to all shards.
+     */
+    $config.states.multiUpdate = function multiUpdate(db, collName, connCache) {
+        const collection = this.session.getDatabase(db.getName()).getCollection(collName);
+        withTxnAndAutoRetry(this.session, () => {
+            assertWhenOwnColl.writeOK(
+                collection.update({tid: this.tid}, {$inc: {counter: 1}}, {multi: true}));
+        });
+
+        // The expected counter for every document owned by this thread should be incremented.
+        Object.keys(this.expectedCounters).forEach(id => {
+            this.expectedCounters[id] += 1;
+        });
+    };
+
+    /**
+     * Asserts all documents assigned to this thread match their expected values.
+     */
+    $config.states.verifyDocuments = function verifyDocuments(db, collName, connCache) {
+        const docs = db[collName].find({tid: this.tid}).toArray();
+        docs.forEach(doc => {
+            const expectedCounter = this.expectedCounters[doc._id];
+            assertWhenOwnColl.eq(
+                expectedCounter, doc.counter, 'unexpected counter value, doc: ' + tojson(doc));
+        });
+    };
+
+    /**
+     * Sets up the base workload, starts a session, and gives each document assigned to this thread
+     * a counter value that is tracked in-memory.
+     */
+    $config.states.init = function init(db, collName, connCache) {
+        $super.states.init.apply(this, arguments);
+
+        this.session = db.getMongo().startSession({causalConsistency: false});
+
+        // Assign a default counter value to each document owned by this thread.
+        db[collName].find({tid: this.tid}).forEach(doc => {
+            this.expectedCounters[doc._id] = 0;
+            assert.writeOK(db[collName].update({_id: doc._id}, {$set: {counter: 0}}));
+        });
+    };
+
+    $config.transitions = {
+        init: {moveChunk: 0.2, exactIdUpdate: 0.4, multiUpdate: 0.4},
+        moveChunk: {moveChunk: 0.2, exactIdUpdate: 0.3, multiUpdate: 0.3, verifyDocuments: 0.2},
+        exactIdUpdate: {moveChunk: 0.2, exactIdUpdate: 0.3, multiUpdate: 0.3, verifyDocuments: 0.2},
+        multiUpdate: {moveChunk: 0.2, exactIdUpdate: 0.3, multiUpdate: 0.3, verifyDocuments: 0.2},
+        verifyDocuments: {moveChunk: 0.2, exactIdUpdate: 0.4, multiUpdate: 0.4},
+    };
+
+    return $config;
+});
diff --git a/jstests/concurrency/fsm_workloads/sharded_base_partitioned.js b/jstests/concurrency/fsm_workloads/sharded_base_partitioned.js
index e778a3d64cb..018951ea053 100644
--- a/jstests/concurrency/fsm_workloads/sharded_base_partitioned.js
+++ b/jstests/concurrency/fsm_workloads/sharded_base_partitioned.js
@@ -31,6 +31,7 @@ var $config = (function() {
         // shard. The setup function creates documents with sequential numbering and gives
         // each shard its own numeric range to work with.
         shardKey: {_id: 1},
+        shardKeyField: '_id',
     };
 
     data.makePartition = function makePartition(ns, tid, partitionSize) {
@@ -68,6 +69,8 @@ var $config = (function() {
         // We must split up these cases because MinKey and MaxKey are not fully comparable.
         // This may be due to SERVER-18341, where the Matcher returns false positives in
         // comparison predicates with MinKey/MaxKey.
+        const maxField = 'max.' + this.shardKeyField;
+        const minField = 'min.' + this.shardKeyField;
         if (this.partition.isLowChunk && this.partition.isHighChunk) {
             return coll
                 .aggregate([
@@ -78,14 +81,20 @@
         } else if (this.partition.isLowChunk) {
             return coll
                 .aggregate([
-                    {$match: {ns: this.partition.ns, 'max._id': {$lte: this.partition.chunkUpper}}},
+                    {
+                      $match:
+                          {ns: this.partition.ns, [maxField]: {$lte: this.partition.chunkUpper}}
+                    },
                     {$sample: {size: 1}}
                 ])
                 .toArray()[0];
         } else if (this.partition.isHighChunk) {
             return coll
                 .aggregate([
-                    {$match: {ns: this.partition.ns, 'min._id': {$gte: this.partition.chunkLower}}},
+                    {
+                      $match:
+                          {ns: this.partition.ns, [minField]: {$gte: this.partition.chunkLower}}
+                    },
                     {$sample: {size: 1}}
                 ])
                 .toArray()[0];
@@ -95,8 +104,8 @@
                 {
                   $match: {
                       ns: this.partition.ns,
-                      'min._id': {$gte: this.partition.chunkLower},
-                      'max._id': {$lte: this.partition.chunkUpper}
+                      [minField]: {$gte: this.partition.chunkLower},
+                      [maxField]: {$lte: this.partition.chunkUpper}
                  }
                },
               {$sample: {size: 1}}
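
The sharded_base_partitioned.js change replaces the hard-coded 'min._id'/'max._id' paths with paths derived from the new shardKeyField hook, so derived workloads such as random_moveChunk_base.js can shard on 'skey' while reusing getRandomChunkInPartition. As an illustrative sketch only (the collection name and bound values below are made up, not from the patch), this is how computed property names build the config.chunks filter:

    // With the default shardKeyField of '_id' the query is unchanged; a workload that sets
    // shardKeyField to 'skey' gets dotted paths on 'min.skey'/'max.skey' instead.
    const shardKeyField = 'skey';
    const maxField = 'max.' + shardKeyField;  // 'max.skey'
    const minField = 'min.' + shardKeyField;  // 'min.skey'

    // Computed property names ([minField], [maxField]) turn the strings into filter keys.
    const chunkFilter = {
        ns: 'test.coll',
        [minField]: {$gte: 0},
        [maxField]: {$lte: 100},
    };
    printjson(chunkFilter);  // {ns: 'test.coll', 'min.skey': {$gte: 0}, 'max.skey': {$lte: 100}}
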