diff options
author | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-08-27 18:11:43 -0400 |
---|---|---|
committer | Charlie Swanson <charlie.swanson@mongodb.com> | 2018-08-28 17:21:00 -0400 |
commit | c727aeae4dbcafd8c33b607e8d21e2d3ae14c816 (patch) | |
tree | 55026a914aa945e62f153c37b593a8d919f7c953 | |
parent | 1ee551653a1ff9c154d94e59c429529eb3ea9098 (diff) | |
download | mongo-c727aeae4dbcafd8c33b607e8d21e2d3ae14c816.tar.gz |
SERVER-36899 Accept new 'startAfter' option in watch() helpers
-rw-r--r-- | jstests/change_streams/shell_helper.js | 40 | ||||
-rw-r--r-- | jstests/libs/change_stream_util.js | 6 | ||||
-rw-r--r-- | src/mongo/shell/collection.js | 23 | ||||
-rw-r--r-- | src/mongo/shell/db.js | 21 | ||||
-rw-r--r-- | src/mongo/shell/mongo.js | 38 |
5 files changed, 77 insertions, 51 deletions
diff --git a/jstests/change_streams/shell_helper.js b/jstests/change_streams/shell_helper.js index e833c053013..d440603b956 100644 --- a/jstests/change_streams/shell_helper.js +++ b/jstests/change_streams/shell_helper.js @@ -85,6 +85,11 @@ coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {resumeAfter: resumeToken}); checkNextChange(changeStreamCursor, {docId: 1}); + jsTestLog("Testing watch() with pipeline and startAfter"); + changeStreamCursor = + coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {startAfter: resumeToken}); + checkNextChange(changeStreamCursor, {docId: 1}); + jsTestLog("Testing watch() with pipeline and startAtOperationTime"); changeStreamCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {startAtOperationTime: resumeTime}); @@ -150,7 +155,7 @@ }); jsTestLog("Testing the cursor gets closed when the collection gets dropped"); - changeStreamCursor = coll.watch([{$project: {_id: 0, clusterTime: 0}}]); + changeStreamCursor = coll.watch([{$project: {clusterTime: 0}}]); assert.writeOK(coll.insert({_id: 2, x: 1})); expected = { documentKey: {_id: 2}, @@ -170,5 +175,36 @@ expected = {operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}}; checkNextChange(changeStreamCursor, expected); // For single collection change streams, the drop should invalidate the stream. - assertInvalidateOp({cursor: changeStreamCursor, opType: "drop"}); + const invalidateDoc = assertInvalidateOp({cursor: changeStreamCursor, opType: "drop"}); + + if (invalidateDoc) { + jsTestLog("Testing using the 'startAfter' option from the invalidate entry"); + assert.commandWorked(coll.insert({_id: "After drop"})); + let resumedFromInvalidate = + coll.watch([], {startAfter: invalidateDoc._id, collation: {locale: "simple"}}); + + // We should see the new insert after starting over. However, in sharded cluster + // passthroughs we may see more drop and invalidate notifications before we see the insert. + let firstChangeAfterDrop; + assert.soon(() => { + if (!resumedFromInvalidate.hasNext()) { + return false; + } + const next = resumedFromInvalidate.next(); + if (next.operationType == "invalidate") { + // Start again! + resumedFromInvalidate = + coll.watch([], {startAfter: next._id, collation: {locale: "simple"}}); + return false; + } + if (next.operationType == "drop") { + return false; + } + // THIS is the change we wanted. + firstChangeAfterDrop = next; + return true; + }); + + assert.eq(firstChangeAfterDrop.documentKey._id, "After drop", tojson(change)); + } }()); diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js index f6ef8084096..ca619e53af6 100644 --- a/jstests/libs/change_stream_util.js +++ b/jstests/libs/change_stream_util.js @@ -44,15 +44,19 @@ const runCommandChangeStreamPassthroughAware = * - single collection streams: drop, rename, and dropDatabase. * - whole DB streams: dropDatabase. * - whole cluster streams: none. + * Returns the invalidate document if there was one, or null otherwise. */ function assertInvalidateOp({cursor, opType}) { if (!isChangeStreamPassthrough() || (changeStreamPassthroughType() == ChangeStreamWatchMode.kDb && opType == "dropDatabase")) { assert.soon(() => cursor.hasNext()); - assert.eq(cursor.next().operationType, "invalidate"); + const invalidate = cursor.next(); + assert.eq(invalidate.operationType, "invalidate"); assert(cursor.isExhausted()); assert(cursor.isClosed()); + return invalidate; } + return null; } function ChangeStreamTest(_db, name = "ChangeStreamTest") { diff --git a/src/mongo/shell/collection.js b/src/mongo/shell/collection.js index cc783066caf..55cafb2fac0 100644 --- a/src/mongo/shell/collection.js +++ b/src/mongo/shell/collection.js @@ -1556,26 +1556,11 @@ DBCollection.prototype.latencyStats = function(options) { DBCollection.prototype.watch = function(pipeline, options) { pipeline = pipeline || []; - options = options || {}; assert(pipeline instanceof Array, "'pipeline' argument must be an array"); - assert(options instanceof Object, "'options' argument must be an object"); - - let changeStreamStage = {fullDocument: options.fullDocument || "default"}; - delete options.fullDocument; - - if (options.hasOwnProperty("resumeAfter")) { - changeStreamStage.resumeAfter = options.resumeAfter; - delete options.resumeAfter; - } - - if (options.hasOwnProperty("startAtOperationTime")) { - changeStreamStage.startAtOperationTime = options.startAtOperationTime; - delete options.startAtOperationTime; - } - - pipeline.unshift({$changeStream: changeStreamStage}); - // Pass options "batchSize", "collation" and "maxAwaitTimeMS" down to aggregate(). - return this.aggregate(pipeline, options); + let changeStreamStage; + [changeStreamStage, aggOptions] = this.getMongo()._extractChangeStreamOptions(options); + pipeline.unshift(changeStreamStage); + return this.aggregate(pipeline, aggOptions); }; /** diff --git a/src/mongo/shell/db.js b/src/mongo/shell/db.js index 1f0133f4449..9f6f497d5d1 100644 --- a/src/mongo/shell/db.js +++ b/src/mongo/shell/db.js @@ -1891,25 +1891,12 @@ var DB; DB.prototype.watch = function(pipeline, options) { pipeline = pipeline || []; - options = options || {}; assert(pipeline instanceof Array, "'pipeline' argument must be an array"); - assert(options instanceof Object, "'options' argument must be an object"); - let changeStreamStage = {fullDocument: options.fullDocument || "default"}; - delete options.fullDocument; - - if (options.hasOwnProperty("resumeAfter")) { - changeStreamStage.resumeAfter = options.resumeAfter; - delete options.resumeAfter; - } - - if (options.hasOwnProperty("startAtOperationTime")) { - changeStreamStage.startAtOperationTime = options.startAtOperationTime; - delete options.startAtOperationTime; - } - - pipeline.unshift({$changeStream: changeStreamStage}); - return this._runAggregate({aggregate: 1, pipeline: pipeline}, options); + let changeStreamStage; + [changeStreamStage, aggOptions] = this.getMongo()._extractChangeStreamOptions(options); + pipeline.unshift(changeStreamStage); + return this._runAggregate({aggregate: 1, pipeline: pipeline}, aggOptions); }; DB.prototype.getFreeMonitoringStatus = function() { diff --git a/src/mongo/shell/mongo.js b/src/mongo/shell/mongo.js index c9688763c18..635c77dbc67 100644 --- a/src/mongo/shell/mongo.js +++ b/src/mongo/shell/mongo.js @@ -485,29 +485,43 @@ Mongo.prototype.waitForClusterTime = function waitForClusterTime(maxRetries = 10 throw new Error("failed waiting for non default clusterTime"); }; -Mongo.prototype.watch = function(pipeline, options) { - pipeline = pipeline || []; +/** + * Given the options object for a 'watch' helper, determines which options apply to the change + * stream stage, and which apply to the aggregate overall. Returns two objects: the change + * stream stage specification and the options for the aggregate command, respectively. + */ +Mongo.prototype._extractChangeStreamOptions = function(options) { options = options || {}; - assert(pipeline instanceof Array, "'pipeline' argument must be an array"); assert(options instanceof Object, "'options' argument must be an object"); - let changeStreamStage = { - allChangesForCluster: true, - fullDocument: options.fullDocument || "default" - }; - delete options.allChangesForCluster; + let changeStreamOptions = {fullDocument: options.fullDocument || "default"}; delete options.fullDocument; if (options.hasOwnProperty("resumeAfter")) { - changeStreamStage.resumeAfter = options.resumeAfter; + changeStreamOptions.resumeAfter = options.resumeAfter; delete options.resumeAfter; } + if (options.hasOwnProperty("startAfter")) { + changeStreamOptions.startAfter = options.startAfter; + delete options.startAfter; + } + if (options.hasOwnProperty("startAtOperationTime")) { - changeStreamStage.startAtOperationTime = options.startAtOperationTime; + changeStreamOptions.startAtOperationTime = options.startAtOperationTime; delete options.startAtOperationTime; } - pipeline.unshift({$changeStream: changeStreamStage}); - return this.getDB("admin")._runAggregate({aggregate: 1, pipeline: pipeline}, options); + return [{$changeStream: changeStreamOptions}, options]; +}; + +Mongo.prototype.watch = function(pipeline, options) { + pipeline = pipeline || []; + assert(pipeline instanceof Array, "'pipeline' argument must be an array"); + + let changeStreamStage; + [changeStreamStage, aggOptions] = this._extractChangeStreamOptions(options); + changeStreamStage.$changeStream.allChangesForCluster = true; + pipeline.unshift(changeStreamStage); + return this.getDB("admin")._runAggregate({aggregate: 1, pipeline: pipeline}, aggOptions); }; |