author     Jack Mulrow <jack.mulrow@mongodb.com>    2018-12-19 14:56:58 -0500
committer  Jack Mulrow <jack.mulrow@mongodb.com>    2019-02-01 16:27:48 -0500
commit     2dc3359cfe83cafa0f450a0dc7e2815f48ad08b4 (patch)
tree       1ca5294dea3cc00bfbcdc77b5a2a45757f69cca8
parent     0a91b031441bfcd69d4e28c0a7f2d0eb51cbb516 (diff)
download   mongo-2dc3359cfe83cafa0f450a0dc7e2815f48ad08b4.tar.gz
SERVER-38435 Add concurrency workloads for multi shard writes in transactions with migrations
-rw-r--r--   jstests/concurrency/fsm_workloads/random_moveChunk_base.js                            132
-rw-r--r--   jstests/concurrency/fsm_workloads/random_moveChunk_broadcast_delete_transaction.js    165
-rw-r--r--   jstests/concurrency/fsm_workloads/random_moveChunk_broadcast_update_transaction.js    107
-rw-r--r--   jstests/concurrency/fsm_workloads/sharded_base_partitioned.js                          17
4 files changed, 417 insertions, 4 deletions
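
All three new workloads use the concurrency suite's FSM framework: a workload is a $config object (data, states, transitions, setup) built by extending a base workload with extendWorkload(). As a rough orientation only (the state name doWrite below is hypothetical; everything else mirrors the files in this diff), the shared skeleton looks like:

'use strict';

load('jstests/concurrency/fsm_libs/extend_workload.js');
load('jstests/concurrency/fsm_workloads/random_moveChunk_base.js');

var $config = extendWorkload($config, function($config, $super) {
    $config.threadCount = 5;  // number of FSM threads
    $config.iterations = 50;  // state transitions performed by each thread

    // A state function; the runner passes the test database, the collection name,
    // and (for sharded workloads) a cache of connections to the shards and config servers.
    $config.states.doWrite = function doWrite(db, collName, connCache) {
        // ... the operation under test ...
    };

    // Weighted transitions between states; the weights out of each state form a
    // probability distribution over the next state to run.
    $config.transitions = {
        init: {moveChunk: 0.5, doWrite: 0.5},
        moveChunk: {doWrite: 1.0},
        doWrite: {moveChunk: 0.5, doWrite: 0.5},
    };

    return $config;
});
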
diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_base.js b/jstests/concurrency/fsm_workloads/random_moveChunk_base.js
new file mode 100644
index 00000000000..a630d83093c
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads/random_moveChunk_base.js
@@ -0,0 +1,132 @@
+'use strict';
+
+/**
+ * Shards a collection on 'skey' and creates one chunk per thread, filling each chunk with
+ * documents and assigning each document to a random thread. Meant to be extended by workloads
+ * that test operations running concurrently with moveChunks. Assumes each thread has an id in
+ * [0, threadCount).
+ *
+ * @tags: [requires_sharding, assumes_balancer_off, assumes_autosplit_off];
+ */
+load('jstests/concurrency/fsm_libs/extend_workload.js');
+load('jstests/concurrency/fsm_workloads/sharded_base_partitioned.js');
+
+var $config = extendWorkload($config, function($config, $super) {
+
+ $config.threadCount = 1;
+ $config.iterations = 1;
+
+ $config.data.shardKey = {skey: 1};
+ $config.data.shardKeyField = 'skey';
+
+    // The skey and _id values owned by this thread (they are equal by default); populated in
+    // init().
+ $config.data.ownedIds = [];
+
+    // Depending on the operations a workload performs, a random moveChunk may be expected to fail
+    // with an error code other than those tolerated by the moveChunk helper.
+ $config.data.isMoveChunkErrorAcceptable = (err) => false;
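+    // The delete and update workloads added in this commit override this to tolerate the
+    // WriteConflict/CommandFailed errors that can arise when a migration races with their
+    // broadcast transaction writes (see SERVER-39141).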
+
+ /**
+ * Returns the _id of a random document owned by this thread.
+ */
+ $config.data.getIdForThread = function getIdForThread() {
+        assertAlways.neq(0, this.ownedIds.length);
+ return this.ownedIds[Random.randInt(this.ownedIds.length)];
+ };
+
+ /**
+ * Picks a random chunk and moves it to a random new shard. The migration is retried on
+ * acceptable errors, e.g. ConflictingOperationInProgress, and is not guaranteed to succeed.
+ */
+ $config.states.moveChunk = function moveChunk(db, collName, connCache) {
+ // Choose a random chunk in our partition to move.
+ const chunk = this.getRandomChunkInPartition(ChunkHelper.getPrimary(connCache.config));
+ const fromShard = chunk.shard;
+
+ // Choose a random shard to move the chunk to.
+ const shardNames = Object.keys(connCache.shards);
+        const destinationShards = shardNames.filter(function(shard) {
+            return shard !== fromShard;
+        });
+ const toShard = destinationShards[Random.randInt(destinationShards.length)];
+
+ // Use chunk_helper.js's moveChunk wrapper to tolerate acceptable failures and to use a
+ // limited number of retries with exponential backoff.
+ const bounds = [
+ {[this.shardKeyField]: chunk.min[this.shardKeyField]},
+ {[this.shardKeyField]: chunk.max[this.shardKeyField]}
+ ];
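+        // Randomly decide whether the migration should wait for the donor shard to delete the
+        // moved range before the moveChunk command returns.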
+ const waitForDelete = Random.rand() < 0.5;
+ try {
+ ChunkHelper.moveChunk(db, collName, bounds, toShard, waitForDelete);
+ } catch (e) {
+            // The moveChunk helper throws on failure, with the command response included as a
+            // JSON string in the error's message.
+ if (this.isMoveChunkErrorAcceptable(e)) {
+ print("Ignoring acceptable moveChunk error: " + tojson(e));
+ return;
+ }
+
+ throw e;
+ }
+ };
+
+ /**
+     * Loads this thread's partition and the _ids of its owned documents into memory.
+ */
+ $config.states.init = function init(db, collName, connCache) {
+ // Load this thread's partition.
+ const ns = db[collName].getFullName();
+ this.partition = this.makePartition(ns, this.tid, this.partitionSize);
+
+ // Search the collection to find the _ids of docs assigned to this thread.
+ const docsOwnedByThread = db[collName].find({tid: this.tid}).toArray();
+        assert.neq(0, docsOwnedByThread.length);
+ docsOwnedByThread.forEach(doc => {
+ this.ownedIds.push(doc._id);
+ });
+ };
+
+ /**
+     * Sets up the collection so that each thread's partition is a single chunk containing
+     * partitionSize documents. Documents are assigned to threads at random, but every thread is
+     * guaranteed at least one document.
+ */
+ $config.setup = function setup(db, collName, cluster) {
+ const ns = db[collName].getFullName();
+
+ for (let tid = 0; tid < this.threadCount; ++tid) {
+ // Find the thread's partition.
+ const partition = this.makePartition(ns, tid, this.partitionSize);
+ let bulk = db[collName].initializeUnorderedBulkOp();
+
+ let choseThisThread = false;
+ for (let i = partition.lower; i < partition.upper; ++i) {
+ // Randomly assign threads, but ensure each thread is given at least one document.
+ let chosenThread = Random.randInt(this.threadCount);
+
+ choseThisThread = choseThisThread || chosenThread === tid;
+ if (i === partition.upper - 1 && !choseThisThread) {
+ chosenThread = tid;
+ }
+
+ // Give each document the same shard key and _id value, but a different tid.
+ bulk.insert({_id: i, skey: i, tid: chosenThread});
+ }
+ assertAlways.writeOK(bulk.execute());
+
+ // Create a chunk with boundaries matching the partition's. The low chunk's lower bound
+ // is minKey, so a split is not necessary.
+ if (!partition.isLowChunk) {
+ assertAlways.commandWorked(
+ db.adminCommand({split: ns, middle: {skey: partition.lower}}));
+ }
+ }
+ };
+
+ $config.transitions = {init: {moveChunk: 1}, moveChunk: {moveChunk: 1}};
+
+ return $config;
+});
diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_broadcast_delete_transaction.js b/jstests/concurrency/fsm_workloads/random_moveChunk_broadcast_delete_transaction.js
new file mode 100644
index 00000000000..941118fd0bf
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads/random_moveChunk_broadcast_delete_transaction.js
@@ -0,0 +1,165 @@
+'use strict';
+
+/**
+ * Performs deletes that do not include the shard key, run inside transactions, while chunks are
+ * being moved. This includes multi=true deletes and multi=false deletes with exact _id queries.
+ *
+ * @tags: [requires_sharding, assumes_balancer_off, assumes_autosplit_off,
+ * requires_non_retryable_writes, uses_transactions];
+ */
+load('jstests/concurrency/fsm_libs/extend_workload.js');
+load('jstests/concurrency/fsm_workloads/random_moveChunk_base.js');
+load('jstests/concurrency/fsm_workload_helpers/auto_retry_transaction.js');
+
+var $config = extendWorkload($config, function($config, $super) {
+
+ $config.threadCount = 5;
+ $config.iterations = 50;
+
+ // Number of documents per partition. Note that there is one chunk per partition and one
+ // partition per thread.
+ $config.data.partitionSize = 100;
+
+ // In-memory representation of the documents owned by this thread. Used to verify the expected
+ // documents are deleted in the collection.
+ $config.data.expectedDocuments = [];
+
+    // The number of "groups" that the documents assigned to this thread are split into. Entire
+    // groups are deleted at once by the multiDelete state function, so this is effectively the
+    // number of times that state can be run meaningfully per thread. With the default
+    // partitionSize of 100, there are 20 groups of roughly five documents each.
+ $config.data.numGroupsWithinThread = $config.data.partitionSize / 5;
+ $config.data.nextGroupId = 0;
+
+ // A moveChunk may fail with a WriteConflict when clearing orphans on the destination shard if
+ // any of them are concurrently written to by a broadcast transaction operation. The error
+ // message may be different depending on where the failure occurs.
+ // TODO SERVER-39141: Remove once the range deleter retries on write conflict exceptions.
+ $config.data.isMoveChunkErrorAcceptable = (err) => {
+ return err.message && (err.message.indexOf("WriteConflict") > -1 ||
+ err.message.indexOf("CommandFailed") > -1);
+ };
+
+ /**
+ * Returns the next groupId for the multiDelete state function to use.
+ */
+ $config.data.getNextGroupIdForDelete = function getNextGroupIdForDelete() {
+ const nextId = this.nextGroupId;
+ this.nextGroupId = (this.nextGroupId + 1) % this.numGroupsWithinThread;
+ return nextId;
+ };
+
+ /**
+ * Returns the _id of a random document owned by this thread to be deleted by an exact _id
+ * query. Should only be called if this thread hasn't deleted every document assigned to it.
+ */
+ $config.data.getIdForDelete = function getIdForDelete() {
+ assertAlways.neq(0, this.expectedDocuments.length);
+        return this.expectedDocuments[Random.randInt(this.expectedDocuments.length)]._id;
+ };
+
+ /**
+     * Sends a multi=false delete with an exact match on a random owned _id. Because the query
+     * does not contain the shard key, the delete is broadcast to all shards.
+ */
+ $config.states.exactIdDelete = function exactIdDelete(db, collName, connCache) {
+ // If no documents remain in our partition, there is nothing to do.
+ if (!this.expectedDocuments.length) {
+ print('This thread owns no more documents, skipping exactIdDelete stage');
+ return;
+ }
+
+ const idToDelete = this.getIdForDelete();
+
+ const collection = this.session.getDatabase(db.getName()).getCollection(collName);
+ withTxnAndAutoRetry(this.session, () => {
+ assertWhenOwnColl.writeOK(collection.remove({_id: idToDelete}, {multi: false}));
+ });
+
+ // Remove the deleted document from the in-memory representation.
+ this.expectedDocuments = this.expectedDocuments.filter(obj => {
+ return obj._id !== idToDelete;
+ });
+ };
+
+ /**
+     * Sends a multi=true delete without the shard key that targets all documents assigned to
+     * this thread. Because the query does not contain the shard key, the delete is broadcast to
+     * all shards.
+ */
+ $config.states.multiDelete = function multiDelete(db, collName, connCache) {
+ // If no documents remain in our partition, there is nothing to do.
+ if (!this.expectedDocuments.length) {
+ print('This thread owns no more documents, skipping multiDelete stage');
+ return;
+ }
+
+ // Delete a group of documents within those assigned to this thread.
+ const groupIdToDelete = this.getNextGroupIdForDelete();
+
+ const collection = this.session.getDatabase(db.getName()).getCollection(collName);
+ withTxnAndAutoRetry(this.session, () => {
+ assertWhenOwnColl.writeOK(
+ collection.remove({tid: this.tid, groupId: groupIdToDelete}, {multi: true}));
+ });
+
+ // Remove the deleted documents from the in-memory representation.
+ this.expectedDocuments = this.expectedDocuments.filter(obj => {
+ return obj.groupId !== groupIdToDelete;
+ });
+ };
+
+ /**
+ * Asserts only the expected documents for this thread are present in the collection.
+ */
+ $config.states.verifyDocuments = function verifyDocuments(db, collName, connCache) {
+ const docs = db[collName].find({tid: this.tid}).toArray();
+ assertWhenOwnColl.eq(this.expectedDocuments.length,
+ docs.length,
+ 'unexpected number of documents, docs: ' + tojson(docs));
+
+ // Verify only the documents we haven't tried to delete were found.
+ const expectedDocIds = new Set(this.expectedDocuments.map(doc => doc._id));
+ docs.forEach(doc => {
+ assertWhenOwnColl(expectedDocIds.has(doc._id),
+ 'expected document to be deleted, doc: ' + tojson(doc));
+ expectedDocIds.delete(doc._id);
+ });
+
+ // All expected document ids should have been found in the collection.
+ assertWhenOwnColl.eq(
+ 0,
+ expectedDocIds.size,
+ 'did not find all expected documents, _ids not found: ' + tojson(expectedDocIds));
+ };
+
+ /**
+ * Sets up the base workload, starts a session, gives each document assigned to this thread a
+ * group id for multi=true deletes, and loads each document into memory.
+ */
+ $config.states.init = function init(db, collName, connCache) {
+ $super.states.init.apply(this, arguments);
+
+ this.session = db.getMongo().startSession({causalConsistency: false});
+
+ // Assign each document owned by this thread to a different "group" so they can be multi
+ // deleted by group later.
+ let nextGroupId = 0;
+ db[collName].find({tid: this.tid}).forEach(doc => {
+ assert.writeOK(db[collName].update({_id: doc._id}, {$set: {groupId: nextGroupId}}));
+ nextGroupId = (nextGroupId + 1) % this.numGroupsWithinThread;
+ });
+
+ // Store the updated documents in-memory so the test can verify the expected ones are
+ // deleted.
+ this.expectedDocuments = db[collName].find({tid: this.tid}).toArray();
+ };
+
+ $config.transitions = {
+ init: {moveChunk: 0.2, exactIdDelete: 0.4, multiDelete: 0.4},
+ moveChunk: {moveChunk: 0.2, exactIdDelete: 0.3, multiDelete: 0.3, verifyDocuments: 0.2},
+ exactIdDelete: {moveChunk: 0.2, exactIdDelete: 0.3, multiDelete: 0.3, verifyDocuments: 0.2},
+ multiDelete: {moveChunk: 0.2, exactIdDelete: 0.3, multiDelete: 0.3, verifyDocuments: 0.2},
+ verifyDocuments: {moveChunk: 0.2, exactIdDelete: 0.4, multiDelete: 0.4},
+ };
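+    // The weights out of each state above sum to 1.0 and act as transition probabilities: after
+    // a moveChunk, for example, the thread runs another moveChunk 20% of the time, a delete 60%
+    // of the time (split evenly between exact-_id and multi), and verifyDocuments 20% of the time.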
+
+ return $config;
+});
diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_broadcast_update_transaction.js b/jstests/concurrency/fsm_workloads/random_moveChunk_broadcast_update_transaction.js
new file mode 100644
index 00000000000..fc2757bd379
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads/random_moveChunk_broadcast_update_transaction.js
@@ -0,0 +1,107 @@
+'use strict';
+
+/**
+ * Performs updates that do not include the shard key, run inside transactions, while chunks are
+ * being moved. This includes multi=true updates and multi=false updates with exact _id queries.
+ *
+ * @tags: [requires_sharding, assumes_balancer_off, assumes_autosplit_off,
+ * requires_non_retryable_writes, uses_transactions];
+ */
+load('jstests/concurrency/fsm_libs/extend_workload.js');
+load('jstests/concurrency/fsm_workloads/random_moveChunk_base.js');
+load('jstests/concurrency/fsm_workload_helpers/auto_retry_transaction.js');
+
+var $config = extendWorkload($config, function($config, $super) {
+
+ $config.threadCount = 5;
+ $config.iterations = 50;
+
+ // Number of documents per partition. Note that there is one chunk per partition and one
+ // partition per thread.
+ $config.data.partitionSize = 100;
+
+    // The counter value associated with each _id owned by this thread. Used to verify updates
+    // are not double-applied.
+ $config.data.expectedCounters = {};
+
+ // A moveChunk may fail with a WriteConflict when clearing orphans on the destination shard if
+ // any of them are concurrently written to by a broadcast transaction operation. The error
+ // message may be different depending on where the failure occurs.
+ // TODO SERVER-39141: Remove once the range deleter retries on write conflict exceptions.
+ $config.data.isMoveChunkErrorAcceptable = (err) => {
+ return err.message && (err.message.indexOf("WriteConflict") > -1 ||
+ err.message.indexOf("CommandFailed") > -1);
+ };
+
+ /**
+     * Sends a multi=false update with an exact match on the _id of a random document assigned to
+     * this thread. Because the query does not contain the shard key, the update is broadcast to
+     * all shards.
+ */
+ $config.states.exactIdUpdate = function exactIdUpdate(db, collName, connCache) {
+ const idToUpdate = this.getIdForThread();
+
+ const collection = this.session.getDatabase(db.getName()).getCollection(collName);
+ withTxnAndAutoRetry(this.session, () => {
+ assertWhenOwnColl.writeOK(
+ collection.update({_id: idToUpdate}, {$inc: {counter: 1}}, {multi: false}));
+ });
+
+ // Update the expected counter for the targeted id.
+ this.expectedCounters[idToUpdate] += 1;
+ };
+
+ /**
+     * Sends a multi=true update without the shard key that targets all documents assigned to
+     * this thread. Because the query does not contain the shard key, the update is broadcast to
+     * all shards.
+ */
+ $config.states.multiUpdate = function multiUpdate(db, collName, connCache) {
+ const collection = this.session.getDatabase(db.getName()).getCollection(collName);
+ withTxnAndAutoRetry(this.session, () => {
+ assertWhenOwnColl.writeOK(
+ collection.update({tid: this.tid}, {$inc: {counter: 1}}, {multi: true}));
+ });
+
+ // The expected counter for every document owned by this thread should be incremented.
+ Object.keys(this.expectedCounters).forEach(id => {
+ this.expectedCounters[id] += 1;
+ });
+ };
+
+ /**
+ * Asserts all documents assigned to this thread match their expected values.
+ */
+ $config.states.verifyDocuments = function verifyDocuments(db, collName, connCache) {
+ const docs = db[collName].find({tid: this.tid}).toArray();
+ docs.forEach(doc => {
+ const expectedCounter = this.expectedCounters[doc._id];
+ assertWhenOwnColl.eq(
+ expectedCounter, doc.counter, 'unexpected counter value, doc: ' + tojson(doc));
+ });
+ };
+
+ /**
+ * Sets up the base workload, starts a session, and gives each document assigned to this thread
+ * a counter value that is tracked in-memory.
+ */
+ $config.states.init = function init(db, collName, connCache) {
+ $super.states.init.apply(this, arguments);
+
+ this.session = db.getMongo().startSession({causalConsistency: false});
+
+ // Assign a default counter value to each document owned by this thread.
+ db[collName].find({tid: this.tid}).forEach(doc => {
+ this.expectedCounters[doc._id] = 0;
+ assert.writeOK(db[collName].update({_id: doc._id}, {$set: {counter: 0}}));
+ });
+ };
+
+ $config.transitions = {
+ init: {moveChunk: 0.2, exactIdUpdate: 0.4, multiUpdate: 0.4},
+ moveChunk: {moveChunk: 0.2, exactIdUpdate: 0.3, multiUpdate: 0.3, verifyDocuments: 0.2},
+ exactIdUpdate: {moveChunk: 0.2, exactIdUpdate: 0.3, multiUpdate: 0.3, verifyDocuments: 0.2},
+ multiUpdate: {moveChunk: 0.2, exactIdUpdate: 0.3, multiUpdate: 0.3, verifyDocuments: 0.2},
+ verifyDocuments: {moveChunk: 0.2, exactIdUpdate: 0.4, multiUpdate: 0.4},
+ };
+
+ return $config;
+});
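
Both new workloads wrap their writes in withTxnAndAutoRetry from jstests/concurrency/fsm_workload_helpers/auto_retry_transaction.js, always invoking it as withTxnAndAutoRetry(this.session, () => { ... }). The helper itself is not part of this diff; the following is only a simplified sketch of the idea it implements (run the callback in a transaction on the given session and retry on transient transaction errors), not the actual implementation:

function withTxnAndAutoRetrySketch(session, func) {
    let committed = false;
    while (!committed) {
        session.startTransaction();
        try {
            func();
            session.commitTransaction();
            committed = true;
        } catch (e) {
            session.abortTransaction();
            // Retry only errors the server has labeled as transient; rethrow anything else.
            const isTransient =
                Array.isArray(e.errorLabels) && e.errorLabels.includes('TransientTransactionError');
            if (!isTransient) {
                throw e;
            }
        }
    }
}
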
diff --git a/jstests/concurrency/fsm_workloads/sharded_base_partitioned.js b/jstests/concurrency/fsm_workloads/sharded_base_partitioned.js
index e778a3d64cb..018951ea053 100644
--- a/jstests/concurrency/fsm_workloads/sharded_base_partitioned.js
+++ b/jstests/concurrency/fsm_workloads/sharded_base_partitioned.js
@@ -31,6 +31,7 @@ var $config = (function() {
// shard. The setup function creates documents with sequential numbering and gives
// each shard its own numeric range to work with.
shardKey: {_id: 1},
+ shardKeyField: '_id',
};
data.makePartition = function makePartition(ns, tid, partitionSize) {
@@ -68,6 +69,8 @@ var $config = (function() {
// We must split up these cases because MinKey and MaxKey are not fully comparable.
// This may be due to SERVER-18341, where the Matcher returns false positives in
// comparison predicates with MinKey/MaxKey.
+ const maxField = 'max.' + this.shardKeyField;
+ const minField = 'min.' + this.shardKeyField;
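+        // These field paths replace the hard-coded 'min._id'/'max._id' matches below, so
+        // workloads that shard on a field other than _id (e.g. 'skey' in random_moveChunk_base.js)
+        // still find their own chunks in config.chunks.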
if (this.partition.isLowChunk && this.partition.isHighChunk) {
return coll
.aggregate([
@@ -78,14 +81,20 @@ var $config = (function() {
} else if (this.partition.isLowChunk) {
return coll
.aggregate([
- {$match: {ns: this.partition.ns, 'max._id': {$lte: this.partition.chunkUpper}}},
+ {
+ $match:
+ {ns: this.partition.ns, [maxField]: {$lte: this.partition.chunkUpper}}
+ },
{$sample: {size: 1}}
])
.toArray()[0];
} else if (this.partition.isHighChunk) {
return coll
.aggregate([
- {$match: {ns: this.partition.ns, 'min._id': {$gte: this.partition.chunkLower}}},
+ {
+ $match:
+ {ns: this.partition.ns, [minField]: {$gte: this.partition.chunkLower}}
+ },
{$sample: {size: 1}}
])
.toArray()[0];
@@ -95,8 +104,8 @@ var $config = (function() {
{
$match: {
ns: this.partition.ns,
- 'min._id': {$gte: this.partition.chunkLower},
- 'max._id': {$lte: this.partition.chunkUpper}
+ [minField]: {$gte: this.partition.chunkLower},
+ [maxField]: {$lte: this.partition.chunkUpper}
}
},
{$sample: {size: 1}}