summaryrefslogtreecommitdiff
path: root/jstests/concurrency
diff options
context:
space:
mode:
authorAli Mir <ali.mir@mongodb.com>2020-06-26 14:29:36 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2020-07-09 22:12:59 +0000
commit915402884c52da861b1660cd6a7172c552ce1806 (patch)
treebd3a061e2f02eb596f274a418f2b9d77cd15099a /jstests/concurrency
parentaf43724d0602075993a181955b96f7854dc4f698 (diff)
downloadmongo-915402884c52da861b1660cd6a7172c552ce1806.tar.gz
SERVER-47789 Add concurrency tests for snapshot reads using atClusterTime
Diffstat (limited to 'jstests/concurrency')
-rw-r--r--jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js66
-rw-r--r--jstests/concurrency/fsm_workloads/snapshot_read_at_cluster_time_crud_operations.js157
-rw-r--r--jstests/concurrency/fsm_workloads/snapshot_read_at_cluster_time_ddl_operations.js157
3 files changed, 379 insertions, 1 deletions
diff --git a/jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js b/jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js
index 8c02b494af8..c76609bad75 100644
--- a/jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js
+++ b/jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js
@@ -65,7 +65,6 @@ function doSnapshotFind(sortByAscending, collName, data, findErrorCodes) {
* Performs a snapshot getmore. This function is to be used in conjunction with doSnapshotFind.
*/
function doSnapshotGetMore(collName, data, getMoreErrorCodes, commitTransactionErrorCodes) {
- // doSnapshotGetMore may be called even if doSnapshotFind fails to obtain a cursor.
if (bsonWoCompare({_: data.cursorId}, {_: NumberLong(0)}) === 0) {
return;
}
@@ -97,6 +96,71 @@ function doSnapshotGetMore(collName, data, getMoreErrorCodes, commitTransactionE
}
/**
+ * Performs a find with readConcern {level: "snapshot"} and optionally atClusterTime, if specified.
+ */
+function doSnapshotFindAtClusterTime(
+ db, collName, data, findErrorCodes, sortOrder, checkSnapshotCorrectness) {
+ const findCmd = {
+ find: collName,
+ sort: sortOrder,
+ batchSize: data.batchSize,
+ readConcern: {level: "snapshot"}
+ };
+ if (data.atClusterTime) {
+ findCmd.readConcern.atClusterTime = data.atClusterTime;
+ }
+
+ let res = db.runCommand(findCmd);
+ assert.commandWorkedOrFailedWithCode(
+ res, findErrorCodes, () => `cmd: ${tojson(findCmd)}, res: ${tojson(res)}`);
+ const cursor = parseCursor(res);
+
+ if (!cursor) {
+ data.cursorId = NumberLong(0);
+ } else {
+ assert(cursor.hasOwnProperty("firstBatch"), tojson(res));
+ assert(cursor.hasOwnProperty("atClusterTime"), tojson(res));
+ // Store the cursorId and cursor in the data object.
+ assert.neq(cursor.id, 0);
+ data.cursorId = cursor.id;
+ // checkSnapshotCorrectness verifies that the snapshot sees the correct documents.
+ if (typeof checkSnapshotCorrectness === "function") {
+ checkSnapshotCorrectness(res);
+ }
+ }
+}
+
+/**
+ * Performs a getMore on a previously established snapshot cursor. This function is to be used in
+ * conjunction with doSnapshotFindAtClusterTime.
+ */
+function doSnapshotGetMoreAtClusterTime(
+ db, collName, data, getMoreErrorCodes, checkSnapshotCorrectness) {
+ const getMoreCmd = {
+ getMore: data.cursorId,
+ collection: collName,
+ batchSize: data.batchSize,
+ };
+ let res = db.runCommand(getMoreCmd);
+ assert.commandWorkedOrFailedWithCode(
+ res, getMoreErrorCodes, () => `cmd: ${tojson(getMoreCmd)}, res: ${tojson(res)}`);
+ const cursor = parseCursor(res);
+ if (cursor) {
+ data.cursorId = cursor.id;
+ if (bsonWoCompare({_: data.cursorId}, {_: NumberLong(0)}) === 0) {
+ return;
+ }
+ // checkSnapshotCorrectness verifies that the snapshot sees the correct documents.
+ if (typeof checkSnapshotCorrectness === "function") {
+ assert(cursor.hasOwnProperty("nextBatch"), tojson(res));
+ checkSnapshotCorrectness(res);
+ }
+ } else {
+ data.cursorId = NumberLong(0);
+ }
+}
+
+/**
* This function can be used to share session data across threads.
*/
function insertSessionDoc(db, collName, tid, sessionId) {
diff --git a/jstests/concurrency/fsm_workloads/snapshot_read_at_cluster_time_crud_operations.js b/jstests/concurrency/fsm_workloads/snapshot_read_at_cluster_time_crud_operations.js
new file mode 100644
index 00000000000..a1aafbcba8d
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads/snapshot_read_at_cluster_time_crud_operations.js
@@ -0,0 +1,157 @@
+'use strict';
+
+/**
+ * Perform point-in-time snapshot reads that span a 'find' and multiple 'getmore's concurrently with
+ * CRUD operations, after initial insert operations. This tests that the effects of concurrent CRUD
+ * operations are not visible to the point-in-time snapshot reads. The initial inserted documents
+ * (prior to the atClusterTime timestamp) are of the pattern:
+ * {_id: (0-99), x:1}. The subsequent inserted documents have a generated ObjectId as _id. Document
+ * updates increment the value of x. We test that the snapshot read only returns documents where _id
+ * is between 0-99, and the value of x is always 1.
+ *
+ * @tags: [requires_fcv_46, requires_replication, does_not_support_causal_consistency,
+ * requires_majority_read_concern]
+ */
+
+load('jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js');
+var $config = (function() {
+ const data = {numIds: 100, numDocsToInsertPerThread: 5, batchSize: 10};
+
+ const states = {
+ init: function init(db, collName) {
+ this.atClusterTime = new Timestamp(this.clusterTime.t, this.clusterTime.i);
+ jsTestLog("atClusterTime Timestamp: " + this.atClusterTime.toString());
+ this.numDocScanned = 0;
+ },
+
+ snapshotScan: function snapshotScan(db, collName) {
+ const readErrorCodes = [
+ ErrorCodes.ShutdownInProgress,
+ ];
+ if (!this.cursorId || this.cursorId == 0) {
+ doSnapshotFindAtClusterTime(db, collName, this, readErrorCodes, {_id: 1}, (res) => {
+ let expectedDocs =
+ [...Array(this.batchSize).keys()].map((i) => ({_id: i, x: 1}));
+ assert.eq(res.cursor.firstBatch, expectedDocs, () => tojson(res));
+ this.numDocScanned = this.batchSize;
+ });
+ } else {
+ doSnapshotGetMoreAtClusterTime(db, collName, this, readErrorCodes, (res) => {
+ let expectedDocs = [...Array(this.batchSize).keys()].map(
+ (i) => ({_id: i + this.numDocScanned, x: 1}));
+ assert.eq(res.cursor.nextBatch, expectedDocs, () => tojson(res));
+ this.numDocScanned = this.numDocScanned + this.batchSize;
+ });
+ }
+ },
+
+ insertDocs: function insertDocs(db, collName) {
+ for (let i = 0; i < this.numDocsToInsertPerThread; ++i) {
+ const res = db[collName].insert({x: 1});
+ assertWhenOwnColl.commandWorked(res);
+ assertWhenOwnColl.eq(1, res.nInserted);
+ }
+ },
+
+ updateDocs: function updateDocs(db, collName) {
+ for (let i = 0; i < this.numIds; ++i) {
+ assert.commandWorked(db[collName].update({_id: i}, {$inc: {x: 1}}));
+ }
+ },
+
+ readDocs: function readDocs(db, collName) {
+ for (let i = 0; i < this.numIds; ++i) {
+ db[collName].findOne({_id: i});
+ }
+ },
+
+ deleteDocs: function deleteDocs(db, collName) {
+ let indexToDelete = Math.floor(Math.random() * this.numIds);
+ assert.commandWorked(db[collName].deleteOne({_id: indexToDelete}));
+ },
+
+ killOp: function killOp(db, collName) {
+ // Find the object ID of the getMore in the snapshot read, if it is running, and attempt
+ // to kill the operation.
+ const res = assert.commandWorkedOrFailedWithCode(
+ db.adminCommand(
+ {currentOp: 1, ns: {$regex: db.getName() + "\." + collName}, op: "getmore"}),
+ [ErrorCodes.Interrupted]);
+ if (res.hasOwnProperty("inprog") && res.inprog.length) {
+ const killOpCmd = {killOp: 1, op: res.inprog[0].opid};
+ const killRes = db.adminCommand(killOpCmd);
+ assert.commandWorkedOrFailedWithCode(killRes, ErrorCodes.Interrupted);
+ }
+ },
+ };
+
+ const transitions = {
+ init: {
+ snapshotScan: 0.2,
+ insertDocs: 0.2,
+ updateDocs: 0.2,
+ deleteDocs: 0.2,
+ readDocs: 0.2,
+ },
+ snapshotScan:
+ {insertDocs: 0.2, updateDocs: 0.2, deleteDocs: 0.2, readDocs: 0.2, killOp: 0.2},
+ insertDocs: {snapshotScan: 1.0},
+ updateDocs: {snapshotScan: 1.0},
+ readDocs: {snapshotScan: 1.0},
+ deleteDocs: {snapshotScan: 1.0},
+ killOp: {snapshotScan: 1.0}
+ };
+
+ let minSnapshotHistoryWindowInSecondsDefault;
+
+ function setup(db, collName, cluster) {
+ // We temporarily increase the minimum snapshot history window to ensure point-in-time reads
+ // at the initial insert timestamp are valid throughout the duration of this test.
+ cluster.executeOnMongodNodes((db) => {
+ const res = db.adminCommand({setParameter: 1, minSnapshotHistoryWindowInSeconds: 3600});
+ assert.commandWorked(res);
+ minSnapshotHistoryWindowInSecondsDefault = res.was;
+ });
+ // We modify chunk history to be larger on config nodes to ensure snapshot reads succeed for
+ // sharded clusters.
+ if (cluster.isSharded()) {
+ cluster.executeOnConfigNodes((db) => {
+ assert.commandWorked(
+ db.adminCommand({setParameter: 1, minSnapshotHistoryWindowInSeconds: 3600}));
+ });
+ }
+ assertWhenOwnColl.commandWorked(db.runCommand({create: collName}));
+ const docs = [...Array(this.numIds).keys()].map((i) => ({_id: i, x: 1}));
+ this.clusterTime =
+ assert.commandWorked(db.runCommand({insert: collName, documents: docs})).operationTime;
+ }
+
+ function teardown(db, collName, cluster) {
+ assertWhenOwnColl.commandWorked(db.runCommand({drop: collName}));
+ cluster.executeOnMongodNodes(function(db) {
+ assert.commandWorked(db.adminCommand({
+ setParameter: 1,
+ minSnapshotHistoryWindowInSeconds: minSnapshotHistoryWindowInSecondsDefault
+ }));
+ });
+ if (cluster.isSharded()) {
+ cluster.executeOnConfigNodes((db) => {
+ assert.commandWorked(db.adminCommand({
+ setParameter: 1,
+ minSnapshotHistoryWindowInSeconds: minSnapshotHistoryWindowInSecondsDefault
+ }));
+ });
+ }
+ }
+
+ return {
+ threadCount: 5,
+ iterations: 50,
+ startState: 'init',
+ states: states,
+ transitions: transitions,
+ setup: setup,
+ teardown: teardown,
+ data: data,
+ };
+})();
diff --git a/jstests/concurrency/fsm_workloads/snapshot_read_at_cluster_time_ddl_operations.js b/jstests/concurrency/fsm_workloads/snapshot_read_at_cluster_time_ddl_operations.js
new file mode 100644
index 00000000000..e8b1898ea26
--- /dev/null
+++ b/jstests/concurrency/fsm_workloads/snapshot_read_at_cluster_time_ddl_operations.js
@@ -0,0 +1,157 @@
+'use strict';
+
+/**
+ * Perform point-in-time snapshot reads that span a 'find' and multiple 'getmore's concurrently with
+ * CRUD operations. Index operations running concurrently with the snapshot read may cause
+ * the read to fail with a SnapshotUnavailable error.
+ *
+ * @tags: [creates_background_indexes, requires_fcv_46, requires_replication,
+ * does_not_support_causal_consistency, requires_majority_read_concern]
+ */
+
+load('jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js');
+var $config = (function() {
+ const data = {numIds: 100, numDocsToInsertPerThread: 5, batchSize: 10};
+
+ const states = {
+
+ snapshotScan: function snapshotScan(db, collName) {
+ const readErrorCodes = [
+ ErrorCodes.SnapshotUnavailable,
+ ErrorCodes.ShutdownInProgress,
+ ErrorCodes.CursorNotFound,
+ ErrorCodes.QueryPlanKilled,
+ ];
+ if (!this.cursorId || this.cursorId == 0) {
+ doSnapshotFindAtClusterTime(db, collName, this, readErrorCodes, {a: 1});
+ } else {
+ doSnapshotGetMoreAtClusterTime(db, collName, this, readErrorCodes);
+ }
+ },
+
+ insertDocs: function insertDocs(db, collName) {
+ for (let i = 0; i < this.numDocsToInsertPerThread; ++i) {
+ const res = db[collName].insert({x: 1});
+ assertWhenOwnColl.commandWorked(res);
+ assertWhenOwnColl.eq(1, res.nInserted);
+ }
+ },
+
+ updateDocs: function updateDocs(db, collName) {
+ for (let i = 0; i < this.numIds; ++i) {
+ try {
+ db[collName].update({a: i}, {$inc: {x: 1}});
+ } catch (e) {
+ // dropIndex can cause queries to throw if these queries yield.
+ assertAlways.contains(e.code,
+ [ErrorCodes.QueryPlanKilled, ErrorCodes.OperationFailed],
+ 'unexpected error code: ' + e.code + ': ' + e.message);
+ }
+ }
+ },
+
+ readDocs: function readDocs(db, collName) {
+ for (let i = 0; i < this.numIds; ++i) {
+ try {
+ db[collName].findOne({a: i});
+ } catch (e) {
+ // dropIndex can cause queries to throw if these queries yield.
+ assertAlways.contains(e.code,
+ [ErrorCodes.QueryPlanKilled, ErrorCodes.OperationFailed],
+ 'unexpected error code: ' + e.code + ': ' + e.message);
+ }
+ }
+ },
+
+ deleteDocs: function deleteDocs(db, collName) {
+ let indexToDelete = Math.floor(Math.random() * this.numIds);
+ try {
+ db[collName].deleteOne({a: indexToDelete});
+ } catch (e) {
+ // dropIndex can cause queries to throw if these queries yield.
+ assertAlways.contains(e.code,
+ [ErrorCodes.QueryPlanKilled, ErrorCodes.OperationFailed],
+ 'unexpected error code: ' + e.code + ': ' + e.message);
+ }
+ },
+
+ createIndex: function createIndex(db, collName) {
+ db[collName].createIndex({a: 1}, {background: true});
+ },
+
+ dropIndex: function dropIndex(db, collName) {
+ db[collName].dropIndex({a: 1});
+ },
+
+ };
+
+ const transitions = {
+ snapshotScan: {
+ insertDocs: 0.17,
+ updateDocs: 0.16,
+ deleteDocs: 0.17,
+ readDocs: 0.16,
+ createIndex: 0.17,
+ dropIndex: 0.17,
+ },
+ insertDocs: {snapshotScan: 1.0},
+ updateDocs: {snapshotScan: 1.0},
+ readDocs: {snapshotScan: 1.0},
+ deleteDocs: {snapshotScan: 1.0},
+ createIndex: {snapshotScan: 1.0},
+ dropIndex: {snapshotScan: 1.0},
+ };
+
+ let minSnapshotHistoryWindowInSecondsDefault;
+
+ function setup(db, collName, cluster) {
+ // We temporarily increase the minimum snapshot history window to ensure point-in-time reads
+ // at the initial insert timestamp are valid throughout the duration of this test.
+ cluster.executeOnMongodNodes((db) => {
+ const res = db.adminCommand({setParameter: 1, minSnapshotHistoryWindowInSeconds: 3600});
+ assert.commandWorked(res);
+ minSnapshotHistoryWindowInSecondsDefault = res.was;
+ });
+ // We modify chunk history to be larger on config nodes to ensure snapshot reads succeed for
+ // sharded clusters.
+ if (cluster.isSharded()) {
+ cluster.executeOnConfigNodes((db) => {
+ assert.commandWorked(
+ db.adminCommand({setParameter: 1, minSnapshotHistoryWindowInSeconds: 3600}));
+ });
+ }
+ assertWhenOwnColl.commandWorked(db.runCommand({create: collName}));
+ const docs = [...Array(this.numIds).keys()].map((i) => ({a: i, x: 1}));
+ assert.commandWorked(db.runCommand({insert: collName, documents: docs}));
+ assert.commandWorked(
+ db.runCommand({createIndexes: collName, indexes: [{key: {a: 1}, name: "a_1"}]}));
+ }
+
+ function teardown(db, collName, cluster) {
+ cluster.executeOnMongodNodes(function(db) {
+ assert.commandWorked(db.adminCommand({
+ setParameter: 1,
+ minSnapshotHistoryWindowInSeconds: minSnapshotHistoryWindowInSecondsDefault
+ }));
+ });
+ if (cluster.isSharded()) {
+ cluster.executeOnConfigNodes((db) => {
+ assert.commandWorked(db.adminCommand({
+ setParameter: 1,
+ minSnapshotHistoryWindowInSeconds: minSnapshotHistoryWindowInSecondsDefault
+ }));
+ });
+ }
+ }
+
+ return {
+ threadCount: 5,
+ iterations: 50,
+ startState: 'snapshotScan',
+ states: states,
+ transitions: transitions,
+ setup: setup,
+ teardown: teardown,
+ data: data,
+ };
+})();