summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/change_streams/change_stream_collation.js24
-rw-r--r--jstests/change_streams/change_stream_shell_helper.js154
-rw-r--r--src/mongo/scripting/mozjs/cursor.cpp14
-rw-r--r--src/mongo/scripting/mozjs/cursor.h3
-rw-r--r--src/mongo/shell/collection.js19
-rw-r--r--src/mongo/shell/db.js5
-rw-r--r--src/mongo/shell/query.js37
7 files changed, 251 insertions, 5 deletions
diff --git a/jstests/change_streams/change_stream_collation.js b/jstests/change_streams/change_stream_collation.js
index a3ef58a14de..5093bbe4ed0 100644
--- a/jstests/change_streams/change_stream_collation.js
+++ b/jstests/change_streams/change_stream_collation.js
@@ -181,4 +181,28 @@
{cursor: englishCaseSensitiveStream, expectedChanges: [{docId: 1}]});
}());
+ // Test that collation is supported by the shell helper.
+ // Test that creating a change stream with a non-default collation against a collection that has
+ // a simple default collation will use the collation specified on the operation.
+ (function() {
+ const noCollationCollection = db.change_stream_no_collation;
+ noCollationCollection.drop();
+ assert.commandWorked(db.runCommand({create: noCollationCollection.getName()}));
+
+ const cursor = noCollationCollection.watch(
+ [
+ {$match: {"fullDocument.text": "abc"}},
+ {$project: {docId: "$documentKey._id", _id: 0}}
+ ],
+ {collation: caseInsensitive});
+ assert(!cursor.hasNext());
+ assert.writeOK(noCollationCollection.insert({_id: 0, text: "aBc"}));
+ assert.writeOK(noCollationCollection.insert({_id: 1, text: "abc"}));
+ assert(cursor.hasNext());
+ assert.docEq(cursor.next(), {docId: 0});
+ assert(cursor.hasNext());
+ assert.docEq(cursor.next(), {docId: 1});
+ assert(!cursor.hasNext());
+ }());
+
})();
diff --git a/jstests/change_streams/change_stream_shell_helper.js b/jstests/change_streams/change_stream_shell_helper.js
new file mode 100644
index 00000000000..4e972e0fd6f
--- /dev/null
+++ b/jstests/change_streams/change_stream_shell_helper.js
@@ -0,0 +1,154 @@
+// Test DBCollection.watch() shell helper and its options.
+
+(function() {
+ "use strict";
+
+ const collName = "change_stream_shell_helper";
+ const coll = db[collName];
+ coll.drop();
+
+ function checkNextChange(cursor, expected) {
+ assert(cursor.hasNext());
+ assert.docEq(cursor.next(), expected);
+ }
+
+ function testCommandIsCalled(testFunc, checkFunc) {
+ const mongoRunCommandOriginal = Mongo.prototype.runCommand;
+
+ const sentinel = {};
+ let cmdObjSeen = sentinel;
+
+ Mongo.prototype.runCommand = function runCommandSpy(dbName, cmdObj, options) {
+ cmdObjSeen = cmdObj;
+ return mongoRunCommandOriginal.apply(this, arguments);
+ };
+
+ try {
+ assert.doesNotThrow(testFunc);
+ } finally {
+ Mongo.prototype.runCommand = mongoRunCommandOriginal;
+ }
+
+ if (cmdObjSeen === sentinel) {
+ throw new Error("Mongo.prototype.runCommand() was never called: " +
+ testFunc.toString());
+ }
+
+ checkFunc(cmdObjSeen);
+ }
+
+ jsTestLog("Testing watch() without options");
+ let cursor = coll.watch();
+ assert(!cursor.hasNext());
+ assert.writeOK(coll.insert({_id: 0, x: 1}));
+ assert(cursor.hasNext());
+ let change = cursor.next();
+ assert(!cursor.hasNext());
+ let expected = {
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0, x: 1},
+ ns: {db: "test", coll: collName},
+ operationType: "insert",
+ };
+ assert("_id" in change, "Got unexpected change: " + tojson(change));
+ // Remember the _id of the first op to resume the stream.
+ const resumeToken = change._id;
+ delete change._id;
+ assert.docEq(change, expected);
+
+ jsTestLog("Testing watch() with pipeline");
+ cursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}]);
+ assert.writeOK(coll.insert({_id: 1, x: 1}));
+ checkNextChange(cursor, {docId: 1});
+
+ jsTestLog("Testing watch() with pipeline and resumeAfter");
+ cursor =
+ coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {resumeAfter: resumeToken});
+ checkNextChange(cursor, {docId: 1});
+
+ jsTestLog("Testing watch() with updateLookup");
+ cursor = coll.watch([{$project: {_id: 0}}], {fullDocument: "updateLookup"});
+ assert.writeOK(coll.update({_id: 0}, {$set: {x: 10}}));
+ expected = {
+ documentKey: {_id: 0},
+ fullDocument: {_id: 0, x: 10},
+ ns: {db: "test", coll: collName},
+ operationType: "update",
+ updateDescription: {removedFields: [], updatedFields: {x: 10}},
+ };
+ checkNextChange(cursor, expected);
+
+ jsTestLog("Testing watch() with batchSize");
+ // Only test mongod because mongos uses batch size 0 for aggregate commands internally to
+ // establish cursors quickly.
+ // GetMore on mongos doesn't respect batch size either due to SERVER-31992.
+ const isMongos = db.runCommand({isdbgrid: 1}).isdbgrid;
+ if (!isMongos) {
+ // Increase a field by 5 times and verify the batch size is respected.
+ for (let i = 0; i < 5; i++) {
+ assert.writeOK(coll.update({_id: 1}, {$inc: {x: 1}}));
+ }
+
+ // Only watch the "update" changes of the specific doc since the beginning.
+ cursor = coll.watch(
+ [{$match: {documentKey: {_id: 1}, operationType: "update"}}, {$project: {_id: 0}}],
+ {resumeAfter: resumeToken, batchSize: 2});
+
+ // Check the first batch.
+ assert.eq(cursor.objsLeftInBatch(), 2);
+ // Consume the first batch.
+ assert(cursor.hasNext());
+ cursor.next();
+ assert(cursor.hasNext());
+ cursor.next();
+ assert.eq(cursor.objsLeftInBatch(), 0);
+
+ // Check the batch returned by getMore.
+ assert(cursor.hasNext());
+ assert.eq(cursor.objsLeftInBatch(), 2);
+ cursor.next();
+ assert(cursor.hasNext());
+ cursor.next();
+ assert.eq(cursor.objsLeftInBatch(), 0);
+ // There are more changes coming, just not in the batch.
+ assert(cursor.hasNext());
+ }
+
+ jsTestLog("Testing watch() with maxAwaitTimeMS");
+ cursor = coll.watch([], {maxAwaitTimeMS: 500});
+ testCommandIsCalled(
+ function() {
+ assert(!cursor.hasNext());
+ },
+ function(cmdObj) {
+ assert.eq("getMore",
+ Object.keys(cmdObj)[0],
+ "expected getMore command, but was: " + tojson(cmdObj));
+ assert(cmdObj.hasOwnProperty("maxTimeMS"),
+ "unexpected getMore command: " + tojson(cmdObj));
+ assert.eq(500, cmdObj.maxTimeMS, "unexpected getMore command: " + tojson(cmdObj));
+ });
+
+ jsTestLog("Testing the cursor gets closed when the collection gets dropped");
+ cursor = coll.watch([{$project: {_id: 0}}]);
+ assert.writeOK(coll.insert({_id: 2, x: 1}));
+ expected = {
+ documentKey: {_id: 2},
+ fullDocument: {_id: 2, x: 1},
+ ns: {db: "test", coll: collName},
+ operationType: "insert",
+ };
+ checkNextChange(cursor, expected);
+ assert(!cursor.hasNext());
+ assert(!cursor.isClosed());
+ assert(!cursor.isExhausted());
+ coll.drop();
+ assert(cursor.hasNext());
+ assert(cursor.isClosed());
+ assert(!cursor.isExhausted());
+ expected = {operationType: "invalidate"};
+ checkNextChange(cursor, expected);
+ assert(!cursor.hasNext());
+ assert(cursor.isClosed());
+ assert(cursor.isExhausted());
+}());
diff --git a/src/mongo/scripting/mozjs/cursor.cpp b/src/mongo/scripting/mozjs/cursor.cpp
index fb1d48e8c83..0b107a31367 100644
--- a/src/mongo/scripting/mozjs/cursor.cpp
+++ b/src/mongo/scripting/mozjs/cursor.cpp
@@ -40,12 +40,13 @@
namespace mongo {
namespace mozjs {
-const JSFunctionSpec CursorInfo::methods[6] = {
+const JSFunctionSpec CursorInfo::methods[7] = {
MONGO_ATTACH_JS_CONSTRAINED_METHOD_NO_PROTO(close, CursorInfo),
MONGO_ATTACH_JS_CONSTRAINED_METHOD_NO_PROTO(hasNext, CursorInfo),
MONGO_ATTACH_JS_CONSTRAINED_METHOD_NO_PROTO(next, CursorInfo),
MONGO_ATTACH_JS_CONSTRAINED_METHOD_NO_PROTO(objsLeftInBatch, CursorInfo),
MONGO_ATTACH_JS_CONSTRAINED_METHOD_NO_PROTO(readOnly, CursorInfo),
+ MONGO_ATTACH_JS_CONSTRAINED_METHOD_NO_PROTO(isClosed, CursorInfo),
JS_FS_END,
};
@@ -126,5 +127,16 @@ void CursorInfo::Functions::close::call(JSContext* cx, JS::CallArgs args) {
args.rval().setUndefined();
}
+void CursorInfo::Functions::isClosed::call(JSContext* cx, JS::CallArgs args) {
+ auto cursor = getCursor(args);
+
+ if (!cursor) {
+ args.rval().setBoolean(true);
+ return;
+ }
+
+ args.rval().setBoolean(cursor->isDead());
+}
+
} // namespace mozjs
} // namespace mongo
diff --git a/src/mongo/scripting/mozjs/cursor.h b/src/mongo/scripting/mozjs/cursor.h
index 3d3386f7bcc..a0190e6a359 100644
--- a/src/mongo/scripting/mozjs/cursor.h
+++ b/src/mongo/scripting/mozjs/cursor.h
@@ -47,12 +47,13 @@ struct CursorInfo : public BaseInfo {
struct Functions {
MONGO_DECLARE_JS_FUNCTION(close);
MONGO_DECLARE_JS_FUNCTION(hasNext);
+ MONGO_DECLARE_JS_FUNCTION(isClosed);
MONGO_DECLARE_JS_FUNCTION(next);
MONGO_DECLARE_JS_FUNCTION(objsLeftInBatch);
MONGO_DECLARE_JS_FUNCTION(readOnly);
};
- static const JSFunctionSpec methods[6];
+ static const JSFunctionSpec methods[7];
static const char* const className;
static const unsigned classFlags = JSCLASS_HAS_PRIVATE;
diff --git a/src/mongo/shell/collection.js b/src/mongo/shell/collection.js
index 8d242a347a9..452a2361f30 100644
--- a/src/mongo/shell/collection.js
+++ b/src/mongo/shell/collection.js
@@ -1641,6 +1641,25 @@ DBCollection.prototype.latencyStats = function(options) {
return this.aggregate([{$collStats: {latencyStats: options}}]);
};
+DBCollection.prototype.watch = function(pipeline, options) {
+ pipeline = pipeline || [];
+ options = options || {};
+ assert(pipeline instanceof Array, "'pipeline' argument must be an array");
+ assert(options instanceof Object, "'options' argument must be an object");
+
+ let changeStreamStage = {fullDocument: options.fullDocument || "default"};
+ delete options.fullDocument;
+
+ if (options.hasOwnProperty("resumeAfter")) {
+ changeStreamStage.resumeAfter = options.resumeAfter;
+ delete options.resumeAfter;
+ }
+
+ pipeline.unshift({$changeStream: changeStreamStage});
+ // Pass options "batchSize", "collation" and "maxAwaitTimeMS" down to aggregate().
+ return this.aggregate(pipeline, options);
+};
+
/**
* PlanCache
* Holds a reference to the collection.
diff --git a/src/mongo/shell/db.js b/src/mongo/shell/db.js
index a7e133df116..8ef40760efa 100644
--- a/src/mongo/shell/db.js
+++ b/src/mongo/shell/db.js
@@ -215,6 +215,9 @@ var DB;
delete optcpy['useCursor'];
}
+ const maxAwaitTimeMS = optcpy.maxAwaitTimeMS;
+ delete optcpy.maxAwaitTimeMS;
+
// Reassign the cleaned-up options.
aggregateOptions = optcpy;
@@ -263,7 +266,7 @@ var DB;
batchSizeValue = cmdObj["cursor"]["batchSize"];
}
- return new DBCommandCursor(this, res, batchSizeValue);
+ return new DBCommandCursor(this, res, batchSizeValue, maxAwaitTimeMS);
}
return res;
diff --git a/src/mongo/shell/query.js b/src/mongo/shell/query.js
index 6de7edb3661..431c3aa9455 100644
--- a/src/mongo/shell/query.js
+++ b/src/mongo/shell/query.js
@@ -681,6 +681,16 @@ DBQuery.prototype.close = function() {
this._cursor.close();
};
+DBQuery.prototype.isClosed = function() {
+ this._exec();
+ return this._cursor.isClosed();
+};
+
+DBQuery.prototype.isExhausted = function() {
+ this._exec();
+ return this._cursor.isClosed() && this._cursor.objsLeftInBatch() === 0;
+};
+
DBQuery.shellBatchSize = 20;
/**
@@ -697,7 +707,7 @@ DBQuery.Option = {
partial: 0x80
};
-function DBCommandCursor(db, cmdResult, batchSize) {
+function DBCommandCursor(db, cmdResult, batchSize, maxAwaitTimeMS) {
if (cmdResult._mongo) {
const newSession = new _DelegatingDriverSession(cmdResult._mongo, db.getSession());
db = newSession.getDatabase(db.getName());
@@ -713,6 +723,7 @@ function DBCommandCursor(db, cmdResult, batchSize) {
this._useReadCommands = true;
this._cursorid = cmdResult.cursor.id;
this._batchSize = batchSize;
+ this._maxAwaitTimeMS = maxAwaitTimeMS;
this._ns = cmdResult.cursor.ns;
this._db = db;
@@ -732,10 +743,27 @@ function DBCommandCursor(db, cmdResult, batchSize) {
DBCommandCursor.prototype = {};
+/**
+ * Returns whether the cursor id is zero.
+ */
+DBCommandCursor.prototype.isClosed = function() {
+ if (this._useReadCommands) {
+ return bsonWoCompare({_: this._cursorid}, {_: NumberLong(0)}) === 0;
+ }
+ return this._cursor.isClosed();
+};
+
+/**
+ * Returns whether the cursor has closed and has nothing in the batch.
+ */
+DBCommandCursor.prototype.isExhausted = function() {
+ return this.isClosed() && this.objsLeftInBatch() === 0;
+};
+
DBCommandCursor.prototype.close = function() {
if (!this._useReadCommands) {
this._cursor.close();
- } else if (this._cursorid != 0) {
+ } else if (bsonWoCompare({_: this._cursorid}, {_: NumberLong(0)}) !== 0) {
var killCursorCmd = {
killCursors: this._collName,
cursors: [this._cursorid],
@@ -764,6 +792,11 @@ DBCommandCursor.prototype._runGetMoreCommand = function() {
getMoreCmd["batchSize"] = this._batchSize;
}
+ // maxAwaitTimeMS is only supported when using read commands.
+ if (this._maxAwaitTimeMS) {
+ getMoreCmd.maxTimeMS = this._maxAwaitTimeMS;
+ }
+
// Deliver the getMore command, and check for errors in the response.
var cmdRes = this._db.runCommand(getMoreCmd);
if (cmdRes.ok != 1) {