From 6dae6fed07e6d99cf23597101bcf7f9716c7f498 Mon Sep 17 00:00:00 2001 From: Jordi Serra Torrens Date: Thu, 22 Sep 2022 11:52:55 +0000 Subject: SERVER-69222 Make random_moveChunk_multi_update_delete_change_streamsjs check for the change stream events at the end of the test --- ...moveChunk_multi_update_delete_change_streams.js | 240 ++++++++++----------- 1 file changed, 115 insertions(+), 125 deletions(-) diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_multi_update_delete_change_streams.js b/jstests/concurrency/fsm_workloads/random_moveChunk_multi_update_delete_change_streams.js index 07426c802eb..f9035b2a268 100644 --- a/jstests/concurrency/fsm_workloads/random_moveChunk_multi_update_delete_change_streams.js +++ b/jstests/concurrency/fsm_workloads/random_moveChunk_multi_update_delete_change_streams.js @@ -22,17 +22,9 @@ var $config = extendWorkload($config, function($config, $super) { // partition per thread. $config.data.partitionSize = 100; - // Queue of pending change stream events regarding this tid's documents expected to be seen. - // Only used when running in transaction suites (due to SERVER-20361, because - // multi-updates/deletes may be applied 0, 1 or more times, thus we cannot foresee the exact - // number of change stream events that will be produced for each multi write). - $config.data.pendingEvents = []; - - // Lists that store the update/delete change stream events seen by this tid. Used to check that - // no duplicate events (e.g due to writes on orphans) are received. Only used in non-transaction - // suites, since for transaction suites 'pendingEvents' already tests the uniqueness property. - $config.data.seenUpdates = []; - $config.data.seenDeletes = []; + // Keep track of the number of write operations. Used only as a sort element for the operations + // side-table. + $config.data.writesCount = 0; const withSkipRetryOnNetworkError = (fn) => { const previousSkipRetryOnNetworkError = TestData.skipRetryOnNetworkError; @@ -91,11 +83,18 @@ var $config = extendWorkload($config, function($config, $super) { // this update. const expectedDocsIndex = this.expectedDocs.findIndex(item => item._id === id); if (expectedDocsIndex != -1) { - this.pendingEvents.push({ - operationType: 'update', - _id: id, - counter: this.expectedDocs[expectedDocsIndex].counter + 1 - }); + // Log this operation on a side collection. It will be used later to validate the change + // stream events. + assert.commandWorked(db["operations"].insert({ + tid: this.tid, + iteration: this.writesCount, + operationDetails: { + operationType: 'update', + documentId: id, + counter: this.expectedDocs[expectedDocsIndex].counter + 1 + } + })); + this.writesCount++; // Update the in-memory representation of the updated document. this.expectedDocs[expectedDocsIndex].counter++; @@ -130,7 +129,14 @@ var $config = extendWorkload($config, function($config, $super) { // If the document existed, we expect a change stream event to be eventually seen regarding // this delete. if (this.expectedDocs.some(item => item._id === id)) { - this.pendingEvents.push({operationType: 'delete', _id: id}); + // Log this operation on a side collection. It will be used later to validate the change + // stream events. + assert.commandWorked(db["operations"].insert({ + tid: this.tid, + iteration: this.writesCount, + operationDetails: {operationType: 'delete', documentId: id} + })); + this.writesCount++; } // Remove the in-memory representation of the deleted document. @@ -151,119 +157,96 @@ var $config = extendWorkload($config, function($config, $super) { } }; - $config.states.checkChangeStream = function init(db, collName, connCache) { - // Cannot establish changeStream in a txn. - fsm.forceRunningOutsideTransaction(this); - - const establishChangeStreamCursor = () => { - const csOptions = this.resumeToken ? {resumeAfter: this.resumeToken} : { - startAtOperationTime: - Timestamp(this.startAtOperationTime.t, this.startAtOperationTime.i) - }; - - return db[collName].watch([], csOptions); - }; - - const consumeChangeStream = () => { - let changeStream = establishChangeStreamCursor(); - - let events = []; - while (changeStream.hasNext()) { - events.push(changeStream.next()); - } - - changeStream.close(); - return events; - }; - - const waitForChangeStreamEvent = () => { - runWithManualRetriesIfInStepdownSuite(() => { - let changeStream = establishChangeStreamCursor(); - assert.soon(() => changeStream.hasNext()); - changeStream.close(); - return true; - }); - }; + function checkChangeStream(db, collName, startAtOperationTime) { + // Lists that store the update/delete change stream events seen by this tid. Used to check + // that no duplicate events (e.g due to writes on orphans) are received. Only used in + // non-transaction suites. For transaction suites we do a strict check on the expected + // events. + var seenUpdates = []; + var seenDeletes = []; + + // Read the operations that the workload threads did. + var operationsByTid = {}; // tid -> [operations] + for (var tid = 0; tid < $config.threadCount; ++tid) { + operationsByTid[tid] = db["operations"].find({tid: tid}).sort({iteration: 1}).toArray(); + } + // Check change stream + const changeStream = db[collName].watch([], {startAtOperationTime: startAtOperationTime}); while (true) { - const events = runWithManualRetriesIfInStepdownSuite(consumeChangeStream); - if (events.length > 0) { - const lastEvent = events[events.length - 1]; - this.resumeToken = lastEvent._id; - jsTest.log("tid:" + this.tid + - " timestamp of the last event:" + tojson(lastEvent.clusterTime)); + assert.soon(() => changeStream.hasNext()); + const event = changeStream.next(); + + if (event.operationType === 'drop') { + jsTest.log("tid: + " + tid + " checkChangeStream saw drop event"); + break; } - events.forEach(event => { - // Only process events related to this tid's documents. - if (this.ownedIds[collName].includes(event.documentKey._id)) { - if (TestData.runInsideTransaction) { - assertAlways( - this.pendingEvents.length > 0, - "Did not expect to see any change stream event regarding this tid's documents. Event: " + - tojson(event)); - - jsTest.log("tid:" + this.tid + " changeStream event: operationType: " + - event.operationType + "; _id: " + event.documentKey._id); - - // Check that the received event corresponds to the next expected event. - const nextPendingEvent = this.pendingEvents.shift(); - assertAlways.eq(nextPendingEvent.operationType, event.operationType); - assertAlways.eq(nextPendingEvent._id, event.documentKey._id); - - if (event.operationType === 'update') { - assert.eq(nextPendingEvent.counter, - event.updateDescription.updatedFields.counter); - } - } else { - // Check that no duplicate events are seen on the change stream. - // - For deletes this means that we should not see the same document deleted - // more than once (since the workload does not perform any inserts after - // setup). - // - For updates, this means that for each document, we should never see the - // same updated value more than once. This is because the updates are {$inc: - // 1}, so they must be strictly incrementing. - if (event.operationType === 'delete') { - assertAlways(!this.seenDeletes.includes(event.documentKey._id), - "Found duplicate change stream event for delete on _id: " + - event.documentKey._id); - this.seenDeletes.push(event.documentKey._id); - } else if (event.operationType === 'update') { - const idAndUpdate = { - _id: event.documentKey._id, - updatedFields: event.updateDescription.updatedFields - }; - - assert(!this.seenUpdates.some( - item => item._id === idAndUpdate._id && - bsonWoCompare(item.updatedFields, - idAndUpdate.updatedFields) == 0), - "Found duplicate change stream event for update on _id: " + - event.documentKey._id + ", update: " + - tojson(event.updateDescription.updatedFields)); - this.seenUpdates.push(idAndUpdate); - } + jsTest.log("changeStream event: operationType: " + event.operationType + + "; _id: " + tojson(event.documentKey)); + + if (TestData.runInsideTransaction) { + // Check that this event corresponds to the next outstanding operation one of the + // worker threads did. + var found = false; + for (var tid = 0; tid < $config.threadCount; ++tid) { + const nextOperationForTid = operationsByTid[tid][0]; + if (nextOperationForTid && + nextOperationForTid.operationDetails.operationType === + event.operationType && + nextOperationForTid.operationDetails.documentId === event.documentKey._id) { + found = true; + + // Remove that operation from the array of outstanding operations. + operationsByTid[tid].shift(); + break; } } - }); - - // If running in a txn suite, ensure we eventually see all pending events. We can only - // do this in txn suites because on non-txn suites we have no guarantee that a - // multi-update/delete actually was actually applied on all the intended documents - // (SERVER-20361). - if (TestData.runInsideTransaction && this.pendingEvents.length > 0) { - jsTest.log("tid:" + this.tid + - " waiting for more change stream events. Next expected event: " + - tojson(this.pendingEvents[0])); - waitForChangeStreamEvent(); - jsTest.log("tid:" + this.tid + - " now there should be more change stream events available"); - continue; + assertAlways( + found, + "did not find worker thread operation matching the change stream event: " + + tojson(event) + "; Outstanding operations: " + tojson(operationsByTid)); + } else { + // Check that no duplicate events are seen on the change stream. + // - For deletes this means that we should not see the same document deleted more + // than once (since the workload does not perform any inserts after setup). + // - For updates, this means that for each document, we should never see the same + // updated value more than once. This is because the updates are {$inc: 1}, so they + // must be strictly incrementing. + if (event.operationType === 'delete') { + assertAlways(!seenDeletes.includes(event.documentKey._id), + "Found duplicate change stream event for delete on _id: " + + event.documentKey._id); + seenDeletes.push(event.documentKey._id); + } else if (event.operationType === 'update') { + const idAndUpdate = { + _id: event.documentKey._id, + updatedFields: event.updateDescription.updatedFields + }; + + assert(!seenUpdates.some(item => item._id === idAndUpdate._id && + bsonWoCompare(item.updatedFields, + idAndUpdate.updatedFields) == 0), + "Found duplicate change stream event for update on _id: " + + event.documentKey._id + + ", update: " + tojson(event.updateDescription.updatedFields)); + seenUpdates.push(idAndUpdate); + } } + } - break; + // If running in a txn suite, ensure we eventually see all events. We can only do this in + // txn suites because on non-txn suites we have no guarantee that a multi-update/delete + // actually was actually applied on all the intended documents (SERVER-20361). + if (TestData.runInsideTransaction) { + for (var tid = 0; tid < $config.threadCount; ++tid) { + assertAlways( + operationsByTid[tid].length === 0, + "Did not observe change stream event for all worker thread operations. Outstanding operations: " + + tojson(operationsByTid)); + } } - }; + } let previousWritePeriodicNoops; @@ -302,15 +285,22 @@ var $config = extendWorkload($config, function($config, $super) { db.adminCommand({setParameter: 1, writePeriodicNoops: previousWritePeriodicNoops})); }); + // Drop the collection as to have a sentinel event (drop) on the change stream. + assertAlways(db[collName].drop()); + + // Validate the change stream events. + var startAtOperationTime = + Timestamp(this.startAtOperationTime.t, this.startAtOperationTime.i); + checkChangeStream(db, collName, startAtOperationTime); + $super.teardown.apply(this, arguments); }; $config.transitions = { init: {moveChunk: 0.2, multiUpdate: 0.4, multiDelete: 0.4}, - moveChunk: {moveChunk: 0.2, multiUpdate: 0.3, multiDelete: 0.3, checkChangeStream: 0.2}, - multiUpdate: {moveChunk: 0.2, multiUpdate: 0.3, multiDelete: 0.3, checkChangeStream: 0.2}, - multiDelete: {moveChunk: 0.2, multiUpdate: 0.3, multiDelete: 0.3, checkChangeStream: 0.2}, - checkChangeStream: {moveChunk: 0.2, multiUpdate: 0.4, multiDelete: 0.4} + moveChunk: {moveChunk: 0.2, multiUpdate: 0.4, multiDelete: 0.4}, + multiUpdate: {moveChunk: 0.2, multiUpdate: 0.4, multiDelete: 0.4}, + multiDelete: {moveChunk: 0.2, multiUpdate: 0.4, multiDelete: 0.4}, }; return $config; -- cgit v1.2.1