diff options
author | Bernard Gorman <bernard.gorman@gmail.com> | 2017-09-25 18:14:27 -0400 |
---|---|---|
committer | Bernard Gorman <bernard.gorman@gmail.com> | 2017-09-26 12:44:30 -0400 |
commit | b3b44c1ecd30adaf7421ef9c93a237693a1fca06 (patch) | |
tree | 8f96e8027522352963e5a62ef05d4e0678887f1f /jstests/change_streams | |
parent | 4edbec2c6caf55412e7aad36af6f33fcc8c67b29 (diff) | |
download | mongo-b3b44c1ecd30adaf7421ef9c93a237693a1fca06.tar.gz |
SERVER-29141 Refactor the way mongos handles tailable awaitData cursors
Diffstat (limited to 'jstests/change_streams')
-rw-r--r-- | jstests/change_streams/change_stream.js | 30 | ||||
-rw-r--r-- | jstests/change_streams/lookup_post_image.js | 24 | ||||
-rw-r--r-- | jstests/change_streams/only_wake_getmore_for_relevant_changes.js | 28 |
3 files changed, 46 insertions, 36 deletions
diff --git a/jstests/change_streams/change_stream.js b/jstests/change_streams/change_stream.js index 5e06eae5b52..09edb35f647 100644 --- a/jstests/change_streams/change_stream.js +++ b/jstests/change_streams/change_stream.js @@ -52,22 +52,20 @@ readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()} })); FixtureHelpers.awaitReplication(); - if (expectedBatch.length == 0) - FixtureHelpers.runCommandOnEachPrimary({ - dbName: "admin", - cmdObj: {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"} - }); + if (expectedBatch.length == 0) { + assert.commandWorked(db.adminCommand( + {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"})); + } let res = assert.commandWorked(db.runCommand({ getMore: cursor.id, collection: getCollectionNameFromFullNamespace(cursor.ns), maxTimeMS: 5 * 60 * 1000, batchSize: (expectedBatch.length + 1) })); - if (expectedBatch.length == 0) - FixtureHelpers.runCommandOnEachPrimary({ - dbName: "admin", - cmdObj: {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"} - }); + if (expectedBatch.length == 0) { + assert.commandWorked(db.adminCommand( + {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"})); + } assert.docEq(res.cursor.nextBatch, expectedBatch); } @@ -296,20 +294,16 @@ readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()} })); FixtureHelpers.awaitReplication(); - FixtureHelpers.runCommandOnEachPrimary({ - dbName: "admin", - cmdObj: {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"} - }); + assert.commandWorked(db.adminCommand( + {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"})); let res = assert.commandWorked(db.runCommand({ getMore: cursor.id, collection: getCollectionNameFromFullNamespace(cursor.ns), batchSize: 1 })); assert.eq(res.cursor.nextBatch.length, 1); - FixtureHelpers.runCommandOnEachPrimary({ - dbName: "admin", - cmdObj: {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"} - }); + assert.commandWorked( + db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"})); return res.cursor.nextBatch[0]; } diff --git a/jstests/change_streams/lookup_post_image.js b/jstests/change_streams/lookup_post_image.js index fa9301fa142..24e8c41b477 100644 --- a/jstests/change_streams/lookup_post_image.js +++ b/jstests/change_streams/lookup_post_image.js @@ -56,20 +56,16 @@ find: "foo", readConcern: {level: "local", afterClusterTime: db.getSession().getOperationTime()} })); - FixtureHelpers.runCommandOnEachPrimary({ - dbName: "admin", - cmdObj: {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"} - }); + assert.commandWorked(db.adminCommand( + {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "alwaysOn"})); let res = assert.commandWorked(db.runCommand({ getMore: cursor.id, collection: getCollectionNameFromFullNamespace(cursor.ns), batchSize: 1 })); assert.eq(res.cursor.nextBatch.length, 1); - FixtureHelpers.runCommandOnEachPrimary({ - dbName: "admin", - cmdObj: {configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"} - }); + assert.commandWorked( + db.adminCommand({configureFailPoint: "disableAwaitDataForGetMoreCmd", mode: "off"})); return res.cursor.nextBatch[0]; } @@ -191,6 +187,8 @@ // cursors cannot be established after a collection drop. assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"})); assert.writeOK(coll.update({_id: "fullDocument is lookup 2"}, {$set: {updated: true}})); + + // Open a $changeStream cursor with batchSize 0, so that no oplog entries are retrieved yet. res = assert.commandWorked(db.runCommand({ aggregate: coll.getName(), pipeline: [ @@ -200,7 +198,7 @@ cursor: {batchSize: 0} })); assert.neq(res.cursor.id, 0); - // Save another stream to test lookup after the collecton gets recreated. + // Save another stream to test post-image lookup after the collection is recreated. const resBeforeDrop = assert.commandWorked(db.runCommand({ aggregate: coll.getName(), pipeline: [ @@ -211,18 +209,20 @@ })); assert.neq(resBeforeDrop.cursor.id, 0); + // Drop the collection and wait until two-phase drop finishes. coll.drop(); - // Wait until two-phase drop finishes. assert.soon(function() { return !TwoPhaseDropCollectionTest.collectionIsPendingDropInDatabase(db, coll.getName()); }); + // Check the next $changeStream entry; this is the test document inserted above. The collection + // has been dropped, so our attempt to look up the post-image results in a null document. latestChange = getOneDoc(res.cursor); assert.eq(latestChange.operationType, "update"); assert(latestChange.hasOwnProperty("fullDocument")); assert.eq(latestChange.fullDocument, null); - // Test establishing new cursors with resume token on dropped collections failes. + // Test establishing new cursors with resume token on dropped collections fails. res = db.runCommand({ aggregate: coll.getName(), pipeline: [ @@ -241,6 +241,8 @@ // different UUID. assert.commandWorked(db.createCollection(coll.getName())); assert.writeOK(coll.insert({_id: "fullDocument is lookup 2"})); + + // Confirm that the next entry's post-image is null since new collection has a different UUID. latestChange = getOneDoc(resBeforeDrop.cursor); assert.eq(latestChange.operationType, "update"); assert(latestChange.hasOwnProperty("fullDocument")); diff --git a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js index 00e04b6641e..87c082199b7 100644 --- a/jstests/change_streams/only_wake_getmore_for_relevant_changes.js +++ b/jstests/change_streams/only_wake_getmore_for_relevant_changes.js @@ -14,7 +14,8 @@ * Note that 'event' will not have access to any local variables, since it will be executed in a * different scope. */ - function runGetMoreInParallelWithEvent({collection, awaitDataCursorId, maxTimeMS, event}) { + function runGetMoreInParallelWithEvent( + {collection, awaitDataCursorId, identifyingComment, maxTimeMS, event}) { // In some extreme cases, the parallel shell can take longer to start up than it takes for // the getMore to run. To prevent this from happening, the main thread waits for an insert // into "sentinel", to signal that the parallel shell has started and is waiting for the @@ -29,8 +30,11 @@ assert.writeOK(db.getCollection("${ shellSentinelCollection.getName() }").insert // Wait for the getMore to appear in currentOp. assert.soon(function() { - return db.currentOp({op: "getmore", "command.collection": "${collection.getName()}"}) - .inprog.length === 1; + return db.currentOp({ + op: "getmore", + "command.collection": "${collection.getName()}", + "originatingCommand.comment": "${identifyingComment}", + }).inprog.length === 1; }); const eventFn = ${ event.toString() }; @@ -56,10 +60,12 @@ eventFn();`, * @param [NumberLong] awaitDataCursorId - the id of the cursor to use in the getMore command. * @param [Function] event - the event that should be run during the getMore. */ - function assertEventDoesNotWakeCursor({collection, awaitDataCursorId, event}) { + function assertEventDoesNotWakeCursor( + {collection, awaitDataCursorId, identifyingComment, event}) { const {result, elapsedMs} = runGetMoreInParallelWithEvent({ collection: collection, awaitDataCursorId: awaitDataCursorId, + identifyingComment: identifyingComment, maxTimeMS: 1000, event: event, }); @@ -79,7 +85,7 @@ eventFn();`, * @param [NumberLong] awaitDataCursorId - the id of the cursor to use in the getMore command. * @param [Function] event - the event that should be run during the getMore. */ - function assertEventWakesCursor({collection, awaitDataCursorId, event}) { + function assertEventWakesCursor({collection, awaitDataCursorId, identifyingComment, event}) { // Run the original event, then (while still in the parallel shell) assert that the getMore // finishes soon after. This will be run in a parallel shell, which will not have a variable // 'event' in scope, so we'll have to stringify it here. @@ -88,6 +94,7 @@ eventFn();`, const {result, elapsedMs} = runGetMoreInParallelWithEvent({ collection: collection, awaitDataCursorId: awaitDataCursorId, + identifyingComment: identifyingComment, maxTimeMS: thirtyMinutes, event: event, }); @@ -102,11 +109,13 @@ eventFn();`, assert.commandWorked(db.createCollection(changesCollection.getName())); // Start a change stream cursor. + const wholeCollectionStreamComment = "change stream on entire collection"; let res = assert.commandWorked(db.runCommand({ aggregate: changesCollection.getName(), // Project out the timestamp, since that's subject to change unpredictably. pipeline: [{$changeStream: {}}, {$project: {"_id.clusterTime": 0}}], - cursor: {} + cursor: {}, + comment: wholeCollectionStreamComment })); const changeCursorId = res.cursor.id; assert.neq(changeCursorId, 0); @@ -117,6 +126,7 @@ eventFn();`, const getMoreResponse = assertEventWakesCursor({ collection: changesCollection, awaitDataCursorId: changeCursorId, + identifyingComment: wholeCollectionStreamComment, event: () => assert.writeOK(db.changes.insert({_id: "wake up"})) }); assert.eq(getMoreResponse.cursor.nextBatch.length, 1); @@ -135,6 +145,7 @@ eventFn();`, assertEventDoesNotWakeCursor({ collection: changesCollection, awaitDataCursorId: changeCursorId, + identifyingComment: wholeCollectionStreamComment, event: () => assert.writeOK(db.unrelated_collection.insert({_id: "unrelated change"})) }); assert.commandWorked( @@ -142,12 +153,14 @@ eventFn();`, // Test that changes ignored by filtering in later stages of the pipeline will not cause the // cursor to return before the getMore has exceeded maxTimeMS. + const noInvalidatesComment = "change stream filtering invalidate entries"; res = assert.commandWorked(db.runCommand({ aggregate: changesCollection.getName(), // This pipeline filters changes to only invalidates, so regular inserts should not cause // the awaitData to end early. pipeline: [{$changeStream: {}}, {$match: {operationType: "invalidate"}}], - cursor: {} + cursor: {}, + comment: noInvalidatesComment })); assert.eq( res.cursor.firstBatch.length, 0, "did not expect any invalidations on changes collection"); @@ -155,6 +168,7 @@ eventFn();`, assertEventDoesNotWakeCursor({ collection: changesCollection, awaitDataCursorId: res.cursor.id, + identifyingComment: noInvalidatesComment, event: () => assert.writeOK(db.changes.insert({_id: "should not appear"})) }); assert.commandWorked( |