author     Judah Schvimer <judah@mongodb.com>    2017-10-04 17:12:00 -0400
committer  Judah Schvimer <judah@mongodb.com>    2017-10-04 17:12:00 -0400
commit     5180b8e6272169f1f8f237f1d64fe57f690b4802 (patch)
tree       92903b3440920582e1bb1cb1f1c36480bdcec481 /jstests/change_streams/change_stream.js
parent     d4eb562ac63717904f24de4a22e395070687bc62 (diff)
SERVER-31134 Adapt change stream tests to relax assertions on change visibility
Diffstat (limited to 'jstests/change_streams/change_stream.js')
-rw-r--r--  jstests/change_streams/change_stream.js  242
1 file changed, 48 insertions(+), 194 deletions(-)
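
In brief, this commit deletes the test's hand-rolled cursor plumbing (startWatchingChanges, assertNextBatchMatches, getOneDoc, assertNextBatchIsEmpty, and the manual killCursors loop) and moves the test onto the shared ChangeStreamTest fixture loaded from jstests/libs/change_stream_util.js. The fixture asserts on the change documents a test expects to see rather than on exact getMore batch boundaries, which appears to be the relaxation of change-visibility assertions the commit title describes. A minimal sketch of the new test shape, assuming the fixture methods behave as their uses in the diff below suggest (their definitions live in change_stream_util.js and are not shown here):

    load("jstests/libs/change_stream_util.js");

    const cst = new ChangeStreamTest(db);

    // Open a change stream on the collection under test.
    const cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});

    // Perform a write, then assert only on the changes we expect to observe.
    assert.writeOK(db.t1.insert({_id: 0, a: 1}));
    cst.assertNextChangesEqual({
        cursor: cursor,
        expectedChanges: [{
            // The real test also matches the '_id' resume-token field; omitted here.
            documentKey: {_id: 0},
            fullDocument: {_id: 0, a: 1},
            ns: {db: "test", coll: "t1"},
            operationType: "insert",
        }]
    });

    // The fixture tracks every cursor it opens and kills them all at the end,
    // replacing the per-test killCursors bookkeeping removed below.
    cst.cleanUp();
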
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js
index a139b0449aa..9ac347f406b 100644
--- a/jstests/change_streams/change_stream.js
+++ b/jstests/change_streams/change_stream.js
@@ -2,57 +2,17 @@
(function() {
"use strict";
+ load("jstests/libs/change_stream_util.js");
load('jstests/libs/uuid_util.js');
- const oplogProjection = {$project: {"_id.clusterTime": 0}};
-
- function getCollectionNameFromFullNamespace(ns) {
- return ns.split(/\.(.+)/)[1];
- }
-
- // We will use this to keep track of cursors opened during this test, so that we can be sure to
- // clean them up before this test completes.
- let allCursors = [];
-
- // Helpers for testing that pipeline returns correct set of results. Run startWatchingChanges
- // with the pipeline, then insert the changes, then run assertNextBatchMatches with the result
- // of startWatchingChanges and the expected set of results.
- function startWatchingChanges({pipeline, collection, includeTs, aggregateOptions}) {
- aggregateOptions = aggregateOptions || {cursor: {}};
-
- if (!includeTs) {
- // Strip the oplog fields we aren't testing.
- pipeline.push(oplogProjection);
- }
-
- let res = assert.commandWorked(db.runCommand(
- Object.merge({aggregate: collection.getName(), pipeline: pipeline}, aggregateOptions)));
- assert.neq(res.cursor.id, 0);
- allCursors.push({db: db.getName(), coll: collection.getName(), cursorId: res.cursor.id});
- return res.cursor;
- }
-
- function assertNextBatchMatches({cursor, expectedBatch}) {
- if (expectedBatch.length == 0) {
- assert.commandWorked(db.adminCommand(
- {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}));
- }
- let res = assert.commandWorked(db.runCommand({
- getMore: cursor.id,
- collection: getCollectionNameFromFullNamespace(cursor.ns),
- maxTimeMS: 5 * 60 * 1000,
- batchSize: (expectedBatch.length + 1)
- }));
- if (expectedBatch.length == 0) {
- assert.commandWorked(db.adminCommand(
- {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}));
- }
- assert.docEq(res.cursor.nextBatch, expectedBatch);
- }
+ let cst = new ChangeStreamTest(db);
jsTestLog("Testing single insert");
db.t1.drop();
- let cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
+ let cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
+ // Test that if there are no changes, we return an empty batch.
+ assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor));
+
assert.writeOK(db.t1.insert({_id: 0, a: 1}));
const t1Uuid = getUUIDFromListCollections(db, db.t1.getName());
let expected = {
@@ -65,10 +25,14 @@
ns: {db: "test", coll: "t1"},
operationType: "insert",
};
- assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
+
+ // Test that if there are no changes during a subsequent 'getMore', we return an empty batch.
+ cursor = cst.getNextBatch(cursor);
+ assert.eq(0, cursor.nextBatch.length, "Cursor had changes: " + tojson(cursor));
jsTestLog("Testing second insert");
- cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
+ cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.insert({_id: 1, a: 2}));
expected = {
_id: {
@@ -80,10 +44,10 @@
ns: {db: "test", coll: "t1"},
operationType: "insert",
};
- assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
jsTestLog("Testing update");
- cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
+ cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.update({_id: 0}, {a: 3}));
expected = {
_id: {documentKey: {_id: 0}, uuid: t1Uuid},
@@ -92,10 +56,10 @@
ns: {db: "test", coll: "t1"},
operationType: "replace",
};
- assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
jsTestLog("Testing update of another field");
- cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
+ cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.update({_id: 0}, {b: 3}));
expected = {
_id: {documentKey: {_id: 0}, uuid: t1Uuid},
@@ -104,10 +68,10 @@
ns: {db: "test", coll: "t1"},
operationType: "replace",
};
- assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
jsTestLog("Testing upsert");
- cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
+ cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.update({_id: 2}, {a: 4}, {upsert: true}));
expected = {
_id: {
@@ -119,11 +83,11 @@
ns: {db: "test", coll: "t1"},
operationType: "insert",
};
- assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
jsTestLog("Testing partial update with $inc");
assert.writeOK(db.t1.insert({_id: 3, a: 5, b: 1}));
- cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
+ cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.update({_id: 3}, {$inc: {b: 2}}));
expected = {
_id: {documentKey: {_id: 3}, uuid: t1Uuid},
@@ -132,10 +96,10 @@
operationType: "update",
updateDescription: {removedFields: [], updatedFields: {b: 3}},
};
- assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
jsTestLog("Testing delete");
- cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
+ cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.remove({_id: 1}));
expected = {
_id: {documentKey: {_id: 1}, uuid: t1Uuid},
@@ -143,14 +107,14 @@
ns: {db: "test", coll: "t1"},
operationType: "delete",
};
- assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]});
jsTestLog("Testing intervening write on another collection");
- cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
- let t2cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2});
+ cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
+ let t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2});
assert.writeOK(db.t2.insert({_id: 100, c: 1}));
const t2Uuid = getUUIDFromListCollections(db, db.t2.getName());
- assertNextBatchMatches({cursor: cursor, expectedBatch: []});
+ cst.assertNextChangesEqual({cursor: cursor, expectedChanges: []});
expected = {
_id: {
documentKey: {_id: 100},
@@ -161,7 +125,7 @@
ns: {db: "test", coll: "t2"},
operationType: "insert",
};
- assertNextBatchMatches({cursor: t2cursor, expectedBatch: [expected]});
+ cst.assertNextChangesEqual({cursor: t2cursor, expectedChanges: [expected]});
jsTestLog("Testing drop of unrelated collection");
assert.writeOK(db.dropping.insert({}));
@@ -170,93 +134,28 @@
jsTestLog("Testing rename");
db.t3.drop();
- t2cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2});
+ t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2});
assert.writeOK(db.t2.renameCollection("t3"));
expected = {_id: {uuid: t2Uuid}, operationType: "invalidate"};
- assertNextBatchMatches({cursor: t2cursor, expectedBatch: [expected]});
+ cst.assertNextChangesEqual(
+ {cursor: t2cursor, expectedChanges: [expected], expectInvalidate: true});
jsTestLog("Testing insert that looks like rename");
db.dne1.drop();
db.dne2.drop();
- const dne1cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.dne1});
- const dne2cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.dne2});
+ const dne1cursor =
+ cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.dne1});
+ const dne2cursor =
+ cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.dne2});
assert.writeOK(db.t3.insert({_id: 101, renameCollection: "test.dne1", to: "test.dne2"}));
- assertNextBatchMatches({cursor: dne1cursor, expectedBatch: []});
- assertNextBatchMatches({cursor: dne2cursor, expectedBatch: []});
-
- // Now make sure the cursor behaves like a tailable awaitData cursor.
- jsTestLog("Testing tailability");
- db.tailable1.drop();
- const tailableCursor = db.tailable1.aggregate([{$changeStream: {}}, oplogProjection]);
- assert(!tailableCursor.hasNext());
- assert.writeOK(db.tailable1.insert({_id: 101, a: 1}));
- const tailable1Uuid = getUUIDFromListCollections(db, db.tailable1.getName());
- assert(tailableCursor.hasNext());
- assert.docEq(tailableCursor.next(), {
- _id: {
- documentKey: {_id: 101},
- uuid: tailable1Uuid,
- },
- documentKey: {_id: 101},
- fullDocument: {_id: 101, a: 1},
- ns: {db: "test", coll: "tailable1"},
- operationType: "insert",
- });
-
- jsTestLog("Testing awaitdata");
- db.tailable2.drop();
- db.createCollection("tailable2");
- const tailable2Uuid = getUUIDFromListCollections(db, db.tailable2.getName());
- let aggcursor =
- startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.tailable2});
-
- // We should get a valid cursor.
- assert.neq(aggcursor.id, 0);
-
- // Initial batch size should be zero as there should be no data.
- assert.eq(aggcursor.firstBatch.length, 0);
-
- // No data, so should return no results, but cursor should remain valid. Note we are
- // specifically testing awaitdata behavior here, so we cannot use the failpoint to skip the
- // wait.
- let res = assert.commandWorked(
- db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 1000}));
- aggcursor = res.cursor;
- assert.neq(aggcursor.id, 0);
- assert.eq(aggcursor.nextBatch.length, 0);
-
- // Now insert something in parallel while waiting for it.
- let insertshell = startParallelShell(function() {
- // Wait for the getMore to appear in currentop.
- assert.soon(function() {
- return db.currentOp({op: "getmore", "command.collection": "tailable2"}).inprog.length ==
- 1;
- });
- assert.writeOK(db.tailable2.insert({_id: 102, a: 2}));
- });
- res = assert.commandWorked(
- db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 5 * 60 * 1000}));
- aggcursor = res.cursor;
- assert.eq(aggcursor.nextBatch.length, 1);
- assert.docEq(aggcursor.nextBatch[0], {
- _id: {
- documentKey: {_id: 102},
- uuid: tailable2Uuid,
- },
- documentKey: {_id: 102},
- fullDocument: {_id: 102, a: 2},
- ns: {db: "test", coll: "tailable2"},
- operationType: "insert",
- });
-
- // Wait for insert shell to terminate.
- insertshell();
+ cst.assertNextChangesEqual({cursor: dne1cursor, expectedChanges: []});
+ cst.assertNextChangesEqual({cursor: dne2cursor, expectedChanges: []});
const isMongos = db.runCommand({isdbgrid: 1}).isdbgrid;
if (!isMongos) {
jsTestLog("Ensuring attempt to read with legacy operations fails.");
db.getMongo().forceReadMode('legacy');
- const legacyCursor = db.tailable2.aggregate([{$changeStream: {}}, oplogProjection],
+ const legacyCursor = db.tailable2.aggregate([{$changeStream: {}}, cst.oplogProjection],
{cursor: {batchSize: 0}});
assert.throws(function() {
legacyCursor.next();
@@ -264,98 +163,53 @@
db.getMongo().forceReadMode('commands');
}
- /**
- * Gets one document from the cursor using getMore with awaitData disabled. Asserts if no
- * document is present.
- */
- function getOneDoc(cursor) {
- assert.commandWorked(db.adminCommand(
- {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}));
- let res = assert.commandWorked(db.runCommand({
- getMore: cursor.id,
- collection: getCollectionNameFromFullNamespace(cursor.ns),
- batchSize: 1
- }));
- assert.eq(res.cursor.nextBatch.length, 1);
- assert.commandWorked(
- db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}));
- return res.cursor.nextBatch[0];
- }
-
- /**
- * Attempts to get a document from the cursor with awaitData disabled, and asserts if a
- * document
- * is present.
- */
- function assertNextBatchIsEmpty(cursor) {
- assert.commandWorked(db.adminCommand(
- {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}));
- let res = assert.commandWorked(db.runCommand({
- getMore: cursor.id,
- collection: getCollectionNameFromFullNamespace(cursor.ns),
- batchSize: 1
- }));
- assert.eq(res.cursor.nextBatch.length, 0);
- assert.commandWorked(
- db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}));
- }
-
jsTestLog("Testing resumability");
db.resume1.drop();
assert.commandWorked(db.createCollection("resume1"));
// Note we do not project away 'id.ts' as it is part of the resume token.
- let resumeCursor = startWatchingChanges(
+ let resumeCursor = cst.startWatchingChanges(
{pipeline: [{$changeStream: {}}], collection: db.resume1, includeTs: true});
// Insert a document and save the resulting change stream.
assert.writeOK(db.resume1.insert({_id: 1}));
- const firstInsertChangeDoc = getOneDoc(resumeCursor);
+ const firstInsertChangeDoc = cst.getOneChange(resumeCursor);
assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1});
jsTestLog("Testing resume after one document.");
- resumeCursor = startWatchingChanges({
+ resumeCursor = cst.startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}],
collection: db.resume1,
includeTs: true,
aggregateOptions: {cursor: {batchSize: 0}},
});
- assertNextBatchIsEmpty(resumeCursor);
jsTestLog("Inserting additional documents.");
assert.writeOK(db.resume1.insert({_id: 2}));
- const secondInsertChangeDoc = getOneDoc(resumeCursor);
+ const secondInsertChangeDoc = cst.getOneChange(resumeCursor);
assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2});
assert.writeOK(db.resume1.insert({_id: 3}));
- const thirdInsertChangeDoc = getOneDoc(resumeCursor);
+ const thirdInsertChangeDoc = cst.getOneChange(resumeCursor);
assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3});
- assertNextBatchIsEmpty(resumeCursor);
jsTestLog("Testing resume after first document of three.");
- resumeCursor = startWatchingChanges({
+ resumeCursor = cst.startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}],
collection: db.resume1,
includeTs: true,
aggregateOptions: {cursor: {batchSize: 0}},
});
- assert.docEq(getOneDoc(resumeCursor), secondInsertChangeDoc);
- assert.docEq(getOneDoc(resumeCursor), thirdInsertChangeDoc);
- assertNextBatchIsEmpty(resumeCursor);
+ assert.docEq(cst.getOneChange(resumeCursor), secondInsertChangeDoc);
+ assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
jsTestLog("Testing resume after second document of three.");
- resumeCursor = startWatchingChanges({
+ resumeCursor = cst.startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: secondInsertChangeDoc._id}}],
collection: db.resume1,
includeTs: true,
aggregateOptions: {cursor: {batchSize: 0}},
});
- assert.docEq(getOneDoc(resumeCursor), thirdInsertChangeDoc);
- assertNextBatchIsEmpty(resumeCursor);
+ assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc);
- for (let testCursor of allCursors) {
- assert.commandWorked(db.getSiblingDB(testCursor.db).runCommand({
- killCursors: testCursor.coll,
- cursors: [testCursor.cursorId]
- }));
- }
+ cst.cleanUp();
}());
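
A note on the resumability hunks above: each change document's _id doubles as the stream's resume token, which is why those cases pass includeTs and keep the timestamp field in place. The resulting pattern, sketched under the same assumptions as above:

    // Read one change, then reopen the stream positioned just after it.
    const firstChange = cst.getOneChange(resumeCursor);
    resumeCursor = cst.startWatchingChanges({
        pipeline: [{$changeStream: {resumeAfter: firstChange._id}}],
        collection: db.resume1,
        includeTs: true,  // the timestamp is part of the resume token, so do not project it away
        aggregateOptions: {cursor: {batchSize: 0}},
    });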