summary | refs | log | tree | commit | diff
path: root/jstests/change_streams
diff options
context:
space:
mode:
author: Charlie Swanson <charlie.swanson@mongodb.com> 2017-09-14 10:03:00 -0400
committer: Charlie Swanson <charlie.swanson@mongodb.com> 2017-09-18 17:09:10 -0400
commit: b394a689561bc35f5e75ff1b6eef2e4fe1ddd512 (patch)
tree: f59f5dc550c2ab248273fb54fc9be6dd9c8b5805 /jstests/change_streams
parent: a0802c08237c56e76efd5055dec24cdaa3eedb94 (diff)
download: mongo-b394a689561bc35f5e75ff1b6eef2e4fe1ddd512.tar.gz
SERVER-29141 Clean up change stream tests' cursors
Diffstat (limited to 'jstests/change_streams')
-rw-r--r--  jstests/change_streams/change_stream.js | 106
-rw-r--r--  jstests/change_streams/lookup_post_image.js | 6
-rw-r--r--  jstests/change_streams/only_wake_getmore_for_relevant_changes.js | 4
3 files changed, 67 insertions, 49 deletions
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js
index 34099cdf4e9..c545456ef33 100644
--- a/jstests/change_streams/change_stream.js
+++ b/jstests/change_streams/change_stream.js
@@ -11,12 +11,20 @@
return ns.split(/\.(.+)/)[1];
}
+ // We will use this to keep track of cursors opened during this test, so that we can be sure to
+ // clean them up before this test completes.
+ let allCursors = [];
+
// Helpers for testing that pipeline returns correct set of results. Run startWatchingChanges
// with the pipeline, then insert the changes, then run assertNextBatchMatches with the result
// of startWatchingChanges and the expected set of results.
- function startWatchingChanges(pipeline, collection) {
- // Strip the oplog fields we aren't testing.
- pipeline.push(oplogProjection);
+ function startWatchingChanges({pipeline, collection, includeTs, aggregateOptions}) {
+ aggregateOptions = aggregateOptions || {cursor: {}};
+
+ if (!includeTs) {
+ // Strip the oplog fields we aren't testing.
+ pipeline.push(oplogProjection);
+ }
// TODO: SERVER-29126
// While change streams still uses read concern level local instead of read concern level
@@ -30,9 +38,10 @@
// Waiting for replication assures no previous operations will be included.
FixtureHelpers.awaitReplication();
- let res = assert.commandWorked(
- db.runCommand({aggregate: collection.getName(), "pipeline": pipeline, cursor: {}}));
+ let res = assert.commandWorked(db.runCommand(
+ Object.merge({aggregate: collection.getName(), pipeline: pipeline}, aggregateOptions)));
assert.neq(res.cursor.id, 0);
+ allCursors.push({db: db.getName(), coll: collection.getName(), cursorId: res.cursor.id});
return res.cursor;
}
@@ -64,7 +73,7 @@
jsTestLog("Testing single insert");
db.t1.drop();
- let cursor = startWatchingChanges([{$changeStream: {}}], db.t1);
+ let cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.insert({_id: 0, a: 1}));
const t1Uuid = getUUIDFromListCollections(db, db.t1.getName());
let expected = {
@@ -80,7 +89,7 @@
assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
jsTestLog("Testing second insert");
- cursor = startWatchingChanges([{$changeStream: {}}], db.t1);
+ cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.insert({_id: 1, a: 2}));
expected = {
_id: {
@@ -95,7 +104,7 @@
assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
jsTestLog("Testing update");
- cursor = startWatchingChanges([{$changeStream: {}}], db.t1);
+ cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.update({_id: 0}, {a: 3}));
expected = {
_id: {documentKey: {_id: 0}, uuid: t1Uuid},
@@ -107,7 +116,7 @@
assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
jsTestLog("Testing update of another field");
- cursor = startWatchingChanges([{$changeStream: {}}], db.t1);
+ cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.update({_id: 0}, {b: 3}));
expected = {
_id: {documentKey: {_id: 0}, uuid: t1Uuid},
@@ -119,7 +128,7 @@
assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
jsTestLog("Testing upsert");
- cursor = startWatchingChanges([{$changeStream: {}}], db.t1);
+ cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.update({_id: 2}, {a: 4}, {upsert: true}));
expected = {
_id: {
@@ -135,7 +144,7 @@
jsTestLog("Testing partial update with $inc");
assert.writeOK(db.t1.insert({_id: 3, a: 5, b: 1}));
- cursor = startWatchingChanges([{$changeStream: {}}], db.t1);
+ cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.update({_id: 3}, {$inc: {b: 2}}));
expected = {
_id: {documentKey: {_id: 3}, uuid: t1Uuid},
@@ -147,7 +156,7 @@
assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
jsTestLog("Testing delete");
- cursor = startWatchingChanges([{$changeStream: {}}], db.t1);
+ cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
assert.writeOK(db.t1.remove({_id: 1}));
expected = {
_id: {documentKey: {_id: 1}, uuid: t1Uuid},
@@ -158,8 +167,8 @@
assertNextBatchMatches({cursor: cursor, expectedBatch: [expected]});
jsTestLog("Testing intervening write on another collection");
- cursor = startWatchingChanges([{$changeStream: {}}], db.t1);
- let t2cursor = startWatchingChanges([{$changeStream: {}}], db.t2);
+ cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t1});
+ let t2cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2});
assert.writeOK(db.t2.insert({_id: 100, c: 1}));
const t2Uuid = getUUIDFromListCollections(db, db.t2.getName());
assertNextBatchMatches({cursor: cursor, expectedBatch: []});
@@ -182,7 +191,7 @@
jsTestLog("Testing rename");
db.t3.drop();
- t2cursor = startWatchingChanges([{$changeStream: {}}], db.t2);
+ t2cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.t2});
assert.writeOK(db.t2.renameCollection("t3"));
expected = {_id: {uuid: t2Uuid}, operationType: "invalidate"};
assertNextBatchMatches({cursor: t2cursor, expectedBatch: [expected]});
@@ -190,8 +199,8 @@
jsTestLog("Testing insert that looks like rename");
db.dne1.drop();
db.dne2.drop();
- const dne1cursor = startWatchingChanges([{$changeStream: {}}], db.dne1);
- const dne2cursor = startWatchingChanges([{$changeStream: {}}], db.dne2);
+ const dne1cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.dne1});
+ const dne2cursor = startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.dne2});
assert.writeOK(db.t3.insert({_id: 101, renameCollection: "test.dne1", to: "test.dne2"}));
assertNextBatchMatches({cursor: dne1cursor, expectedBatch: []});
assertNextBatchMatches({cursor: dne2cursor, expectedBatch: []});
@@ -219,9 +228,8 @@
db.tailable2.drop();
db.createCollection("tailable2");
const tailable2Uuid = getUUIDFromListCollections(db, db.tailable2.getName());
- let res = assert.commandWorked(db.runCommand(
- {aggregate: "tailable2", pipeline: [{$changeStream: {}}, oplogProjection], cursor: {}}));
- let aggcursor = res.cursor;
+ let aggcursor =
+ startWatchingChanges({pipeline: [{$changeStream: {}}], collection: db.tailable2});
// We should get a valid cursor.
assert.neq(aggcursor.id, 0);
@@ -232,7 +240,7 @@
// No data, so should return no results, but cursor should remain valid. Note we are
// specifically testing awaitdata behavior here, so we cannot use the failpoint to skip the
// wait.
- res = assert.commandWorked(
+ let res = assert.commandWorked(
db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 1000}));
aggcursor = res.cursor;
assert.neq(aggcursor.id, 0);
@@ -317,20 +325,16 @@
readConcern: {level: "local", afterClusterTime: db.getMongo().getOperationTime()}
}));
FixtureHelpers.awaitReplication();
- FixtureHelpers.runCommandOnEachPrimary({
- dbName: "admin",
- cmdObj: {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}
- });
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}));
let res = assert.commandWorked(db.runCommand({
getMore: cursor.id,
collection: getCollectionNameFromFullNamespace(cursor.ns),
batchSize: 1
}));
assert.eq(res.cursor.nextBatch.length, 0);
- FixtureHelpers.runCommandOnEachPrimary({
- dbName: "admin",
- cmdObj: {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}
- });
+ assert.commandWorked(
+ db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}));
}
jsTestLog("Testing resumability");
@@ -338,11 +342,8 @@
assert.commandWorked(db.createCollection("resume1"));
// Note we do not project away 'id.ts' as it is part of the resume token.
- res = assert.commandWorked(
- db.runCommand({aggregate: "resume1", pipeline: [{$changeStream: {}}], cursor: {}}));
- let resumeCursor = res.cursor;
- assert.neq(resumeCursor.id, 0);
- assert.eq(resumeCursor.firstBatch.length, 0);
+ let resumeCursor = startWatchingChanges(
+ {pipeline: [{$changeStream: {}}], collection: db.resume1, includeTs: true});
// Insert a document and save the resulting change stream.
assert.writeOK(db.resume1.insert({_id: 1}));
@@ -350,12 +351,12 @@
assert.docEq(firstInsertChangeDoc.fullDocument, {_id: 1});
jsTestLog("Testing resume after one document.");
- res = assert.commandWorked(db.runCommand({
- aggregate: "resume1",
+ resumeCursor = startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}],
- cursor: {batchSize: 0}
- }));
- resumeCursor = res.cursor;
+ collection: db.resume1,
+ includeTs: true,
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
assertNextBatchIsEmpty(resumeCursor);
jsTestLog("Inserting additional documents.");
@@ -368,23 +369,30 @@
assertNextBatchIsEmpty(resumeCursor);
jsTestLog("Testing resume after first document of three.");
- res = assert.commandWorked(db.runCommand({
- aggregate: "resume1",
+ resumeCursor = startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: firstInsertChangeDoc._id}}],
- cursor: {batchSize: 0}
- }));
- resumeCursor = res.cursor;
+ collection: db.resume1,
+ includeTs: true,
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
assert.docEq(getOneDoc(resumeCursor), secondInsertChangeDoc);
assert.docEq(getOneDoc(resumeCursor), thirdInsertChangeDoc);
assertNextBatchIsEmpty(resumeCursor);
jsTestLog("Testing resume after second document of three.");
- res = assert.commandWorked(db.runCommand({
- aggregate: "resume1",
+ resumeCursor = startWatchingChanges({
pipeline: [{$changeStream: {resumeAfter: secondInsertChangeDoc._id}}],
- cursor: {batchSize: 0}
- }));
- resumeCursor = res.cursor;
+ collection: db.resume1,
+ includeTs: true,
+ aggregateOptions: {cursor: {batchSize: 0}},
+ });
assert.docEq(getOneDoc(resumeCursor), thirdInsertChangeDoc);
assertNextBatchIsEmpty(resumeCursor);
+
+ for (let testCursor of allCursors) {
+ assert.commandWorked(db.getSiblingDB(testCursor.db).runCommand({
+ killCursors: testCursor.coll,
+ cursors: [testCursor.cursorId]
+ }));
+ }
}());
diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js
index 7c15055cbb8..44c1e8de956 100644
--- a/jstests/change_streams/lookup_post_image.js
+++ b/jstests/change_streams/lookup_post_image.js
@@ -27,6 +27,8 @@
const cmdResponse = assert.commandWorked(
db.runCommand({aggregate: collection.getName(), pipeline: pipeline, cursor: {}}));
assert.neq(cmdResponse.cursor.firstBatch.length, 0);
+ assert.commandWorked(
+ db.runCommand({killCursors: collection.getName(), cursors: [cmdResponse.cursor.id]}));
return cmdResponse.cursor.firstBatch[cmdResponse.cursor.firstBatch.length - 1];
}
@@ -78,6 +80,7 @@
db.runCommand({aggregate: coll.getName(), pipeline: [{$changeStream: {}}], cursor: {}}));
assert.writeOK(coll.insert({_id: "dummy"}));
const firstChange = getOneDoc(res.cursor);
+ assert.commandWorked(db.runCommand({killCursors: coll.getName(), cursors: [res.cursor.id]}));
jsTestLog("Testing change streams without 'fullDocument' specified");
// Test that not specifying 'fullDocument' does include a 'fullDocument' in the result for an
@@ -264,6 +267,8 @@
latestChange = getOneDoc(res.cursor);
assert.eq(latestChange.operationType, "invalidate");
assert(!latestChange.hasOwnProperty("fullDocument"));
+ assert.commandWorked(
+ db.runCommand({killCursors: db.collInvalidate.getName(), cursors: [res.cursor.id]}));
// TODO(russotto): Can just use "coll" here once read majority is working.
// For now, using the old collection results in us reading stale data sometimes.
@@ -307,4 +312,5 @@
}));
assert.eq(res.cursor.nextBatch.length, 1);
assert.docEq(res.cursor.nextBatch[0]["fullDocument"], {_id: "getMoreEnabled", updated: true});
+ assert.commandWorked(db.runCommand({killCursors: coll2.getName(), cursors: [res.cursor.id]}));
}());
diff --git a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js
index 31aebf784e3..00e04b6641e 100644
--- a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js
+++ b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js
@@ -137,6 +137,8 @@ eventFn();`,
awaitDataCursorId: changeCursorId,
event: () => assert.writeOK(db.unrelated_collection.insert({_id: "unrelated change"}))
});
+ assert.commandWorked(
+ db.runCommand({killCursors: changesCollection.getName(), cursors: [changeCursorId]}));
// Test that changes ignored by filtering in later stages of the pipeline will not cause the
// cursor to return before the getMore has exceeded maxTimeMS.
@@ -155,4 +157,6 @@ eventFn();`,
awaitDataCursorId: res.cursor.id,
event: () => assert.writeOK(db.changes.insert({_id: "should not appear"}))
});
+ assert.commandWorked(
+ db.runCommand({killCursors: changesCollection.getName(), cursors: [res.cursor.id]}));
}());