diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-09-14 10:03:00 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2017-09-18 17:09:10 -0400 |
commit | b394a689561bc35f5e75ff1b6eef2e4fe1ddd512 (patch) | |
tree | f59f5dc550c2ab248273fb54fc9be6dd9c8b5805 /jstests/change_streams | |
parent | a0802c08237c56e76efd5055dec24cdaa3eedb94 (diff) | |
download | mongo-b394a689561bc35f5e75ff1b6eef2e4fe1ddd512.tar.gz |
SERVER-29141 Clean up change stream tests' cursors
Diffstat (limited to 'jstests/change_streams')
-rw-r--r-- | jstests/change_streams/change_stream.js | 106 | ||||
-rw-r--r-- | jstests/change_streams/lookup_post_image.js | 6 | ||||
-rw-r--r-- | jstests/change_streams/only_wake_getmore_for_relevant_changes.js | 4 |
3 files changed, 67 insertions, 49 deletions
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js index 34099cdf4e9..c545456ef33 100644 --- a/jstests/change_streams/change_stream.js +++ b/jstests/change_streams/change_stream.js @@ -11,12 +11,20 @@ return ns.split(/\.(.+)/)[1]; } + // We will use this to keep track of cursors opened during this test, so that we can be sure to + // clean them up before this test completes. + let allCursors = []; + // Helpers for testing that pipeline returns correct set of results. Run startWatchingChanges // with the pipeline, then insert the changes, then run assertNextBatchMatches with the result // of startWatchingChanges and the expected set of results. - function startWatchingChanges(pipeline, collection) { - // Strip the oplog fields we aren't testing. - pipeline.push(oplogProjection); + function startWatchingChanges({pipeline, collection, includeTs, aggregateOptions}) { + aggregateOptions = aggregateOptions || {cursor: {}}; + + if (!includeTs) { + // Strip the oplog fields we aren't testing. + pipeline.push(oplogProjection); + } // TODO: SERVER-29126 // While change streams still uses read concern level local instead of read concern level @@ -30,9 +38,10 @@ // Waiting for replication assures no previous operations will be included. FixtureHelpers.awaitReplication(); - let res = assert.commandWorked( - db.runCommand({aggregate: collection.getName(), "pipeline": pipeline, cursor: {}})); + let res = assert.commandWorked(db.runCommand( + Object.merge({aggregate: collection.getName(), pipeline: pipeline}, aggregateOptions))); assert.neq(res.cursor.id, 0); + allCursors.push({db: db.getName(), coll: collection.getName(), cursorId: res.cursor.id}); return res.cursor; } @@ -64,7 +73,7 @@ jsTestLog("Testing single insert"); db.t1.drop(); - let cursor = startWatchingChanges([{$changeStream: {}}], db.t1); + let cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); assert.writeOK(db.t1.insert({_id: 0, a: 1})); const t1Uuid = getUUIDFromListCollections(db, db.t1.getName()); let expected = { @@ -80,7 +89,7 @@ assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); jsTestLog("Testing second insert"); - cursor = startWatchingChanges([{$changeStream: {}}], db.t1); + cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); assert.writeOK(db.t1.insert({_id: 1, a: 2})); expected = { _id: { @@ -95,7 +104,7 @@ assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); jsTestLog("Testing update"); - cursor = startWatchingChanges([{$changeStream: {}}], db.t1); + cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); assert.writeOK(db.t1.update({_id: 0}, {a: 3})); expected = { _id: {documentKey: {_id: 0}, uuid: t1Uuid}, @@ -107,7 +116,7 @@ assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); jsTestLog("Testing update of another field"); - cursor = startWatchingChanges([{$changeStream: {}}], db.t1); + cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); assert.writeOK(db.t1.update({_id: 0}, {b: 3})); expected = { _id: {documentKey: {_id: 0}, uuid: t1Uuid}, @@ -119,7 +128,7 @@ assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); jsTestLog("Testing upsert"); - cursor = startWatchingChanges([{$changeStream: {}}], db.t1); + cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); assert.writeOK(db.t1.update({_id: 2}, {a: 4}, {upsert: true})); expected = { _id: { @@ -135,7 +144,7 @@ jsTestLog("Testing partial update with $inc"); assert.writeOK(db.t1.insert({_id: 3, a: 5, b: 1})); - cursor = startWatchingChanges([{$changeStream: {}}], db.t1); + cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); assert.writeOK(db.t1.update({_id: 3}, {$inc: {b: 2}})); expected = { _id: {documentKey: {_id: 3}, uuid: t1Uuid}, @@ -147,7 +156,7 @@ assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); jsTestLog("Testing delete"); - cursor = startWatchingChanges([{$changeStream: {}}], db.t1); + cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); assert.writeOK(db.t1.remove({_id: 1})); expected = { _id: {documentKey: {_id: 1}, uuid: t1Uuid}, @@ -158,8 +167,8 @@ assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]}); jsTestLog("Testing intervening write on another collection"); - cursor = startWatchingChanges([{$changeStream: {}}], db.t1); - let t2cursor = startWatchingChanges([{$changeStream: {}}], db.t2); + cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1}); + let t2cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2}); assert.writeOK(db.t2.insert({_id: 100, c: 1})); const t2Uuid = getUUIDFromListCollections(db, db.t2.getName()); assertNextBatchMatches({cursor: cursor, expectedBatch: []}); @@ -182,7 +191,7 @@ jsTestLog("Testing rename"); db.t3.drop(); - t2cursor = startWatchingChanges([{$changeStream: {}}], db.t2); + t2cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2}); assert.writeOK(db.t2.renameCollection("t3")); expected = {_id: {uuid: t2Uuid}, operationType: "invalidate"}; assertNextBatchMatches({cursor: t2cursor, expectedBatch: [expected]}); @@ -190,8 +199,8 @@ jsTestLog("Testing insert that looks like rename"); db.dne1.drop(); db.dne2.drop(); - const dne1cursor = startWatchingChanges([{$changeStream: {}}], db.dne1); - const dne2cursor = startWatchingChanges([{$changeStream: {}}], db.dne2); + const dne1cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.dne1}); + const dne2cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.dne2}); assert.writeOK(db.t3.insert({_id: 101, renameCollection: "test.dne1", to: "test.dne2"})); assertNextBatchMatches({cursor: dne1cursor, expectedBatch: []}); assertNextBatchMatches({cursor: dne2cursor, expectedBatch: []}); @@ -219,9 +228,8 @@ db.tailable2.drop(); db.createCollection("tailable2"); const tailable2Uuid = getUUIDFromListCollections(db, db.tailable2.getName()); - let res = assert.commandWorked(db.runCommand( - {aggregate: "tailable2", pipeline: [{$changeStream: {}}, oplogProjection], cursor: {}})); - let aggcursor = res.cursor; + let aggcursor = + startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.tailable2}); // We should get a valid cursor. assert.neq(aggcursor.id, 0); @@ -232,7 +240,7 @@ // No data, so should return no results, but cursor should remain valid. Note we are // specifically testing awaitdata behavior here, so we cannot use the failpoint to skip the // wait. - res = assert.commandWorked( + let res = assert.commandWorked( db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 1000})); aggcursor = res.cursor; assert.neq(aggcursor.id, 0); @@ -317,20 +325,16 @@ readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()} })); FixtureHelpers.awaitReplication(); - FixtureHelpers.runCommandOnEachPrimary({ - dbName: "admin", - cmdObj: {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"} - }); + assert.commandWorked(db.adminCommand( + {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"})); let res = assert.commandWorked(db.runCommand({ getMore: cursor.id, collection: getCollectionNameFromFullNamespace(cursor.ns), batchSize: 1 })); assert.eq(res.cursor.nextBatch.length, 0); - FixtureHelpers.runCommandOnEachPrimary({ - dbName: "admin", - cmdObj: {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"} - }); + assert.commandWorked( + db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"})); } jsTestLog("Testing resumability"); @@ -338,11 +342,8 @@ assert.commandWorked(db.createCollection("resume1")); // Note we do not project away 'id.ts' as it is part of the resume token. - res = assert.commandWorked( - db.runCommand({aggregate: "resume1", pipeline: [{$changeStream: {}}], cursor: {}})); - let resumeCursor = res.cursor; - assert.neq(resumeCursor.id, 0); - assert.eq(resumeCursor.firstBatch.length, 0); + let resumeCursor = startWatchingChanges( + {pipeline: [{$changeStream: {}}], collection: db.resume1, includeTs: true}); // Insert a document and save the resulting change stream. assert.writeOK(db.resume1.insert({_id: 1})); @@ -350,12 +351,12 @@ assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1}); jsTestLog("Testing resume after one document."); - res = assert.commandWorked(db.runCommand({ - aggregate: "resume1", + resumeCursor = startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}], - cursor: {batchSize: 0} - })); - resumeCursor = res.cursor; + collection: db.resume1, + includeTs: true, + aggregateOptions: {cursor: {batchSize: 0}}, + }); assertNextBatchIsEmpty(resumeCursor); jsTestLog("Inserting additional documents."); @@ -368,23 +369,30 @@ assertNextBatchIsEmpty(resumeCursor); jsTestLog("Testing resume after first document of three."); - res = assert.commandWorked(db.runCommand({ - aggregate: "resume1", + resumeCursor = startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}], - cursor: {batchSize: 0} - })); - resumeCursor = res.cursor; + collection: db.resume1, + includeTs: true, + aggregateOptions: {cursor: {batchSize: 0}}, + }); assert.docEq(getOneDoc(resumeCursor), secondInsertChangeDoc); assert.docEq(getOneDoc(resumeCursor), thirdInsertChangeDoc); assertNextBatchIsEmpty(resumeCursor); jsTestLog("Testing resume after second document of three."); - res = assert.commandWorked(db.runCommand({ - aggregate: "resume1", + resumeCursor = startWatchingChanges({ pipeline: [{$changeStream: {resumeAfter: secondInsertChangeDoc._id}}], - cursor: {batchSize: 0} - })); - resumeCursor = res.cursor; + collection: db.resume1, + includeTs: true, + aggregateOptions: {cursor: {batchSize: 0}}, + }); assert.docEq(getOneDoc(resumeCursor), thirdInsertChangeDoc); assertNextBatchIsEmpty(resumeCursor); + + for (let testCursor of allCursors) { + assert.commandWorked(db.getSiblingDB(testCursor.db).runCommand({ + killCursors: testCursor.coll, + cursors: [testCursor.cursorId] + })); + } }()); diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js index 7c15055cbb8..44c1e8de956 100644 --- a/jstests/change_streams/lookup_post_image.js +++ b/jstests/change_streams/lookup_post_image.js @@ -27,6 +27,8 @@ const cmdResponse = assert.commandWorked( db.runCommand({aggregate: collection.getName(), pipeline: pipeline, cursor: {}})); assert.neq(cmdResponse.cursor.firstBatch.length, 0); + assert.commandWorked( + db.runCommand({killCursors: collection.getName(), cursors: [cmdResponse.cursor.id]})); return cmdResponse.cursor.firstBatch[cmdResponse.cursor.firstBatch.length - 1]; } @@ -78,6 +80,7 @@ db.runCommand({aggregate: coll.getName(), pipeline: [{$changeStream: {}}], cursor: {}})); assert.writeOK(coll.insert({_id: "dummy"})); const firstChange = getOneDoc(res.cursor); + assert.commandWorked(db.runCommand({killCursors: coll.getName(), cursors: [res.cursor.id]})); jsTestLog("Testing change streams without 'fullDocument' specified"); // Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for an @@ -264,6 +267,8 @@ latestChange = getOneDoc(res.cursor); assert.eq(latestChange.operationType, "invalidate"); assert(!latestChange.hasOwnProperty("fullDocument")); + assert.commandWorked( + db.runCommand({killCursors: db.collInvalidate.getName(), cursors: [res.cursor.id]})); // TODO(russotto): Can just use "coll" here once read majority is working. // For now, using the old collection results in us reading stale data sometimes. @@ -307,4 +312,5 @@ })); assert.eq(res.cursor.nextBatch.length, 1); assert.docEq(res.cursor.nextBatch[0]["fullDocument"], {_id: "getMoreEnabled", updated: true}); + assert.commandWorked(db.runCommand({killCursors: coll2.getName(), cursors: [res.cursor.id]})); }()); diff --git a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js index 31aebf784e3..00e04b6641e 100644 --- a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js +++ b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js @@ -137,6 +137,8 @@ eventFn();`, awaitDataCursorId: changeCursorId, event: () => assert.writeOK(db.unrelated_collection.insert({_id: "unrelated change"})) }); + assert.commandWorked( + db.runCommand({killCursors: changesCollection.getName(), cursors: [changeCursorId]})); // Test that changes ignored by filtering in later stages of the pipeline will not cause the // cursor to return before the getMore has exceeded maxTimeMS. @@ -155,4 +157,6 @@ eventFn();`, awaitDataCursorId: res.cursor.id, event: () => assert.writeOK(db.changes.insert({_id: "should not appear"})) }); + assert.commandWorked( + db.runCommand({killCursors: changesCollection.getName(), cursors: [res.cursor.id]})); }()); |