summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Taskov <alex.taskov@mongodb.com>2021-05-14 10:28:26 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2021-05-14 10:41:47 +0000
commit955cd836df419fc13c453d56e67f7a98981fc1fb (patch)
treec3d47325248738b3aea975e9a32bdef8350ce1bf
parent9aef163066b2d4197d328fb112307c40f840a541 (diff)
downloadmongo-955cd836df419fc13c453d56e67f7a98981fc1fb.tar.gz
SERVER-40293 change_stream.js removal test should do an unordered check on the change stream
(cherry picked from commit 7702287e8db460034ce405dbcc7ef1529f5837fe)
-rw-r--r--jstests/change_streams/change_stream.js2
-rw-r--r--jstests/libs/change_stream_util.js109
2 files changed, 71 insertions, 40 deletions
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js
index 9f41255c599..7f3128ca626 100644
--- a/jstests/change_streams/change_stream.js
+++ b/jstests/change_streams/change_stream.js
@@ -180,7 +180,7 @@
operationType: "delete",
}
];
- cst.assertNextChangesEqual({cursor: cursor, expectedChanges: expected});
+ cst.assertNextChangesEqualUnordered(cursor, expected);
jsTestLog("Testing intervening write on another collection");
cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index b52896d08b7..07740f9392f 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -53,27 +53,28 @@ function assertInvalidateOp({cursor, opType}) {
}
}
+function pruneOptionalFields(event, expected) {
+ if (!expected.hasOwnProperty("_id"))
+ delete event._id;
+
+ if (!expected.hasOwnProperty("clusterTime"))
+ delete event.clusterTime;
+
+ if (!expected.hasOwnProperty("txnNumber"))
+ delete event.txnNumber;
+
+ if (!expected.hasOwnProperty("lsid"))
+ delete event.lsid;
+
+ return event;
+}
/**
* Helper to check whether a change stream event matches the given expected event. Ignores the
* resume token and clusterTime unless they are explicitly listed in the expectedEvent.
*/
function assertChangeStreamEventEq(actualEvent, expectedEvent) {
- const testEvent = Object.assign({}, actualEvent);
- if (!expectedEvent.hasOwnProperty("_id")) {
- delete testEvent._id; // Remove the resume token, if present.
- }
- if (!expectedEvent.hasOwnProperty("clusterTime")) {
- delete testEvent.clusterTime; // Remove the cluster time, if present.
- }
+ const testEvent = pruneOptionalFields(Object.assign({}, actualEvent), expectedEvent);
- // The change stream transaction passthrough causes operations to have txnNumber and lsid
- // values that the test doesn't expect, which can cause comparisons to fail.
- if (!expectedEvent.hasOwnProperty("txnNumber")) {
- delete testEvent.txnNumber; // Remove the txnNumber, if present.
- }
- if (!expectedEvent.hasOwnProperty("lsid")) {
- delete testEvent.lsid; // Remove the lsid, if present.
- }
assert.docEq(testEvent,
expectedEvent,
"Change did not match expected change. Expected change: " + tojson(expectedEvent) +
@@ -164,6 +165,34 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
return nextBatch[0];
}
+ self.getNextChanges = function(cursor, numChanges, skipFirst) {
+ let changes = [];
+
+ for (let i = 0; i < numChanges; i++) {
+ // Since the first change may be on the original cursor, we need to check for that
+ // change on the cursor before we move the cursor forward.
+ if (i === 0 && !skipFirst) {
+ changes[0] = getNextDocFromCursor(cursor);
+ if (changes[0]) {
+ continue;
+ }
+ }
+
+ assert.soon(
+ () => {
+ cursor = self.getNextBatch(cursor);
+ changes[i] = getNextDocFromCursor(cursor);
+ return changes[i] !== null;
+ },
+ () => {
+ return "timed out waiting for another result from the change stream, observed changes: " +
+ tojson(changes) + ", expected changes: " + numChanges;
+ });
+ }
+
+ return changes;
+ };
+
/**
* Checks if the change has been invalidated.
*/
@@ -233,24 +262,8 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
expectedNumChanges = expectedChanges.length;
}
- let changes = [];
- for (let i = 0; i < expectedNumChanges; i++) {
- // Since the first change may be on the original cursor, we need to check for that
- // change on the cursor before we move the cursor forward.
- if (i === 0 && !skipFirstBatch) {
- changes[0] = getNextDocFromCursor(cursor);
- if (changes[0]) {
- assertChangeIsExpected(expectedChanges, 0, changes, expectInvalidate);
- continue;
- }
- }
-
- assert.soon(function() {
- // We need to replace the cursor variable so we return the correct cursor.
- cursor = self.getNextBatch(cursor);
- changes[i] = getNextDocFromCursor(cursor);
- return changes[i] !== null;
- }, "timed out waiting for another result from the change stream");
+ let changes = self.getNextChanges(cursor, expectedNumChanges, skipFirstBatch);
+ for (let i = 0; i < changes.length; i++) {
assertChangeIsExpected(expectedChanges, i, changes, expectInvalidate);
}
@@ -268,6 +281,19 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
};
/**
+ * Iterates through the change stream and asserts that the next 'expected.length' changes are
+ * among the expected ones. The order of the change events from the cursor relative to their
+ * order in the list of expected changes is ignored, however.
+ */
+ self.assertNextChangesEqualUnordered = function(cursor, expected) {
+ const changes = self.getNextChanges(cursor, expected.length).map(event => {
+ return pruneOptionalFields(event, expected[0]);
+ });
+
+ assert.sameMembers(changes, expected);
+ };
+
+ /**
* Retrieves the next batch in the change stream and confirms that it is empty.
*/
self.assertNoChange = function(cursor) {
@@ -280,12 +306,17 @@ function ChangeStreamTest(_db, name = "ChangeStreamTest") {
* If the current batch has a document in it, that one will be ignored.
*/
self.getOneChange = function(cursor, expectInvalidate = false) {
- changes = self.assertNextChangesEqual({
- cursor: cursor,
- expectedNumChanges: 1,
- expectInvalidate: expectInvalidate,
- skipFirstBatch: true
- });
+ changes = self.getNextChanges(cursor, 1, true);
+
+ if (expectInvalidate) {
+ assert(isInvalidated(changes[changes.length - 1]),
+ "Last change was not invalidated when it was expected: " + tojson(changes));
+
+ // We make sure that the next batch kills the cursor after an invalidation entry.
+ let finalCursor = self.getNextBatch(cursor);
+ assert.eq(finalCursor.id, 0, "Final cursor was not killed: " + tojson(finalCursor));
+ }
+
return changes[0];
};