diff options
author | Alex Taskov <alex.taskov@mongodb.com> | 2021-05-14 10:28:26 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-05-14 10:41:47 +0000 |
commit | 955cd836df419fc13c453d56e67f7a98981fc1fb (patch) | |
tree | c3d47325248738b3aea975e9a32bdef8350ce1bf | |
parent | 9aef163066b2d4197d328fb112307c40f840a541 (diff) | |
download | mongo-955cd836df419fc13c453d56e67f7a98981fc1fb.tar.gz |
SERVER-40293 change_stream.js removal test should do an unordered check on the change stream
(cherry picked from commit 7702287e8db460034ce405dbcc7ef1529f5837fe)
-rw-r--r-- | jstests/change_streams/change_stream.js | 2 | ||||
-rw-r--r-- | jstests/libs/change_stream_util.js | 109 |
2 files changed, 71 insertions, 40 deletions
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js index 9f41255c599..7f3128ca626 100644 --- a/jstests/change_streams/change_stream.js +++ b/jstests/change_streams/change_stream.js @@ -180,7 +180,7 @@ operationType: "delete", } ]; - cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected}); + cst.assertNextChangesEqualUnordered(cursor, expected); jsTestLog("Testing intervening write on another collection"); cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index b52896d08b7..07740f9392f 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -53,27 +53,28 @@ function assertInvalidateOp({cursor, opType}) { } } +function pruneOptionalFields(event, expected) { + if (!expected.hasOwnProperty("_id")) + delete event._id; + + if (!expected.hasOwnProperty("clusterTime")) + delete event.clusterTime; + + if (!expected.hasOwnProperty("txnNumber")) + delete event.txnNumber; + + if (!expected.hasOwnProperty("lsid")) + delete event.lsid; + + return event; +} /** * Helper to check whether a change stream event matches the given expected event. Ignores the * resume token and clusterTime unless they are explicitly listed in the expectedEvent. */ function assertChangeStreamEventEq(actualEvent, expectedEvent) { - const testEvent = Object.assign({}, actualEvent); - if (!expectedEvent.hasOwnProperty("_id")) { - delete testEvent._id; // Remove the resume token, if present. - } - if (!expectedEvent.hasOwnProperty("clusterTime")) { - delete testEvent.clusterTime; // Remove the cluster time, if present. - } + const testEvent = pruneOptionalFields(Object.assign({}, actualEvent), expectedEvent); - // The change stream transaction passthrough causes operations to have txnNumber and lsid - // values that the test doesn't expect, which can cause comparisons to fail. - if (!expectedEvent.hasOwnProperty("txnNumber")) { - delete testEvent.txnNumber; // Remove the txnNumber, if present. - } - if (!expectedEvent.hasOwnProperty("lsid")) { - delete testEvent.lsid; // Remove the lsid, if present. - } assert.docEq(testEvent, expectedEvent, "Change did not match expected change. Expected change: " + tojson(expectedEvent) + @@ -164,6 +165,34 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { return nextBatch[0]; } + self.getNextChanges = function(cursor, numChanges, skipFirst) { + let changes = []; + + for (let i = 0; i < numChanges; i++) { + // Since the first change may be on the original cursor, we need to check for that + // change on the cursor before we move the cursor forward. + if (i === 0 && !skipFirst) { + changes[0] = getNextDocFromCursor(cursor); + if (changes[0]) { + continue; + } + } + + assert.soon( + () => { + cursor = self.getNextBatch(cursor); + changes[i] = getNextDocFromCursor(cursor); + return changes[i] !== null; + }, + () => { + return "timed out waiting for another result from the change stream, observed changes: " + + tojson(changes) + ", expected changes: " + numChanges; + }); + } + + return changes; + }; + /** * Checks if the change has been invalidated. */ @@ -233,24 +262,8 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { expectedNumChanges = expectedChanges.length; } - let changes = []; - for (let i = 0; i < expectedNumChanges; i++) { - // Since the first change may be on the original cursor, we need to check for that - // change on the cursor before we move the cursor forward. - if (i === 0 && !skipFirstBatch) { - changes[0] = getNextDocFromCursor(cursor); - if (changes[0]) { - assertChangeIsExpected(expectedChanges, 0, changes, expectInvalidate); - continue; - } - } - - assert.soon(function() { - // We need to replace the cursor variable so we return the correct cursor. - cursor = self.getNextBatch(cursor); - changes[i] = getNextDocFromCursor(cursor); - return changes[i] !== null; - }, "timed out waiting for another result from the change stream"); + let changes = self.getNextChanges(cursor, expectedNumChanges, skipFirstBatch); + for (let i = 0; i < changes.length; i++) { assertChangeIsExpected(expectedChanges, i, changes, expectInvalidate); } @@ -268,6 +281,19 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { }; /** + * Iterates through the change stream and asserts that the next 'expected.length' changes are + * among the expected ones. The order of the change events from the cursor relative to their + * order in the list of expected changes is ignored, however. + */ + self.assertNextChangesEqualUnordered = function(cursor, expected) { + const changes = self.getNextChanges(cursor, expected.length).map(event => { + return pruneOptionalFields(event, expected[0]); + }); + + assert.sameMembers(changes, expected); + }; + + /** * Retrieves the next batch in the change stream and confirms that it is empty. */ self.assertNoChange = function(cursor) { @@ -280,12 +306,17 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { * If the current batch has a document in it, that one will be ignored. */ self.getOneChange = function(cursor, expectInvalidate = false) { - changes = self.assertNextChangesEqual({ - cursor: cursor, - expectedNumChanges: 1, - expectInvalidate: expectInvalidate, - skipFirstBatch: true - }); + changes = self.getNextChanges(cursor, 1, true); + + if (expectInvalidate) { + assert(isInvalidated(changes[changes.length - 1]), + "Last change was not invalidated when it was expected: " + tojson(changes)); + + // We make sure that the next batch kills the cursor after an invalidation entry. + let finalCursor = self.getNextBatch(cursor); + assert.eq(finalCursor.id, 0, "Final cursor was not killed: " + tojson(finalCursor)); + } + return changes[0]; }; |