diff options
Diffstat (limited to 'jstests/libs/change_stream_util.js')
-rw-r--r-- | jstests/libs/change_stream_util.js | 139 |
1 files changed, 98 insertions, 41 deletions
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index c505e47f39f..7431a64627c 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -59,27 +59,28 @@ function assertInvalidateOp({cursor, opType}) { return null; } +function pruneOptionalFields(event, expected) { + if (!expected.hasOwnProperty("_id")) + delete event._id; + + if (!expected.hasOwnProperty("clusterTime")) + delete event.clusterTime; + + if (!expected.hasOwnProperty("txnNumber")) + delete event.txnNumber; + + if (!expected.hasOwnProperty("lsid")) + delete event.lsid; + + return event; +} /** * Helper to check whether a change stream event matches the given expected event. Ignores the * resume token and clusterTime unless they are explicitly listed in the expectedEvent. */ function assertChangeStreamEventEq(actualEvent, expectedEvent) { - const testEvent = Object.assign({}, actualEvent); - if (!expectedEvent.hasOwnProperty("_id")) { - delete testEvent._id; // Remove the resume token, if present. - } - if (!expectedEvent.hasOwnProperty("clusterTime")) { - delete testEvent.clusterTime; // Remove the cluster time, if present. - } + const testEvent = pruneOptionalFields(Object.assign({}, actualEvent), expectedEvent); - // The change stream transaction passthrough causes operations to have txnNumber and lsid - // values that the test doesn't expect, which can cause comparisons to fail. - if (!expectedEvent.hasOwnProperty("txnNumber")) { - delete testEvent.txnNumber; // Remove the txnNumber, if present. - } - if (!expectedEvent.hasOwnProperty("lsid")) { - delete testEvent.lsid; // Remove the lsid, if present. - } assert.docEq(testEvent, expectedEvent, "Change did not match expected change. Expected change: " + tojson(expectedEvent) + @@ -170,6 +171,34 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { return nextBatch[0]; } + self.getNextChanges = function(cursor, numChanges, skipFirst) { + let changes = []; + + for (let i = 0; i < numChanges; i++) { + // Since the first change may be on the original cursor, we need to check for that + // change on the cursor before we move the cursor forward. + if (i === 0 && !skipFirst) { + changes[0] = getNextDocFromCursor(cursor); + if (changes[0]) { + continue; + } + } + + assert.soon( + () => { + cursor = self.getNextBatch(cursor); + changes[i] = getNextDocFromCursor(cursor); + return changes[i] !== null; + }, + () => { + return "timed out waiting for another result from the change stream, observed changes: " + + tojson(changes) + ", expected changes: " + numChanges; + }); + } + + return changes; + }; + /** * Checks if the change has been invalidated. */ @@ -205,8 +234,14 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { * * Returns a list of the changes seen. */ - self.assertNextChangesEqual = function( - {cursor, expectedChanges, expectedNumChanges, expectInvalidate, skipFirstBatch}) { + self.assertNextChangesEqual = function({ + cursor, + expectedChanges, + expectedNumChanges, + expectInvalidate, + skipFirstBatch, + ignoreOrder + }) { expectInvalidate = expectInvalidate || false; skipFirstBatch = skipFirstBatch || false; @@ -230,25 +265,23 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { expectedNumChanges = expectedChanges.length; } - let changes = []; - for (let i = 0; i < expectedNumChanges; i++) { - // Since the first change may be on the original cursor, we need to check for that - // change on the cursor before we move the cursor forward. - if (i === 0 && !skipFirstBatch) { - changes[0] = getNextDocFromCursor(cursor); - if (changes[0]) { - assertChangeIsExpected(expectedChanges, 0, changes, expectInvalidate); - continue; - } + let changes = self.getNextChanges(cursor, expectedNumChanges, skipFirstBatch); + if (ignoreOrder) { + const errMsgFunc = () => `${tojson(changes)} != ${tojson(expectedChanges)}`; + assert.eq(changes.length, expectedNumChanges, errMsgFunc); + for (let i = 0; i < changes.length; i++) { + assert(expectedChanges.some(expectedChange => { + return _convertExceptionToReturnStatus(() => { + assertChangeStreamEventEq(changes[i], expectedChange); + return true; + })(); + }), + errMsgFunc); + } + } else { + for (let i = 0; i < changes.length; i++) { + assertChangeIsExpected(expectedChanges, i, changes, expectInvalidate); } - - assert.soon(function() { - // We need to replace the cursor variable so we return the correct cursor. - cursor = self.getNextBatch(cursor); - changes[i] = getNextDocFromCursor(cursor); - return changes[i] !== null; - }, "timed out waiting for another result from the change stream"); - assertChangeIsExpected(expectedChanges, i, changes, expectInvalidate); } // If we expect invalidation, the final change should have operation type "invalidate". @@ -265,6 +298,25 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { }; /** + * Iterates through the change stream and asserts that the next changes are the expected ones. + * The order of the change events from the cursor relative to their order in the list of + * expected changes is ignored, however. + * + * Returns a list of the changes seen. + */ + self.assertNextChangesEqualUnordered = function( + {cursor, expectedChanges, expectedNumChanges, expectInvalidate, skipFirstBatch}) { + return self.assertNextChangesEqual({ + cursor: cursor, + expectedChanges: expectedChanges, + expectedNumChanges: expectedNumChanges, + expectInvalidate: expectInvalidate, + skipFirstBatch: skipFirstBatch, + ignoreOrder: true + }); + }; + + /** * Retrieves the next batch in the change stream and confirms that it is empty. */ self.assertNoChange = function(cursor) { @@ -277,12 +329,17 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") { * If the current batch has a document in it, that one will be ignored. */ self.getOneChange = function(cursor, expectInvalidate = false) { - changes = self.assertNextChangesEqual({ - cursor: cursor, - expectedNumChanges: 1, - expectInvalidate: expectInvalidate, - skipFirstBatch: true - }); + changes = self.getNextChanges(cursor, 1, true); + + if (expectInvalidate) { + assert(isInvalidated(changes[changes.length - 1]), + "Last change was not invalidated when it was expected: " + tojson(changes)); + + // We make sure that the next batch kills the cursor after an invalidation entry. + let finalCursor = self.getNextBatch(cursor); + assert.eq(finalCursor.id, 0, "Final cursor was not killed: " + tojson(finalCursor)); + } + return changes[0]; }; |