summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJordi Serra Torrens <jordi.serra-torrens@mongodb.com>2022-09-22 11:52:55 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-22 12:41:18 +0000
commit6dae6fed07e6d99cf23597101bcf7f9716c7f498 (patch)
tree76a06f573ee4e8f75ae32d1e8f0c2469bda20da3
parent062d4605754f889093a110260695c0ff659b23ef (diff)
downloadmongo-6dae6fed07e6d99cf23597101bcf7f9716c7f498.tar.gz
SERVER-69222 Make random_moveChunk_multi_update_delete_change_streams.js check for the change stream events at the end of the test
-rw-r--r--jstests/concurrency/fsm_workloads/random_moveChunk_multi_update_delete_change_streams.js240
1 files changed, 115 insertions, 125 deletions
diff --git a/jstests/concurrency/fsm_workloads/random_moveChunk_multi_update_delete_change_streams.js b/jstests/concurrency/fsm_workloads/random_moveChunk_multi_update_delete_change_streams.js
index 07426c802eb..f9035b2a268 100644
--- a/jstests/concurrency/fsm_workloads/random_moveChunk_multi_update_delete_change_streams.js
+++ b/jstests/concurrency/fsm_workloads/random_moveChunk_multi_update_delete_change_streams.js
@@ -22,17 +22,9 @@ var $config = extendWorkload($config, function($config, $super) {
// partition per thread.
$config.data.partitionSize = 100;
- // Queue of pending change stream events regarding this tid's documents expected to be seen.
- // Only used when running in transaction suites (due to SERVER-20361, because
- // multi-updates/deletes may be applied 0, 1 or more times, thus we cannot foresee the exact
- // number of change stream events that will be produced for each multi write).
- $config.data.pendingEvents = [];
-
- // Lists that store the update/delete change stream events seen by this tid. Used to check that
- // no duplicate events (e.g due to writes on orphans) are received. Only used in non-transaction
- // suites, since for transaction suites 'pendingEvents' already tests the uniqueness property.
- $config.data.seenUpdates = [];
- $config.data.seenDeletes = [];
+ // Keep track of the number of write operations. Used only as a sort element for the operations
+ // side-table.
+ $config.data.writesCount = 0;
const withSkipRetryOnNetworkError = (fn) => {
const previousSkipRetryOnNetworkError = TestData.skipRetryOnNetworkError;
@@ -91,11 +83,18 @@ var $config = extendWorkload($config, function($config, $super) {
// this update.
const expectedDocsIndex = this.expectedDocs.findIndex(item => item._id === id);
if (expectedDocsIndex != -1) {
- this.pendingEvents.push({
- operationType: 'update',
- _id: id,
- counter: this.expectedDocs[expectedDocsIndex].counter + 1
- });
+ // Log this operation on a side collection. It will be used later to validate the change
+ // stream events.
+ assert.commandWorked(db["operations"].insert({
+ tid: this.tid,
+ iteration: this.writesCount,
+ operationDetails: {
+ operationType: 'update',
+ documentId: id,
+ counter: this.expectedDocs[expectedDocsIndex].counter + 1
+ }
+ }));
+ this.writesCount++;
// Update the in-memory representation of the updated document.
this.expectedDocs[expectedDocsIndex].counter++;
@@ -130,7 +129,14 @@ var $config = extendWorkload($config, function($config, $super) {
// If the document existed, we expect a change stream event to be eventually seen regarding
// this delete.
if (this.expectedDocs.some(item => item._id === id)) {
- this.pendingEvents.push({operationType: 'delete', _id: id});
+ // Log this operation on a side collection. It will be used later to validate the change
+ // stream events.
+ assert.commandWorked(db["operations"].insert({
+ tid: this.tid,
+ iteration: this.writesCount,
+ operationDetails: {operationType: 'delete', documentId: id}
+ }));
+ this.writesCount++;
}
// Remove the in-memory representation of the deleted document.
@@ -151,119 +157,96 @@ var $config = extendWorkload($config, function($config, $super) {
}
};
- $config.states.checkChangeStream = function init(db, collName, connCache) {
- // Cannot establish changeStream in a txn.
- fsm.forceRunningOutsideTransaction(this);
-
- const establishChangeStreamCursor = () => {
- const csOptions = this.resumeToken ? {resumeAfter: this.resumeToken} : {
- startAtOperationTime:
- Timestamp(this.startAtOperationTime.t, this.startAtOperationTime.i)
- };
-
- return db[collName].watch([], csOptions);
- };
-
- const consumeChangeStream = () => {
- let changeStream = establishChangeStreamCursor();
-
- let events = [];
- while (changeStream.hasNext()) {
- events.push(changeStream.next());
- }
-
- changeStream.close();
- return events;
- };
-
- const waitForChangeStreamEvent = () => {
- runWithManualRetriesIfInStepdownSuite(() => {
- let changeStream = establishChangeStreamCursor();
- assert.soon(() => changeStream.hasNext());
- changeStream.close();
- return true;
- });
- };
+ function checkChangeStream(db, collName, startAtOperationTime) {
+ // Lists that store the update/delete change stream events seen by this tid. Used to check
+ // that no duplicate events (e.g due to writes on orphans) are received. Only used in
+ // non-transaction suites. For transaction suites we do a strict check on the expected
+ // events.
+ var seenUpdates = [];
+ var seenDeletes = [];
+
+ // Read the operations that the workload threads did.
+ var operationsByTid = {}; // tid -> [operations]
+ for (var tid = 0; tid < $config.threadCount; ++tid) {
+ operationsByTid[tid] = db["operations"].find({tid: tid}).sort({iteration: 1}).toArray();
+ }
+ // Check change stream
+ const changeStream = db[collName].watch([], {startAtOperationTime: startAtOperationTime});
while (true) {
- const events = runWithManualRetriesIfInStepdownSuite(consumeChangeStream);
- if (events.length > 0) {
- const lastEvent = events[events.length - 1];
- this.resumeToken = lastEvent._id;
- jsTest.log("tid:" + this.tid +
- " timestamp of the last event:" + tojson(lastEvent.clusterTime));
+ assert.soon(() => changeStream.hasNext());
+ const event = changeStream.next();
+
+ if (event.operationType === 'drop') {
+ jsTest.log("tid: " + tid + " checkChangeStream saw drop event");
+ break;
}
- events.forEach(event => {
- // Only process events related to this tid's documents.
- if (this.ownedIds[collName].includes(event.documentKey._id)) {
- if (TestData.runInsideTransaction) {
- assertAlways(
- this.pendingEvents.length > 0,
- "Did not expect to see any change stream event regarding this tid's documents. Event: " +
- tojson(event));
-
- jsTest.log("tid:" + this.tid + " changeStream event: operationType: " +
- event.operationType + "; _id: " + event.documentKey._id);
-
- // Check that the received event corresponds to the next expected event.
- const nextPendingEvent = this.pendingEvents.shift();
- assertAlways.eq(nextPendingEvent.operationType, event.operationType);
- assertAlways.eq(nextPendingEvent._id, event.documentKey._id);
-
- if (event.operationType === 'update') {
- assert.eq(nextPendingEvent.counter,
- event.updateDescription.updatedFields.counter);
- }
- } else {
- // Check that no duplicate events are seen on the change stream.
- // - For deletes this means that we should not see the same document deleted
- // more than once (since the workload does not perform any inserts after
- // setup).
- // - For updates, this means that for each document, we should never see the
- // same updated value more than once. This is because the updates are {$inc:
- // 1}, so they must be strictly incrementing.
- if (event.operationType === 'delete') {
- assertAlways(!this.seenDeletes.includes(event.documentKey._id),
- "Found duplicate change stream event for delete on _id: " +
- event.documentKey._id);
- this.seenDeletes.push(event.documentKey._id);
- } else if (event.operationType === 'update') {
- const idAndUpdate = {
- _id: event.documentKey._id,
- updatedFields: event.updateDescription.updatedFields
- };
-
- assert(!this.seenUpdates.some(
- item => item._id === idAndUpdate._id &&
- bsonWoCompare(item.updatedFields,
- idAndUpdate.updatedFields) == 0),
- "Found duplicate change stream event for update on _id: " +
- event.documentKey._id + ", update: " +
- tojson(event.updateDescription.updatedFields));
- this.seenUpdates.push(idAndUpdate);
- }
+ jsTest.log("changeStream event: operationType: " + event.operationType +
+ "; _id: " + tojson(event.documentKey));
+
+ if (TestData.runInsideTransaction) {
+ // Check that this event corresponds to the next outstanding operation one of the
+ // worker threads did.
+ var found = false;
+ for (var tid = 0; tid < $config.threadCount; ++tid) {
+ const nextOperationForTid = operationsByTid[tid][0];
+ if (nextOperationForTid &&
+ nextOperationForTid.operationDetails.operationType ===
+ event.operationType &&
+ nextOperationForTid.operationDetails.documentId === event.documentKey._id) {
+ found = true;
+
+ // Remove that operation from the array of outstanding operations.
+ operationsByTid[tid].shift();
+ break;
}
}
- });
-
- // If running in a txn suite, ensure we eventually see all pending events. We can only
- // do this in txn suites because on non-txn suites we have no guarantee that a
- // multi-update/delete actually was actually applied on all the intended documents
- // (SERVER-20361).
- if (TestData.runInsideTransaction && this.pendingEvents.length > 0) {
- jsTest.log("tid:" + this.tid +
- " waiting for more change stream events. Next expected event: " +
- tojson(this.pendingEvents[0]));
- waitForChangeStreamEvent();
- jsTest.log("tid:" + this.tid +
- " now there should be more change stream events available");
- continue;
+ assertAlways(
+ found,
+ "did not find worker thread operation matching the change stream event: " +
+ tojson(event) + "; Outstanding operations: " + tojson(operationsByTid));
+ } else {
+ // Check that no duplicate events are seen on the change stream.
+ // - For deletes this means that we should not see the same document deleted more
+ // than once (since the workload does not perform any inserts after setup).
+ // - For updates, this means that for each document, we should never see the same
+ // updated value more than once. This is because the updates are {$inc: 1}, so they
+ // must be strictly incrementing.
+ if (event.operationType === 'delete') {
+ assertAlways(!seenDeletes.includes(event.documentKey._id),
+ "Found duplicate change stream event for delete on _id: " +
+ event.documentKey._id);
+ seenDeletes.push(event.documentKey._id);
+ } else if (event.operationType === 'update') {
+ const idAndUpdate = {
+ _id: event.documentKey._id,
+ updatedFields: event.updateDescription.updatedFields
+ };
+
+ assert(!seenUpdates.some(item => item._id === idAndUpdate._id &&
+ bsonWoCompare(item.updatedFields,
+ idAndUpdate.updatedFields) == 0),
+ "Found duplicate change stream event for update on _id: " +
+ event.documentKey._id +
+ ", update: " + tojson(event.updateDescription.updatedFields));
+ seenUpdates.push(idAndUpdate);
+ }
}
+ }
- break;
+ // If running in a txn suite, ensure we eventually see all events. We can only do this in
+ // txn suites because on non-txn suites we have no guarantee that a multi-update/delete
+ // was actually applied on all the intended documents (SERVER-20361).
+ if (TestData.runInsideTransaction) {
+ for (var tid = 0; tid < $config.threadCount; ++tid) {
+ assertAlways(
+ operationsByTid[tid].length === 0,
+ "Did not observe change stream event for all worker thread operations. Outstanding operations: " +
+ tojson(operationsByTid));
+ }
}
- };
+ }
let previousWritePeriodicNoops;
@@ -302,15 +285,22 @@ var $config = extendWorkload($config, function($config, $super) {
db.adminCommand({setParameter: 1, writePeriodicNoops: previousWritePeriodicNoops}));
});
+ // Drop the collection as to have a sentinel event (drop) on the change stream.
+ assertAlways(db[collName].drop());
+
+ // Validate the change stream events.
+ var startAtOperationTime =
+ Timestamp(this.startAtOperationTime.t, this.startAtOperationTime.i);
+ checkChangeStream(db, collName, startAtOperationTime);
+
$super.teardown.apply(this, arguments);
};
$config.transitions = {
init: {moveChunk: 0.2, multiUpdate: 0.4, multiDelete: 0.4},
- moveChunk: {moveChunk: 0.2, multiUpdate: 0.3, multiDelete: 0.3, checkChangeStream: 0.2},
- multiUpdate: {moveChunk: 0.2, multiUpdate: 0.3, multiDelete: 0.3, checkChangeStream: 0.2},
- multiDelete: {moveChunk: 0.2, multiUpdate: 0.3, multiDelete: 0.3, checkChangeStream: 0.2},
- checkChangeStream: {moveChunk: 0.2, multiUpdate: 0.4, multiDelete: 0.4}
+ moveChunk: {moveChunk: 0.2, multiUpdate: 0.4, multiDelete: 0.4},
+ multiUpdate: {moveChunk: 0.2, multiUpdate: 0.4, multiDelete: 0.4},
+ multiDelete: {moveChunk: 0.2, multiUpdate: 0.4, multiDelete: 0.4},
};
return $config;