diff options
3 files changed, 379 insertions, 1 deletions
diff --git a/jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js b/jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js index 8c02b494af8..c76609bad75 100644 --- a/jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js +++ b/jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js @@ -65,7 +65,6 @@ function doSnapshotFind(sortByAscending, collName, data, findErrorCodes) { * Performs a snapshot getmore. This function is to be used in conjunction with doSnapshotFind. */ function doSnapshotGetMore(collName, data, getMoreErrorCodes, commitTransactionErrorCodes) { - // doSnapshotGetMore may be called even if doSnapshotFind fails to obtain a cursor. if (bsonWoCompare({_: data.cursorId}, {_: NumberLong(0)}) === 0) { return; } @@ -97,6 +96,71 @@ function doSnapshotGetMore(collName, data, getMoreErrorCodes, commitTransactionE } /** + * Performs a find with readConcern {level: "snapshot"} and optionally atClusterTime, if specified. + */ +function doSnapshotFindAtClusterTime( + db, collName, data, findErrorCodes, sortOrder, checkSnapshotCorrectness) { + const findCmd = { + find: collName, + sort: sortOrder, + batchSize: data.batchSize, + readConcern: {level: "snapshot"} + }; + if (data.atClusterTime) { + findCmd.readConcern.atClusterTime = data.atClusterTime; + } + + let res = db.runCommand(findCmd); + assert.commandWorkedOrFailedWithCode( + res, findErrorCodes, () => `cmd: ${tojson(findCmd)}, res: ${tojson(res)}`); + const cursor = parseCursor(res); + + if (!cursor) { + data.cursorId = NumberLong(0); + } else { + assert(cursor.hasOwnProperty("firstBatch"), tojson(res)); + assert(cursor.hasOwnProperty("atClusterTime"), tojson(res)); + // Store the cursorId and cursor in the data object. + assert.neq(cursor.id, 0); + data.cursorId = cursor.id; + // checkSnapshotCorrectness verifies that the snapshot sees the correct documents. + if (typeof checkSnapshotCorrectness === "function") { + checkSnapshotCorrectness(res); + } + } +} + +/** + * Performs a getMore on a previously established snapshot cursor. This function is to be used in + * conjunction with doSnapshotFindAtClusterTime. + */ +function doSnapshotGetMoreAtClusterTime( + db, collName, data, getMoreErrorCodes, checkSnapshotCorrectness) { + const getMoreCmd = { + getMore: data.cursorId, + collection: collName, + batchSize: data.batchSize, + }; + let res = db.runCommand(getMoreCmd); + assert.commandWorkedOrFailedWithCode( + res, getMoreErrorCodes, () => `cmd: ${tojson(getMoreCmd)}, res: ${tojson(res)}`); + const cursor = parseCursor(res); + if (cursor) { + data.cursorId = cursor.id; + if (bsonWoCompare({_: data.cursorId}, {_: NumberLong(0)}) === 0) { + return; + } + // checkSnapshotCorrectness verifies that the snapshot sees the correct documents. + if (typeof checkSnapshotCorrectness === "function") { + assert(cursor.hasOwnProperty("nextBatch"), tojson(res)); + checkSnapshotCorrectness(res); + } + } else { + data.cursorId = NumberLong(0); + } +} + +/** * This function can be used to share session data across threads. */ function insertSessionDoc(db, collName, tid, sessionId) { diff --git a/jstests/concurrency/fsm_workloads/snapshot_read_at_cluster_time_crud_operations.js b/jstests/concurrency/fsm_workloads/snapshot_read_at_cluster_time_crud_operations.js new file mode 100644 index 00000000000..a1aafbcba8d --- /dev/null +++ b/jstests/concurrency/fsm_workloads/snapshot_read_at_cluster_time_crud_operations.js @@ -0,0 +1,157 @@ +'use strict'; + +/** + * Perform point-in-time snapshot reads that span a 'find' and multiple 'getmore's concurrently with + * CRUD operations, after initial insert operations. This tests that the effects of concurrent CRUD + * operations are not visible to the point-in-time snapshot reads. The initial inserted documents + * (prior to the atClusterTime timestamp) are of the pattern: + * {_id: (0-99), x:1}. The subsequent inserted documents have a generated ObjectId as _id. Document + * updates increment the value of x. We test that the snapshot read only returns documents where _id + * is between 0-99, and the value of x is always 1. + * + * @tags: [requires_fcv_46, requires_replication, does_not_support_causal_consistency, + * requires_majority_read_concern] + */ + +load('jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js'); +var $config = (function() { + const data = {numIds: 100, numDocsToInsertPerThread: 5, batchSize: 10}; + + const states = { + init: function init(db, collName) { + this.atClusterTime = new Timestamp(this.clusterTime.t, this.clusterTime.i); + jsTestLog("atClusterTime Timestamp: " + this.atClusterTime.toString()); + this.numDocScanned = 0; + }, + + snapshotScan: function snapshotScan(db, collName) { + const readErrorCodes = [ + ErrorCodes.ShutdownInProgress, + ]; + if (!this.cursorId || this.cursorId == 0) { + doSnapshotFindAtClusterTime(db, collName, this, readErrorCodes, {_id: 1}, (res) => { + let expectedDocs = + [...Array(this.batchSize).keys()].map((i) => ({_id: i, x: 1})); + assert.eq(res.cursor.firstBatch, expectedDocs, () => tojson(res)); + this.numDocScanned = this.batchSize; + }); + } else { + doSnapshotGetMoreAtClusterTime(db, collName, this, readErrorCodes, (res) => { + let expectedDocs = [...Array(this.batchSize).keys()].map( + (i) => ({_id: i + this.numDocScanned, x: 1})); + assert.eq(res.cursor.nextBatch, expectedDocs, () => tojson(res)); + this.numDocScanned = this.numDocScanned + this.batchSize; + }); + } + }, + + insertDocs: function insertDocs(db, collName) { + for (let i = 0; i < this.numDocsToInsertPerThread; ++i) { + const res = db[collName].insert({x: 1}); + assertWhenOwnColl.commandWorked(res); + assertWhenOwnColl.eq(1, res.nInserted); + } + }, + + updateDocs: function updateDocs(db, collName) { + for (let i = 0; i < this.numIds; ++i) { + assert.commandWorked(db[collName].update({_id: i}, {$inc: {x: 1}})); + } + }, + + readDocs: function readDocs(db, collName) { + for (let i = 0; i < this.numIds; ++i) { + db[collName].findOne({_id: i}); + } + }, + + deleteDocs: function deleteDocs(db, collName) { + let indexToDelete = Math.floor(Math.random() * this.numIds); + assert.commandWorked(db[collName].deleteOne({_id: indexToDelete})); + }, + + killOp: function killOp(db, collName) { + // Find the object ID of the getMore in the snapshot read, if it is running, and attempt + // to kill the operation. + const res = assert.commandWorkedOrFailedWithCode( + db.adminCommand( + {currentOp: 1, ns: {$regex: db.getName() + "\." + collName}, op: "getmore"}), + [ErrorCodes.Interrupted]); + if (res.hasOwnProperty("inprog") && res.inprog.length) { + const killOpCmd = {killOp: 1, op: res.inprog[0].opid}; + const killRes = db.adminCommand(killOpCmd); + assert.commandWorkedOrFailedWithCode(killRes, ErrorCodes.Interrupted); + } + }, + }; + + const transitions = { + init: { + snapshotScan: 0.2, + insertDocs: 0.2, + updateDocs: 0.2, + deleteDocs: 0.2, + readDocs: 0.2, + }, + snapshotScan: + {insertDocs: 0.2, updateDocs: 0.2, deleteDocs: 0.2, readDocs: 0.2, killOp: 0.2}, + insertDocs: {snapshotScan: 1.0}, + updateDocs: {snapshotScan: 1.0}, + readDocs: {snapshotScan: 1.0}, + deleteDocs: {snapshotScan: 1.0}, + killOp: {snapshotScan: 1.0} + }; + + let minSnapshotHistoryWindowInSecondsDefault; + + function setup(db, collName, cluster) { + // We temporarily increase the minimum snapshot history window to ensure point-in-time reads + // at the initial insert timestamp are valid throughout the duration of this test. + cluster.executeOnMongodNodes((db) => { + const res = db.adminCommand({setParameter: 1, minSnapshotHistoryWindowInSeconds: 3600}); + assert.commandWorked(res); + minSnapshotHistoryWindowInSecondsDefault = res.was; + }); + // We modify chunk history to be larger on config nodes to ensure snapshot reads succeed for + // sharded clusters. + if (cluster.isSharded()) { + cluster.executeOnConfigNodes((db) => { + assert.commandWorked( + db.adminCommand({setParameter: 1, minSnapshotHistoryWindowInSeconds: 3600})); + }); + } + assertWhenOwnColl.commandWorked(db.runCommand({create: collName})); + const docs = [...Array(this.numIds).keys()].map((i) => ({_id: i, x: 1})); + this.clusterTime = + assert.commandWorked(db.runCommand({insert: collName, documents: docs})).operationTime; + } + + function teardown(db, collName, cluster) { + assertWhenOwnColl.commandWorked(db.runCommand({drop: collName})); + cluster.executeOnMongodNodes(function(db) { + assert.commandWorked(db.adminCommand({ + setParameter: 1, + minSnapshotHistoryWindowInSeconds: minSnapshotHistoryWindowInSecondsDefault + })); + }); + if (cluster.isSharded()) { + cluster.executeOnConfigNodes((db) => { + assert.commandWorked(db.adminCommand({ + setParameter: 1, + minSnapshotHistoryWindowInSeconds: minSnapshotHistoryWindowInSecondsDefault + })); + }); + } + } + + return { + threadCount: 5, + iterations: 50, + startState: 'init', + states: states, + transitions: transitions, + setup: setup, + teardown: teardown, + data: data, + }; +})(); diff --git a/jstests/concurrency/fsm_workloads/snapshot_read_at_cluster_time_ddl_operations.js b/jstests/concurrency/fsm_workloads/snapshot_read_at_cluster_time_ddl_operations.js new file mode 100644 index 00000000000..e8b1898ea26 --- /dev/null +++ b/jstests/concurrency/fsm_workloads/snapshot_read_at_cluster_time_ddl_operations.js @@ -0,0 +1,157 @@ +'use strict'; + +/** + * Perform point-in-time snapshot reads that span a 'find' and multiple 'getmore's concurrently with + * CRUD operations. Index operations running concurrently with the snapshot read may cause + * the read to fail with a SnapshotUnavailable error. + * + * @tags: [creates_background_indexes, requires_fcv_46, requires_replication, + * does_not_support_causal_consistency, requires_majority_read_concern] + */ + +load('jstests/concurrency/fsm_workload_helpers/snapshot_read_utils.js'); +var $config = (function() { + const data = {numIds: 100, numDocsToInsertPerThread: 5, batchSize: 10}; + + const states = { + + snapshotScan: function snapshotScan(db, collName) { + const readErrorCodes = [ + ErrorCodes.SnapshotUnavailable, + ErrorCodes.ShutdownInProgress, + ErrorCodes.CursorNotFound, + ErrorCodes.QueryPlanKilled, + ]; + if (!this.cursorId || this.cursorId == 0) { + doSnapshotFindAtClusterTime(db, collName, this, readErrorCodes, {a: 1}); + } else { + doSnapshotGetMoreAtClusterTime(db, collName, this, readErrorCodes); + } + }, + + insertDocs: function insertDocs(db, collName) { + for (let i = 0; i < this.numDocsToInsertPerThread; ++i) { + const res = db[collName].insert({x: 1}); + assertWhenOwnColl.commandWorked(res); + assertWhenOwnColl.eq(1, res.nInserted); + } + }, + + updateDocs: function updateDocs(db, collName) { + for (let i = 0; i < this.numIds; ++i) { + try { + db[collName].update({a: i}, {$inc: {x: 1}}); + } catch (e) { + // dropIndex can cause queries to throw if these queries yield. + assertAlways.contains(e.code, + [ErrorCodes.QueryPlanKilled, ErrorCodes.OperationFailed], + 'unexpected error code: ' + e.code + ': ' + e.message); + } + } + }, + + readDocs: function readDocs(db, collName) { + for (let i = 0; i < this.numIds; ++i) { + try { + db[collName].findOne({a: i}); + } catch (e) { + // dropIndex can cause queries to throw if these queries yield. + assertAlways.contains(e.code, + [ErrorCodes.QueryPlanKilled, ErrorCodes.OperationFailed], + 'unexpected error code: ' + e.code + ': ' + e.message); + } + } + }, + + deleteDocs: function deleteDocs(db, collName) { + let indexToDelete = Math.floor(Math.random() * this.numIds); + try { + db[collName].deleteOne({a: indexToDelete}); + } catch (e) { + // dropIndex can cause queries to throw if these queries yield. + assertAlways.contains(e.code, + [ErrorCodes.QueryPlanKilled, ErrorCodes.OperationFailed], + 'unexpected error code: ' + e.code + ': ' + e.message); + } + }, + + createIndex: function createIndex(db, collName) { + db[collName].createIndex({a: 1}, {background: true}); + }, + + dropIndex: function dropIndex(db, collName) { + db[collName].dropIndex({a: 1}); + }, + + }; + + const transitions = { + snapshotScan: { + insertDocs: 0.17, + updateDocs: 0.16, + deleteDocs: 0.17, + readDocs: 0.16, + createIndex: 0.17, + dropIndex: 0.17, + }, + insertDocs: {snapshotScan: 1.0}, + updateDocs: {snapshotScan: 1.0}, + readDocs: {snapshotScan: 1.0}, + deleteDocs: {snapshotScan: 1.0}, + createIndex: {snapshotScan: 1.0}, + dropIndex: {snapshotScan: 1.0}, + }; + + let minSnapshotHistoryWindowInSecondsDefault; + + function setup(db, collName, cluster) { + // We temporarily increase the minimum snapshot history window to ensure point-in-time reads + // at the initial insert timestamp are valid throughout the duration of this test. + cluster.executeOnMongodNodes((db) => { + const res = db.adminCommand({setParameter: 1, minSnapshotHistoryWindowInSeconds: 3600}); + assert.commandWorked(res); + minSnapshotHistoryWindowInSecondsDefault = res.was; + }); + // We modify chunk history to be larger on config nodes to ensure snapshot reads succeed for + // sharded clusters. + if (cluster.isSharded()) { + cluster.executeOnConfigNodes((db) => { + assert.commandWorked( + db.adminCommand({setParameter: 1, minSnapshotHistoryWindowInSeconds: 3600})); + }); + } + assertWhenOwnColl.commandWorked(db.runCommand({create: collName})); + const docs = [...Array(this.numIds).keys()].map((i) => ({a: i, x: 1})); + assert.commandWorked(db.runCommand({insert: collName, documents: docs})); + assert.commandWorked( + db.runCommand({createIndexes: collName, indexes: [{key: {a: 1}, name: "a_1"}]})); + } + + function teardown(db, collName, cluster) { + cluster.executeOnMongodNodes(function(db) { + assert.commandWorked(db.adminCommand({ + setParameter: 1, + minSnapshotHistoryWindowInSeconds: minSnapshotHistoryWindowInSecondsDefault + })); + }); + if (cluster.isSharded()) { + cluster.executeOnConfigNodes((db) => { + assert.commandWorked(db.adminCommand({ + setParameter: 1, + minSnapshotHistoryWindowInSeconds: minSnapshotHistoryWindowInSecondsDefault + })); + }); + } + } + + return { + threadCount: 5, + iterations: 50, + startState: 'snapshotScan', + states: states, + transitions: transitions, + setup: setup, + teardown: teardown, + data: data, + }; +})(); |