summaryrefslogtreecommitdiff
path: root/jstests/change_streams
diff options
context:
space:
mode:
authorBernard Gorman <bernard.gorman@gmail.com>2017-09-25 18:14:27 -0400
committerBernard Gorman <bernard.gorman@gmail.com>2017-09-26 12:44:30 -0400
commitb3b44c1ecd30adaf7421ef9c93a237693a1fca06 (patch)
tree8f96e8027522352963e5a62ef05d4e0678887f1f /jstests/change_streams
parent4edbec2c6caf55412e7aad36af6f33fcc8c67b29 (diff)
downloadmongo-b3b44c1ecd30adaf7421ef9c93a237693a1fca06.tar.gz
SERVER-29141 Refactor the way mongos handles tailable awaitData cursors
Diffstat (limited to 'jstests/change_streams')
-rw-r--r--jstests/change_streams/change_stream.js30
-rw-r--r--jstests/change_streams/lookup_post_image.js24
-rw-r--r--jstests/change_streams/only_wake_getmore_for_relevant_changes.js28
3 files changed, 46 insertions, 36 deletions
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js
index 5e06eae5b52..09edb35f647 100644
--- a/jstests/change_streams/change_stream.js
+++ b/jstests/change_streams/change_stream.js
@@ -52,22 +52,20 @@
readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()}
}));
FixtureHelpers.awaitReplication();
- if (expectedBatch.length == 0)
- FixtureHelpers.runCommandOnEachPrimary({
- dbName: "admin",
- cmdObj: {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}
- });
+ if (expectedBatch.length == 0) {
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}));
+ }
let res = assert.commandWorked(db.runCommand({
getMore: cursor.id,
collection: getCollectionNameFromFullNamespace(cursor.ns),
maxTimeMS: 5 * 60 * 1000,
batchSize: (expectedBatch.length + 1)
}));
- if (expectedBatch.length == 0)
- FixtureHelpers.runCommandOnEachPrimary({
- dbName: "admin",
- cmdObj: {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}
- });
+ if (expectedBatch.length == 0) {
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}));
+ }
assert.docEq(res.cursor.nextBatch, expectedBatch);
}
@@ -296,20 +294,16 @@
readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()}
}));
FixtureHelpers.awaitReplication();
- FixtureHelpers.runCommandOnEachPrimary({
- dbName: "admin",
- cmdObj: {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}
- });
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}));
let res = assert.commandWorked(db.runCommand({
getMore: cursor.id,
collection: getCollectionNameFromFullNamespace(cursor.ns),
batchSize: 1
}));
assert.eq(res.cursor.nextBatch.length, 1);
- FixtureHelpers.runCommandOnEachPrimary({
- dbName: "admin",
- cmdObj: {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}
- });
+ assert.commandWorked(
+ db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}));
return res.cursor.nextBatch[0];
}
diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js
index fa9301fa142..24e8c41b477 100644
--- a/jstests/change_streams/lookup_post_image.js
+++ b/jstests/change_streams/lookup_post_image.js
@@ -56,20 +56,16 @@
find: "foo",
readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()}
}));
- FixtureHelpers.runCommandOnEachPrimary({
- dbName: "admin",
- cmdObj: {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}
- });
+ assert.commandWorked(db.adminCommand(
+ {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"}));
let res = assert.commandWorked(db.runCommand({
getMore: cursor.id,
collection: getCollectionNameFromFullNamespace(cursor.ns),
batchSize: 1
}));
assert.eq(res.cursor.nextBatch.length, 1);
- FixtureHelpers.runCommandOnEachPrimary({
- dbName: "admin",
- cmdObj: {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}
- });
+ assert.commandWorked(
+ db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"}));
return res.cursor.nextBatch[0];
}
@@ -191,6 +187,8 @@
// cursors cannot be established after a collection drop.
assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"}));
assert.writeOK(coll.update({_id: "fullDocument is lookup 2"}, {$set: {updated: true}}));
+
+ // Open a $changeStream cursor with batchSize 0, so that no oplog entries are retrieved yet.
res = assert.commandWorked(db.runCommand({
aggregate: coll.getName(),
pipeline: [
@@ -200,7 +198,7 @@
cursor: {batchSize: 0}
}));
assert.neq(res.cursor.id, 0);
- // Save another stream to test lookup after the collecton gets recreated.
+ // Save another stream to test post-image lookup after the collection is recreated.
const resBeforeDrop = assert.commandWorked(db.runCommand({
aggregate: coll.getName(),
pipeline: [
@@ -211,18 +209,20 @@
}));
assert.neq(resBeforeDrop.cursor.id, 0);
+ // Drop the collection and wait until two-phase drop finishes.
coll.drop();
- // Wait until two-phase drop finishes.
assert.soon(function() {
return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, coll.getName());
});
+ // Check the next $changeStream entry; this is the test document inserted above. The collection
+ // has been dropped, so our attempt to look up the post-image results in a null document.
latestChange = getOneDoc(res.cursor);
assert.eq(latestChange.operationType, "update");
assert(latestChange.hasOwnProperty("fullDocument"));
assert.eq(latestChange.fullDocument, null);
- // Test establishing new cursors with resume token on dropped collections failes.
+ // Test establishing new cursors with resume token on dropped collections fails.
res = db.runCommand({
aggregate: coll.getName(),
pipeline: [
@@ -241,6 +241,8 @@
// different UUID.
assert.commandWorked(db.createCollection(coll.getName()));
assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"}));
+
+ // Confirm that the next entry's post-image is null since new collection has a different UUID.
latestChange = getOneDoc(resBeforeDrop.cursor);
assert.eq(latestChange.operationType, "update");
assert(latestChange.hasOwnProperty("fullDocument"));
diff --git a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js
index 00e04b6641e..87c082199b7 100644
--- a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js
+++ b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js
@@ -14,7 +14,8 @@
* Note that 'event' will not have access to any local variables, since it will be executed in a
* different scope.
*/
- function runGetMoreInParallelWithEvent({collection, awaitDataCursorId, maxTimeMS, event}) {
+ function runGetMoreInParallelWithEvent(
+ {collection, awaitDataCursorId, identifyingComment, maxTimeMS, event}) {
// In some extreme cases, the parallel shell can take longer to start up than it takes for
// the getMore to run. To prevent this from happening, the main thread waits for an insert
// into "sentinel", to signal that the parallel shell has started and is waiting for the
@@ -29,8 +30,11 @@ assert.writeOK(db.getCollection("${ shellSentinelCollection.getName() }").insert
// Wait for the getMore to appear in currentOp.
assert.soon(function() {
- return db.currentOp({op: "getmore", "command.collection": "${collection.getName()}"})
- .inprog.length === 1;
+ return db.currentOp({
+ op: "getmore",
+ "command.collection": "${collection.getName()}",
+ "originatingCommand.comment": "${identifyingComment}",
+ }).inprog.length === 1;
});
const eventFn = ${ event.toString() };
@@ -56,10 +60,12 @@ eventFn();`,
* @param [NumberLong] awaitDataCursorId - the id of the cursor to use in the getMore command.
* @param [Function] event - the event that should be run during the getMore.
*/
- function assertEventDoesNotWakeCursor({collection, awaitDataCursorId, event}) {
+ function assertEventDoesNotWakeCursor(
+ {collection, awaitDataCursorId, identifyingComment, event}) {
const {result, elapsedMs} = runGetMoreInParallelWithEvent({
collection: collection,
awaitDataCursorId: awaitDataCursorId,
+ identifyingComment: identifyingComment,
maxTimeMS: 1000,
event: event,
});
@@ -79,7 +85,7 @@ eventFn();`,
* @param [NumberLong] awaitDataCursorId - the id of the cursor to use in the getMore command.
* @param [Function] event - the event that should be run during the getMore.
*/
- function assertEventWakesCursor({collection, awaitDataCursorId, event}) {
+ function assertEventWakesCursor({collection, awaitDataCursorId, identifyingComment, event}) {
// Run the original event, then (while still in the parallel shell) assert that the getMore
// finishes soon after. This will be run in a parallel shell, which will not have a variable
// 'event' in scope, so we'll have to stringify it here.
@@ -88,6 +94,7 @@ eventFn();`,
const {result, elapsedMs} = runGetMoreInParallelWithEvent({
collection: collection,
awaitDataCursorId: awaitDataCursorId,
+ identifyingComment: identifyingComment,
maxTimeMS: thirtyMinutes,
event: event,
});
@@ -102,11 +109,13 @@ eventFn();`,
assert.commandWorked(db.createCollection(changesCollection.getName()));
// Start a change stream cursor.
+ const wholeCollectionStreamComment = "change stream on entire collection";
let res = assert.commandWorked(db.runCommand({
aggregate: changesCollection.getName(),
// Project out the timestamp, since that's subject to change unpredictably.
pipeline: [{$changeStream: {}}, {$project: {"_id.clusterTime": 0}}],
- cursor: {}
+ cursor: {},
+ comment: wholeCollectionStreamComment
}));
const changeCursorId = res.cursor.id;
assert.neq(changeCursorId, 0);
@@ -117,6 +126,7 @@ eventFn();`,
const getMoreResponse = assertEventWakesCursor({
collection: changesCollection,
awaitDataCursorId: changeCursorId,
+ identifyingComment: wholeCollectionStreamComment,
event: () => assert.writeOK(db.changes.insert({_id: "wake up"}))
});
assert.eq(getMoreResponse.cursor.nextBatch.length, 1);
@@ -135,6 +145,7 @@ eventFn();`,
assertEventDoesNotWakeCursor({
collection: changesCollection,
awaitDataCursorId: changeCursorId,
+ identifyingComment: wholeCollectionStreamComment,
event: () => assert.writeOK(db.unrelated_collection.insert({_id: "unrelated change"}))
});
assert.commandWorked(
@@ -142,12 +153,14 @@ eventFn();`,
// Test that changes ignored by filtering in later stages of the pipeline will not cause the
// cursor to return before the getMore has exceeded maxTimeMS.
+ const noInvalidatesComment = "change stream filtering invalidate entries";
res = assert.commandWorked(db.runCommand({
aggregate: changesCollection.getName(),
// This pipeline filters changes to only invalidates, so regular inserts should not cause
// the awaitData to end early.
pipeline: [{$changeStream: {}}, {$match: {operationType: "invalidate"}}],
- cursor: {}
+ cursor: {},
+ comment: noInvalidatesComment
}));
assert.eq(
res.cursor.firstBatch.length, 0, "did not expect any invalidations on changes collection");
@@ -155,6 +168,7 @@ eventFn();`,
assertEventDoesNotWakeCursor({
collection: changesCollection,
awaitDataCursorId: res.cursor.id,
+ identifyingComment: noInvalidatesComment,
event: () => assert.writeOK(db.changes.insert({_id: "should not appear"}))
});
assert.commandWorked(