summaryrefslogtreecommitdiff
path: root/jstests/libs/change_stream_util.js
diff options
context:
space:
mode:
Diffstat (limited to 'jstests/libs/change_stream_util.js')
-rw-r--r--jstests/libs/change_stream_util.js139
1 files changed, 98 insertions, 41 deletions
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index c505e47f39f..7431a64627c 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -59,27 +59,28 @@ function assertInvalidateOp({cursor, opType}) {
return null;
}
+function pruneOptionalFields(event, expected) {
+ if (!expected.hasOwnProperty("_id"))
+ delete event._id;
+
+ if (!expected.hasOwnProperty("clusterTime"))
+ delete event.clusterTime;
+
+ if (!expected.hasOwnProperty("txnNumber"))
+ delete event.txnNumber;
+
+ if (!expected.hasOwnProperty("lsid"))
+ delete event.lsid;
+
+ return event;
+}
/**
* Helper to check whether a change stream event matches the given expected event. Ignores the
* resume token and clusterTime unless they are explicitly listed in the expectedEvent.
*/
function assertChangeStreamEventEq(actualEvent, expectedEvent) {
- const testEvent = Object.assign({}, actualEvent);
- if (!expectedEvent.hasOwnProperty("_id")) {
- delete testEvent._id; // Remove the resume token, if present.
- }
- if (!expectedEvent.hasOwnProperty("clusterTime")) {
- delete testEvent.clusterTime; // Remove the cluster time, if present.
- }
+ const testEvent = pruneOptionalFields(Object.assign({}, actualEvent), expectedEvent);
- // The change stream transaction passthrough causes operations to have txnNumber and lsid
- // values that the test doesn't expect, which can cause comparisons to fail.
- if (!expectedEvent.hasOwnProperty("txnNumber")) {
- delete testEvent.txnNumber; // Remove the txnNumber, if present.
- }
- if (!expectedEvent.hasOwnProperty("lsid")) {
- delete testEvent.lsid; // Remove the lsid, if present.
- }
assert.docEq(testEvent,
expectedEvent,
"Change did not match expected change. Expected change: " + tojson(expectedEvent) +
@@ -170,6 +171,34 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
return nextBatch[0];
}
+ self.getNextChanges = function(cursor, numChanges, skipFirst) {
+ let changes = [];
+
+ for (let i = 0; i < numChanges; i++) {
+ // Since the first change may be on the original cursor, we need to check for that
+ // change on the cursor before we move the cursor forward.
+ if (i === 0 && !skipFirst) {
+ changes[0] = getNextDocFromCursor(cursor);
+ if (changes[0]) {
+ continue;
+ }
+ }
+
+ assert.soon(
+ () => {
+ cursor = self.getNextBatch(cursor);
+ changes[i] = getNextDocFromCursor(cursor);
+ return changes[i] !== null;
+ },
+ () => {
+ return "timed out waiting for another result from the change stream, observed changes: " +
+ tojson(changes) + ", expected changes: " + numChanges;
+ });
+ }
+
+ return changes;
+ };
+
/**
* Checks if the change has been invalidated.
*/
@@ -205,8 +234,14 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
*
* Returns a list of the changes seen.
*/
- self.assertNextChangesEqual = function(
- {cursor, expectedChanges, expectedNumChanges, expectInvalidate, skipFirstBatch}) {
+ self.assertNextChangesEqual = function({
+ cursor,
+ expectedChanges,
+ expectedNumChanges,
+ expectInvalidate,
+ skipFirstBatch,
+ ignoreOrder
+ }) {
expectInvalidate = expectInvalidate || false;
skipFirstBatch = skipFirstBatch || false;
@@ -230,25 +265,23 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
expectedNumChanges = expectedChanges.length;
}
- let changes = [];
- for (let i = 0; i < expectedNumChanges; i++) {
- // Since the first change may be on the original cursor, we need to check for that
- // change on the cursor before we move the cursor forward.
- if (i === 0 && !skipFirstBatch) {
- changes[0] = getNextDocFromCursor(cursor);
- if (changes[0]) {
- assertChangeIsExpected(expectedChanges, 0, changes, expectInvalidate);
- continue;
- }
+ let changes = self.getNextChanges(cursor, expectedNumChanges, skipFirstBatch);
+ if (ignoreOrder) {
+ const errMsgFunc = () => `${tojson(changes)} != ${tojson(expectedChanges)}`;
+ assert.eq(changes.length, expectedNumChanges, errMsgFunc);
+ for (let i = 0; i < changes.length; i++) {
+ assert(expectedChanges.some(expectedChange => {
+ return _convertExceptionToReturnStatus(() => {
+ assertChangeStreamEventEq(changes[i], expectedChange);
+ return true;
+ })();
+ }),
+ errMsgFunc);
+ }
+ } else {
+ for (let i = 0; i < changes.length; i++) {
+ assertChangeIsExpected(expectedChanges, i, changes, expectInvalidate);
}
-
- assert.soon(function() {
- // We need to replace the cursor variable so we return the correct cursor.
- cursor = self.getNextBatch(cursor);
- changes[i] = getNextDocFromCursor(cursor);
- return changes[i] !== null;
- }, "timed out waiting for another result from the change stream");
- assertChangeIsExpected(expectedChanges, i, changes, expectInvalidate);
}
// If we expect invalidation, the final change should have operation type "invalidate".
@@ -265,6 +298,25 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
};
/**
+ * Iterates through the change stream and asserts that the next changes are the expected ones.
+ * The order of the change events from the cursor relative to their order in the list of
+ * expected changes is ignored, however.
+ *
+ * Returns a list of the changes seen.
+ */
+ self.assertNextChangesEqualUnordered = function(
+ {cursor, expectedChanges, expectedNumChanges, expectInvalidate, skipFirstBatch}) {
+ return self.assertNextChangesEqual({
+ cursor: cursor,
+ expectedChanges: expectedChanges,
+ expectedNumChanges: expectedNumChanges,
+ expectInvalidate: expectInvalidate,
+ skipFirstBatch: skipFirstBatch,
+ ignoreOrder: true
+ });
+ };
+
+ /**
* Retrieves the next batch in the change stream and confirms that it is empty.
*/
self.assertNoChange = function(cursor) {
@@ -277,12 +329,17 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
* If the current batch has a document in it, that one will be ignored.
*/
self.getOneChange = function(cursor, expectInvalidate = false) {
- changes = self.assertNextChangesEqual({
- cursor: cursor,
- expectedNumChanges: 1,
- expectInvalidate: expectInvalidate,
- skipFirstBatch: true
- });
+ changes = self.getNextChanges(cursor, 1, true);
+
+ if (expectInvalidate) {
+ assert(isInvalidated(changes[changes.length - 1]),
+ "Last change was not invalidated when it was expected: " + tojson(changes));
+
+ // We make sure that the next batch kills the cursor after an invalidation entry.
+ let finalCursor = self.getNextBatch(cursor);
+ assert.eq(finalCursor.id, 0, "Final cursor was not killed: " + tojson(finalCursor));
+ }
+
return changes[0];
};