diff options
author | Judah Schvimer <judah@mongodb.com> | 2017-10-04 17:12:00 -0400 |
---|---|---|
committer | Judah Schvimer <judah@mongodb.com> | 2017-10-04 17:12:00 -0400 |
commit | 5180b8e6272169f1f8f237f1d64fe57f690b4802 (patch) | |
tree | 92903b3440920582e1bb1cb1f1c36480bdcec481 /jstests/change_streams/change_stream.js | |
parent | d4eb562ac63717904f24de4a22e395070687bc62 (diff) | |
download | mongo-5180b8e6272169f1f8f237f1d64fe57f690b4802.tar.gz |
SERVER-31134 Adapt change stream tests to relax assertions on change visibility
Diffstat (limited to 'jstests/change_streams/change_stream.js')
-rw-r--r-- | jstests/change_streams/change_stream.js | 242 |
1 files changed, 48 insertions, 194 deletions
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js index a139b0449aa..9ac347f406b 100644 --- a/jstests/change_streams/change_stream.js +++ b/jstests/change_streams/change_stream.js @@ -2,57 +2,17 @@ (function() { "use strict"; + load("jstests/libs/change_stream_util.js"); load('jstests/libs/uuid_util.js'); - const oplogProjection = {$project: {"_id.clusterTime": 0}}; - - function getCollectionNameFromFullNamespace(ns) { - return ns.split(/\.(.+)/)[1]; - } - - // We will use this to keep track of cursors opened during this test, so that we can be sure to - // clean them up before this test completes. - let allCursors = []; - - // Helpers for testing that pipeline returns correct set of results. Run startWatchingChanges - // with the pipeline, then insert the changes, then run assertNextBatchMatches with the result - // of startWatchingChanges and the expected set of results. - function startWatchingChanges({pipeline, collection, includeTs, aggregateOptions}) { - aggregateOptions = aggregateOptions || {cursor: {}}; - - if (!includeTs) { - // Strip the oplog fields we aren't testing. - pipeline.push(oplogProjection); - } - - let res = assert.commandWorked(db.runCommand( - Object.merge({aggregate: collection.getName(), pipeline: pipeline}, aggregateOptions))); - assert.neq(res.cursor.id, 0); - allCursors.push({db: db.getName(), coll: collection.getName(), cursorId: res.cursor.id}); - return res.cursor; - } - - function assertNextBatchMatches({cursor, expectedBatch}) { - if (expectedBatch.length == 0) { - assert.commandWorked(db.adminCommand( - {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"})); - } - let res = assert.commandWorked(db.runCommand({ - getMore: cursor.id, - collection: getCollectionNameFromFullNamespace(cursor.ns), - maxTimeMS: 5 * 60 * 1000, - batchSize: (expectedBatch.length + 1) - })); - if (expectedBatch.length == 0) { - assert.commandWorked(db.adminCommand( - {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"})); - } - assert.docEq(res.cursor.nextBatch, expectedBatch); - } + let cst = new ChangeStreamTest(db); jsTestLog("Testing single insert"); db.t1.drop(); - let cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); + let cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); + // Test that if there are no changes, we return an empty batch. + assert.eq(0, cursor.firstBatch.length, "Cursor had changes: " + tojson(cursor)); + assert.writeOK(db.t1.insert({_id: 0, a: 1})); const t1Uuid = getUUIDFromListCollections(db, db.t1.getName()); let expected = { @@ -65,10 +25,14 @@ ns: {db: "test", coll: "t1"}, operationType: "insert", }; - assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); + + // Test that if there are no changes during a subsequent 'getMore', we return an empty batch. + cursor = cst.getNextBatch(cursor); + assert.eq(0, cursor.nextBatch.length, "Cursor had changes: " + tojson(cursor)); jsTestLog("Testing second insert"); - cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); + cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); assert.writeOK(db.t1.insert({_id: 1, a: 2})); expected = { _id: { @@ -80,10 +44,10 @@ ns: {db: "test", coll: "t1"}, operationType: "insert", }; - assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); jsTestLog("Testing update"); - cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); + cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); assert.writeOK(db.t1.update({_id: 0}, {a: 3})); expected = { _id: {documentKey: {_id: 0}, uuid: t1Uuid}, @@ -92,10 +56,10 @@ ns: {db: "test", coll: "t1"}, operationType: "replace", }; - assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); jsTestLog("Testing update of another field"); - cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); + cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); assert.writeOK(db.t1.update({_id: 0}, {b: 3})); expected = { _id: {documentKey: {_id: 0}, uuid: t1Uuid}, @@ -104,10 +68,10 @@ ns: {db: "test", coll: "t1"}, operationType: "replace", }; - assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); jsTestLog("Testing upsert"); - cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); + cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); assert.writeOK(db.t1.update({_id: 2}, {a: 4}, {upsert: true})); expected = { _id: { @@ -119,11 +83,11 @@ ns: {db: "test", coll: "t1"}, operationType: "insert", }; - assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); jsTestLog("Testing partial update with $inc"); assert.writeOK(db.t1.insert({_id: 3, a: 5, b: 1})); - cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); + cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); assert.writeOK(db.t1.update({_id: 3}, {$inc: {b: 2}})); expected = { _id: {documentKey: {_id: 3}, uuid: t1Uuid}, @@ -132,10 +96,10 @@ operationType: "update", updateDescription: {removedFields: [], updatedFields: {b: 3}}, }; - assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); jsTestLog("Testing delete"); - cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); + cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); assert.writeOK(db.t1.remove({_id: 1})); expected = { _id: {documentKey: {_id: 1}, uuid: t1Uuid}, @@ -143,14 +107,14 @@ ns: {db: "test", coll: "t1"}, operationType: "delete", }; - assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: [expected]}); jsTestLog("Testing intervening write on another collection"); - cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); - let t2cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2}); + cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); + let t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2}); assert.writeOK(db.t2.insert({_id: 100, c: 1})); const t2Uuid = getUUIDFromListCollections(db, db.t2.getName()); - assertNextBatchMatches({cursor: cursor, expectedBatch: []}); + cst.assertNextChangesEqual({cursor: cursor, expectedChanges: []}); expected = { _id: { documentKey: {_id: 100}, @@ -161,7 +125,7 @@ ns: {db: "test", coll: "t2"}, operationType: "insert", }; - assertNextBatchMatches({cursor: t2cursor, expectedBatch: [expected]}); + cst.assertNextChangesEqual({cursor: t2cursor, expectedChanges: [expected]}); jsTestLog("Testing drop of unrelated collection"); assert.writeOK(db.dropping.insert({})); @@ -170,93 +134,28 @@ jsTestLog("Testing rename"); db.t3.drop(); - t2cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2}); + t2cursor = cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2}); assert.writeOK(db.t2.renameCollection("t3")); expected = {_id: {uuid: t2Uuid}, operationType: "invalidate"}; - assertNextBatchMatches({cursor: t2cursor, expectedBatch: [expected]}); + cst.assertNextChangesEqual( + {cursor: t2cursor, expectedChanges: [expected], expectInvalidate: true}); jsTestLog("Testing insert that looks like rename"); db.dne1.drop(); db.dne2.drop(); - const dne1cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.dne1}); - const dne2cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.dne2}); + const dne1cursor = + cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.dne1}); + const dne2cursor = + cst.startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.dne2}); assert.writeOK(db.t3.insert({_id: 101, renameCollection: "test.dne1", to: "test.dne2"})); - assertNextBatchMatches({cursor: dne1cursor, expectedBatch: []}); - assertNextBatchMatches({cursor: dne2cursor, expectedBatch: []}); - - // Now make sure the cursor behaves like a tailable awaitData cursor. - jsTestLog("Testing tailability"); - db.tailable1.drop(); - const tailableCursor = db.tailable1.aggregate([{$changeStream: {}}, oplogProjection]); - assert(!tailableCursor.hasNext()); - assert.writeOK(db.tailable1.insert({_id: 101, a: 1})); - const tailable1Uuid = getUUIDFromListCollections(db, db.tailable1.getName()); - assert(tailableCursor.hasNext()); - assert.docEq(tailableCursor.next(), { - _id: { - documentKey: {_id: 101}, - uuid: tailable1Uuid, - }, - documentKey: {_id: 101}, - fullDocument: {_id: 101, a: 1}, - ns: {db: "test", coll: "tailable1"}, - operationType: "insert", - }); - - jsTestLog("Testing awaitdata"); - db.tailable2.drop(); - db.createCollection("tailable2"); - const tailable2Uuid = getUUIDFromListCollections(db, db.tailable2.getName()); - let aggcursor = - startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.tailable2}); - - // We should get a valid cursor. - assert.neq(aggcursor.id, 0); - - // Initial batch size should be zero as there should be no data. - assert.eq(aggcursor.firstBatch.length, 0); - - // No data, so should return no results, but cursor should remain valid. Note we are - // specifically testing awaitdata behavior here, so we cannot use the failpoint to skip the - // wait. - let res = assert.commandWorked( - db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 1000})); - aggcursor = res.cursor; - assert.neq(aggcursor.id, 0); - assert.eq(aggcursor.nextBatch.length, 0); - - // Now insert something in parallel while waiting for it. - let insertshell = startParallelShell(function() { - // Wait for the getMore to appear in currentop. - assert.soon(function() { - return db.currentOp({op: "getmore", "command.collection": "tailable2"}).inprog.length == - 1; - }); - assert.writeOK(db.tailable2.insert({_id: 102, a: 2})); - }); - res = assert.commandWorked( - db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 5 * 60 * 1000})); - aggcursor = res.cursor; - assert.eq(aggcursor.nextBatch.length, 1); - assert.docEq(aggcursor.nextBatch[0], { - _id: { - documentKey: {_id: 102}, - uuid: tailable2Uuid, - }, - documentKey: {_id: 102}, - fullDocument: {_id: 102, a: 2}, - ns: {db: "test", coll: "tailable2"}, - operationType: "insert", - }); - - // Wait for insert shell to terminate. - insertshell(); + cst.assertNextChangesEqual({cursor: dne1cursor, expectedChanges: []}); + cst.assertNextChangesEqual({cursor: dne2cursor, expectedChanges: []}); const isMongos = db.runCommand({isdbgrid: 1}).isdbgrid; if (!isMongos) { jsTestLog("Ensuring attempt to read with legacy operations fails."); db.getMongo().forceReadMode('legacy'); - const legacyCursor = db.tailable2.aggregate([{$changeStream: {}}, oplogProjection], + const legacyCursor = db.tailable2.aggregate([{$changeStream: {}}, cst.oplogProjection], {cursor: {batchSize: 0}}); assert.throws(function() { legacyCursor.next(); @@ -264,98 +163,53 @@ db.getMongo().forceReadMode('commands'); } - /** - * Gets one document from the cursor using getMore with awaitData disabled. Asserts if no - * document is present. - */ - function getOneDoc(cursor) { - assert.commandWorked(db.adminCommand( - {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"})); - let res = assert.commandWorked(db.runCommand({ - getMore: cursor.id, - collection: getCollectionNameFromFullNamespace(cursor.ns), - batchSize: 1 - })); - assert.eq(res.cursor.nextBatch.length, 1); - assert.commandWorked( - db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"})); - return res.cursor.nextBatch[0]; - } - - /** - * Attempts to get a document from the cursor with awaitData disabled, and asserts if a - * document - * is present. - */ - function assertNextBatchIsEmpty(cursor) { - assert.commandWorked(db.adminCommand( - {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"})); - let res = assert.commandWorked(db.runCommand({ - getMore: cursor.id, - collection: getCollectionNameFromFullNamespace(cursor.ns), - batchSize: 1 - })); - assert.eq(res.cursor.nextBatch.length, 0); - assert.commandWorked( - db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"})); - } - jsTestLog("Testing resumability"); db.resume1.drop(); assert.commandWorked(db.createCollection("resume1")); // Note we do not project away 'id.ts' as it is part of the resume token. - let resumeCursor = startWatchingChanges( + let resumeCursor = cst.startWatchingChanges( {pipeline: [{$changeStream: {}}], collection: db.resume1, includeTs: true}); // Insert a document and save the resulting change stream. assert.writeOK(db.resume1.insert({_id: 1})); - const firstInsertChangeDoc = getOneDoc(resumeCursor); + const firstInsertChangeDoc = cst.getOneChange(resumeCursor); assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1}); jsTestLog("Testing resume after one document."); - resumeCursor = startWatchingChanges({ + resumeCursor = cst.startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}], collection: db.resume1, includeTs: true, aggregateOptions: {cursor: {batchSize: 0}}, }); - assertNextBatchIsEmpty(resumeCursor); jsTestLog("Inserting additional documents."); assert.writeOK(db.resume1.insert({_id: 2})); - const secondInsertChangeDoc = getOneDoc(resumeCursor); + const secondInsertChangeDoc = cst.getOneChange(resumeCursor); assert.docEq(secondInsertChangeDoc.fullDocument, {_id: 2}); assert.writeOK(db.resume1.insert({_id: 3})); - const thirdInsertChangeDoc = getOneDoc(resumeCursor); + const thirdInsertChangeDoc = cst.getOneChange(resumeCursor); assert.docEq(thirdInsertChangeDoc.fullDocument, {_id: 3}); - assertNextBatchIsEmpty(resumeCursor); jsTestLog("Testing resume after first document of three."); - resumeCursor = startWatchingChanges({ + resumeCursor = cst.startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}], collection: db.resume1, includeTs: true, aggregateOptions: {cursor: {batchSize: 0}}, }); - assert.docEq(getOneDoc(resumeCursor), secondInsertChangeDoc); - assert.docEq(getOneDoc(resumeCursor), thirdInsertChangeDoc); - assertNextBatchIsEmpty(resumeCursor); + assert.docEq(cst.getOneChange(resumeCursor), secondInsertChangeDoc); + assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc); jsTestLog("Testing resume after second document of three."); - resumeCursor = startWatchingChanges({ + resumeCursor = cst.startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: secondInsertChangeDoc._id}}], collection: db.resume1, includeTs: true, aggregateOptions: {cursor: {batchSize: 0}}, }); - assert.docEq(getOneDoc(resumeCursor), thirdInsertChangeDoc); - assertNextBatchIsEmpty(resumeCursor); + assert.docEq(cst.getOneChange(resumeCursor), thirdInsertChangeDoc); - for (let testCursor of allCursors) { - assert.commandWorked(db.getSiblingDB(testCursor.db).runCommand({ - killCursors: testCursor.coll, - cursors: [testCursor.cursorId] - })); - } + cst.cleanUp(); }()); |