summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCharlie Swanson <charlie.swanson@mongodb.com>2018-08-27 18:11:43 -0400
committerCharlie Swanson <charlie.swanson@mongodb.com>2018-08-28 17:21:00 -0400
commitc727aeae4dbcafd8c33b607e8d21e2d3ae14c816 (patch)
tree55026a914aa945e62f153c37b593a8d919f7c953
parent1ee551653a1ff9c154d94e59c429529eb3ea9098 (diff)
downloadmongo-c727aeae4dbcafd8c33b607e8d21e2d3ae14c816.tar.gz
SERVER-36899 Accept new 'startAfter' option in watch() helpers
-rw-r--r--jstests/change_streams/shell_helper.js40
-rw-r--r--jstests/libs/change_stream_util.js6
-rw-r--r--src/mongo/shell/collection.js23
-rw-r--r--src/mongo/shell/db.js21
-rw-r--r--src/mongo/shell/mongo.js38
5 files changed, 77 insertions, 51 deletions
diff --git a/jstests/change_streams/shell_helper.js b/jstests/change_streams/shell_helper.js
index e833c053013..d440603b956 100644
--- a/jstests/change_streams/shell_helper.js
+++ b/jstests/change_streams/shell_helper.js
@@ -85,6 +85,11 @@
coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {resumeAfter: resumeToken});
checkNextChange(changeStreamCursor, {docId: 1});
+ jsTestLog("Testing watch() with pipeline and startAfter");
+ changeStreamCursor =
+ coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {startAfter: resumeToken});
+ checkNextChange(changeStreamCursor, {docId: 1});
+
jsTestLog("Testing watch() with pipeline and startAtOperationTime");
changeStreamCursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}],
{startAtOperationTime: resumeTime});
@@ -150,7 +155,7 @@
});
jsTestLog("Testing the cursor gets closed when the collection gets dropped");
- changeStreamCursor = coll.watch([{$project: {_id: 0, clusterTime: 0}}]);
+ changeStreamCursor = coll.watch([{$project: {clusterTime: 0}}]);
assert.writeOK(coll.insert({_id: 2, x: 1}));
expected = {
documentKey: {_id: 2},
@@ -170,5 +175,36 @@
expected = {operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}};
checkNextChange(changeStreamCursor, expected);
// For single collection change streams, the drop should invalidate the stream.
- assertInvalidateOp({cursor: changeStreamCursor, opType: "drop"});
+ const invalidateDoc = assertInvalidateOp({cursor: changeStreamCursor, opType: "drop"});
+
+ if (invalidateDoc) {
+ jsTestLog("Testing using the 'startAfter' option from the invalidate entry");
+ assert.commandWorked(coll.insert({_id: "After drop"}));
+ let resumedFromInvalidate =
+ coll.watch([], {startAfter: invalidateDoc._id, collation: {locale: "simple"}});
+
+ // We should see the new insert after starting over. However, in sharded cluster
+ // passthroughs we may see more drop and invalidate notifications before we see the insert.
+ let firstChangeAfterDrop;
+ assert.soon(() => {
+ if (!resumedFromInvalidate.hasNext()) {
+ return false;
+ }
+ const next = resumedFromInvalidate.next();
+ if (next.operationType == "invalidate") {
+ // Start again!
+ resumedFromInvalidate =
+ coll.watch([], {startAfter: next._id, collation: {locale: "simple"}});
+ return false;
+ }
+ if (next.operationType == "drop") {
+ return false;
+ }
+ // THIS is the change we wanted.
+ firstChangeAfterDrop = next;
+ return true;
+ });
+
+ assert.eq(firstChangeAfterDrop.documentKey._id, "After drop", tojson(change));
+ }
}());
diff --git a/jstests/libs/change_stream_util.js b/jstests/libs/change_stream_util.js
index f6ef8084096..ca619e53af6 100644
--- a/jstests/libs/change_stream_util.js
+++ b/jstests/libs/change_stream_util.js
@@ -44,15 +44,19 @@ const runCommandChangeStreamPassthroughAware =
* - single collection streams: drop, rename, and dropDatabase.
* - whole DB streams: dropDatabase.
* - whole cluster streams: none.
+ * Returns the invalidate document if there was one, or null otherwise.
*/
function assertInvalidateOp({cursor, opType}) {
if (!isChangeStreamPassthrough() ||
(changeStreamPassthroughType() == ChangeStreamWatchMode.kDb && opType == "dropDatabase")) {
assert.soon(() => cursor.hasNext());
- assert.eq(cursor.next().operationType, "invalidate");
+ const invalidate = cursor.next();
+ assert.eq(invalidate.operationType, "invalidate");
assert(cursor.isExhausted());
assert(cursor.isClosed());
+ return invalidate;
}
+ return null;
}
function ChangeStreamTest(_db, name = "ChangeStreamTest") {
diff --git a/src/mongo/shell/collection.js b/src/mongo/shell/collection.js
index cc783066caf..55cafb2fac0 100644
--- a/src/mongo/shell/collection.js
+++ b/src/mongo/shell/collection.js
@@ -1556,26 +1556,11 @@ DBCollection.prototype.latencyStats = function(options) {
DBCollection.prototype.watch = function(pipeline, options) {
pipeline = pipeline || [];
- options = options || {};
assert(pipeline instanceof Array, "'pipeline' argument must be an array");
- assert(options instanceof Object, "'options' argument must be an object");
-
- let changeStreamStage = {fullDocument: options.fullDocument || "default"};
- delete options.fullDocument;
-
- if (options.hasOwnProperty("resumeAfter")) {
- changeStreamStage.resumeAfter = options.resumeAfter;
- delete options.resumeAfter;
- }
-
- if (options.hasOwnProperty("startAtOperationTime")) {
- changeStreamStage.startAtOperationTime = options.startAtOperationTime;
- delete options.startAtOperationTime;
- }
-
- pipeline.unshift({$changeStream: changeStreamStage});
- // Pass options "batchSize", "collation" and "maxAwaitTimeMS" down to aggregate().
- return this.aggregate(pipeline, options);
+ let changeStreamStage;
+ [changeStreamStage, aggOptions] = this.getMongo()._extractChangeStreamOptions(options);
+ pipeline.unshift(changeStreamStage);
+ return this.aggregate(pipeline, aggOptions);
};
/**
diff --git a/src/mongo/shell/db.js b/src/mongo/shell/db.js
index 1f0133f4449..9f6f497d5d1 100644
--- a/src/mongo/shell/db.js
+++ b/src/mongo/shell/db.js
@@ -1891,25 +1891,12 @@ var DB;
DB.prototype.watch = function(pipeline, options) {
pipeline = pipeline || [];
- options = options || {};
assert(pipeline instanceof Array, "'pipeline' argument must be an array");
- assert(options instanceof Object, "'options' argument must be an object");
- let changeStreamStage = {fullDocument: options.fullDocument || "default"};
- delete options.fullDocument;
-
- if (options.hasOwnProperty("resumeAfter")) {
- changeStreamStage.resumeAfter = options.resumeAfter;
- delete options.resumeAfter;
- }
-
- if (options.hasOwnProperty("startAtOperationTime")) {
- changeStreamStage.startAtOperationTime = options.startAtOperationTime;
- delete options.startAtOperationTime;
- }
-
- pipeline.unshift({$changeStream: changeStreamStage});
- return this._runAggregate({aggregate: 1, pipeline: pipeline}, options);
+ let changeStreamStage;
+ [changeStreamStage, aggOptions] = this.getMongo()._extractChangeStreamOptions(options);
+ pipeline.unshift(changeStreamStage);
+ return this._runAggregate({aggregate: 1, pipeline: pipeline}, aggOptions);
};
DB.prototype.getFreeMonitoringStatus = function() {
diff --git a/src/mongo/shell/mongo.js b/src/mongo/shell/mongo.js
index c9688763c18..635c77dbc67 100644
--- a/src/mongo/shell/mongo.js
+++ b/src/mongo/shell/mongo.js
@@ -485,29 +485,43 @@ Mongo.prototype.waitForClusterTime = function waitForClusterTime(maxRetries = 10
throw new Error("failed waiting for non default clusterTime");
};
-Mongo.prototype.watch = function(pipeline, options) {
- pipeline = pipeline || [];
+/**
+ * Given the options object for a 'watch' helper, determines which options apply to the change
+ * stream stage, and which apply to the aggregate overall. Returns two objects: the change
+ * stream stage specification and the options for the aggregate command, respectively.
+ */
+Mongo.prototype._extractChangeStreamOptions = function(options) {
options = options || {};
- assert(pipeline instanceof Array, "'pipeline' argument must be an array");
assert(options instanceof Object, "'options' argument must be an object");
- let changeStreamStage = {
- allChangesForCluster: true,
- fullDocument: options.fullDocument || "default"
- };
- delete options.allChangesForCluster;
+ let changeStreamOptions = {fullDocument: options.fullDocument || "default"};
delete options.fullDocument;
if (options.hasOwnProperty("resumeAfter")) {
- changeStreamStage.resumeAfter = options.resumeAfter;
+ changeStreamOptions.resumeAfter = options.resumeAfter;
delete options.resumeAfter;
}
+ if (options.hasOwnProperty("startAfter")) {
+ changeStreamOptions.startAfter = options.startAfter;
+ delete options.startAfter;
+ }
+
if (options.hasOwnProperty("startAtOperationTime")) {
- changeStreamStage.startAtOperationTime = options.startAtOperationTime;
+ changeStreamOptions.startAtOperationTime = options.startAtOperationTime;
delete options.startAtOperationTime;
}
- pipeline.unshift({$changeStream: changeStreamStage});
- return this.getDB("admin")._runAggregate({aggregate: 1, pipeline: pipeline}, options);
+ return [{$changeStream: changeStreamOptions}, options];
+};
+
+Mongo.prototype.watch = function(pipeline, options) {
+ pipeline = pipeline || [];
+ assert(pipeline instanceof Array, "'pipeline' argument must be an array");
+
+ let changeStreamStage;
+ [changeStreamStage, aggOptions] = this._extractChangeStreamOptions(options);
+ changeStreamStage.$changeStream.allChangesForCluster = true;
+ pipeline.unshift(changeStreamStage);
+ return this.getDB("admin")._runAggregate({aggregate: 1, pipeline: pipeline}, aggOptions);
};