diff options
-rw-r--r-- | jstests/change_streams/change_stream_collation.js | 24 | ||||
-rw-r--r-- | jstests/change_streams/change_stream_shell_helper.js | 154 | ||||
-rw-r--r-- | src/mongo/scripting/mozjs/cursor.cpp | 14 | ||||
-rw-r--r-- | src/mongo/scripting/mozjs/cursor.h | 3 | ||||
-rw-r--r-- | src/mongo/shell/collection.js | 19 | ||||
-rw-r--r-- | src/mongo/shell/db.js | 5 | ||||
-rw-r--r-- | src/mongo/shell/query.js | 37 |
7 files changed, 251 insertions, 5 deletions
diff --git a/jstests/change_streams/change_stream_collation.js b/jstests/change_streams/change_stream_collation.js index a3ef58a14de..5093bbe4ed0 100644 --- a/jstests/change_streams/change_stream_collation.js +++ b/jstests/change_streams/change_stream_collation.js @@ -181,4 +181,28 @@ {cursor: englishCaseSensitiveStream, expectedChanges: [{docId: 1}]}); }()); + // Test that collation is supported by the shell helper. + // Test that creating a change stream with a non-default collation against a collection that has + // a simple default collation will use the collation specified on the operation. + (function() { + const noCollationCollection = db.change_stream_no_collation; + noCollationCollection.drop(); + assert.commandWorked(db.runCommand({create: noCollationCollection.getName()})); + + const cursor = noCollationCollection.watch( + [ + {$match: {"fullDocument.text": "abc"}}, + {$project: {docId: "$documentKey._id", _id: 0}} + ], + {collation: caseInsensitive}); + assert(!cursor.hasNext()); + assert.writeOK(noCollationCollection.insert({_id: 0, text: "aBc"})); + assert.writeOK(noCollationCollection.insert({_id: 1, text: "abc"})); + assert(cursor.hasNext()); + assert.docEq(cursor.next(), {docId: 0}); + assert(cursor.hasNext()); + assert.docEq(cursor.next(), {docId: 1}); + assert(!cursor.hasNext()); + }()); + })(); diff --git a/jstests/change_streams/change_stream_shell_helper.js b/jstests/change_streams/change_stream_shell_helper.js new file mode 100644 index 00000000000..4e972e0fd6f --- /dev/null +++ b/jstests/change_streams/change_stream_shell_helper.js @@ -0,0 +1,154 @@ +// Test DBCollection.watch() shell helper and its options. + +(function() { + "use strict"; + + const collName = "change_stream_shell_helper"; + const coll = db[collName]; + coll.drop(); + + function checkNextChange(cursor, expected) { + assert(cursor.hasNext()); + assert.docEq(cursor.next(), expected); + } + + function testCommandIsCalled(testFunc, checkFunc) { + const mongoRunCommandOriginal = Mongo.prototype.runCommand; + + const sentinel = {}; + let cmdObjSeen = sentinel; + + Mongo.prototype.runCommand = function runCommandSpy(dbName, cmdObj, options) { + cmdObjSeen = cmdObj; + return mongoRunCommandOriginal.apply(this, arguments); + }; + + try { + assert.doesNotThrow(testFunc); + } finally { + Mongo.prototype.runCommand = mongoRunCommandOriginal; + } + + if (cmdObjSeen === sentinel) { + throw new Error("Mongo.prototype.runCommand() was never called: " + + testFunc.toString()); + } + + checkFunc(cmdObjSeen); + } + + jsTestLog("Testing watch() without options"); + let cursor = coll.watch(); + assert(!cursor.hasNext()); + assert.writeOK(coll.insert({_id: 0, x: 1})); + assert(cursor.hasNext()); + let change = cursor.next(); + assert(!cursor.hasNext()); + let expected = { + documentKey: {_id: 0}, + fullDocument: {_id: 0, x: 1}, + ns: {db: "test", coll: collName}, + operationType: "insert", + }; + assert("_id" in change, "Got unexpected change: " + tojson(change)); + // Remember the _id of the first op to resume the stream. + const resumeToken = change._id; + delete change._id; + assert.docEq(change, expected); + + jsTestLog("Testing watch() with pipeline"); + cursor = coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}]); + assert.writeOK(coll.insert({_id: 1, x: 1})); + checkNextChange(cursor, {docId: 1}); + + jsTestLog("Testing watch() with pipeline and resumeAfter"); + cursor = + coll.watch([{$project: {_id: 0, docId: "$documentKey._id"}}], {resumeAfter: resumeToken}); + checkNextChange(cursor, {docId: 1}); + + jsTestLog("Testing watch() with updateLookup"); + cursor = coll.watch([{$project: {_id: 0}}], {fullDocument: "updateLookup"}); + assert.writeOK(coll.update({_id: 0}, {$set: {x: 10}})); + expected = { + documentKey: {_id: 0}, + fullDocument: {_id: 0, x: 10}, + ns: {db: "test", coll: collName}, + operationType: "update", + updateDescription: {removedFields: [], updatedFields: {x: 10}}, + }; + checkNextChange(cursor, expected); + + jsTestLog("Testing watch() with batchSize"); + // Only test mongod because mongos uses batch size 0 for aggregate commands internally to + // establish cursors quickly. + // GetMore on mongos doesn't respect batch size either due to SERVER-31992. + const isMongos = db.runCommand({isdbgrid: 1}).isdbgrid; + if (!isMongos) { + // Increase a field by 5 times and verify the batch size is respected. + for (let i = 0; i < 5; i++) { + assert.writeOK(coll.update({_id: 1}, {$inc: {x: 1}})); + } + + // Only watch the "update" changes of the specific doc since the beginning. + cursor = coll.watch( + [{$match: {documentKey: {_id: 1}, operationType: "update"}}, {$project: {_id: 0}}], + {resumeAfter: resumeToken, batchSize: 2}); + + // Check the first batch. + assert.eq(cursor.objsLeftInBatch(), 2); + // Consume the first batch. + assert(cursor.hasNext()); + cursor.next(); + assert(cursor.hasNext()); + cursor.next(); + assert.eq(cursor.objsLeftInBatch(), 0); + + // Check the batch returned by getMore. + assert(cursor.hasNext()); + assert.eq(cursor.objsLeftInBatch(), 2); + cursor.next(); + assert(cursor.hasNext()); + cursor.next(); + assert.eq(cursor.objsLeftInBatch(), 0); + // There are more changes coming, just not in the batch. + assert(cursor.hasNext()); + } + + jsTestLog("Testing watch() with maxAwaitTimeMS"); + cursor = coll.watch([], {maxAwaitTimeMS: 500}); + testCommandIsCalled( + function() { + assert(!cursor.hasNext()); + }, + function(cmdObj) { + assert.eq("getMore", + Object.keys(cmdObj)[0], + "expected getMore command, but was: " + tojson(cmdObj)); + assert(cmdObj.hasOwnProperty("maxTimeMS"), + "unexpected getMore command: " + tojson(cmdObj)); + assert.eq(500, cmdObj.maxTimeMS, "unexpected getMore command: " + tojson(cmdObj)); + }); + + jsTestLog("Testing the cursor gets closed when the collection gets dropped"); + cursor = coll.watch([{$project: {_id: 0}}]); + assert.writeOK(coll.insert({_id: 2, x: 1})); + expected = { + documentKey: {_id: 2}, + fullDocument: {_id: 2, x: 1}, + ns: {db: "test", coll: collName}, + operationType: "insert", + }; + checkNextChange(cursor, expected); + assert(!cursor.hasNext()); + assert(!cursor.isClosed()); + assert(!cursor.isExhausted()); + coll.drop(); + assert(cursor.hasNext()); + assert(cursor.isClosed()); + assert(!cursor.isExhausted()); + expected = {operationType: "invalidate"}; + checkNextChange(cursor, expected); + assert(!cursor.hasNext()); + assert(cursor.isClosed()); + assert(cursor.isExhausted()); +}()); diff --git a/src/mongo/scripting/mozjs/cursor.cpp b/src/mongo/scripting/mozjs/cursor.cpp index fb1d48e8c83..0b107a31367 100644 --- a/src/mongo/scripting/mozjs/cursor.cpp +++ b/src/mongo/scripting/mozjs/cursor.cpp @@ -40,12 +40,13 @@ namespace mongo { namespace mozjs { -const JSFunctionSpec CursorInfo::methods[6] = { +const JSFunctionSpec CursorInfo::methods[7] = { MONGO_ATTACH_JS_CONSTRAINED_METHOD_NO_PROTO(close, CursorInfo), MONGO_ATTACH_JS_CONSTRAINED_METHOD_NO_PROTO(hasNext, CursorInfo), MONGO_ATTACH_JS_CONSTRAINED_METHOD_NO_PROTO(next, CursorInfo), MONGO_ATTACH_JS_CONSTRAINED_METHOD_NO_PROTO(objsLeftInBatch, CursorInfo), MONGO_ATTACH_JS_CONSTRAINED_METHOD_NO_PROTO(readOnly, CursorInfo), + MONGO_ATTACH_JS_CONSTRAINED_METHOD_NO_PROTO(isClosed, CursorInfo), JS_FS_END, }; @@ -126,5 +127,16 @@ void CursorInfo::Functions::close::call(JSContext* cx, JS::CallArgs args) { args.rval().setUndefined(); } +void CursorInfo::Functions::isClosed::call(JSContext* cx, JS::CallArgs args) { + auto cursor = getCursor(args); + + if (!cursor) { + args.rval().setBoolean(true); + return; + } + + args.rval().setBoolean(cursor->isDead()); +} + } // namespace mozjs } // namespace mongo diff --git a/src/mongo/scripting/mozjs/cursor.h b/src/mongo/scripting/mozjs/cursor.h index 3d3386f7bcc..a0190e6a359 100644 --- a/src/mongo/scripting/mozjs/cursor.h +++ b/src/mongo/scripting/mozjs/cursor.h @@ -47,12 +47,13 @@ struct CursorInfo : public BaseInfo { struct Functions { MONGO_DECLARE_JS_FUNCTION(close); MONGO_DECLARE_JS_FUNCTION(hasNext); + MONGO_DECLARE_JS_FUNCTION(isClosed); MONGO_DECLARE_JS_FUNCTION(next); MONGO_DECLARE_JS_FUNCTION(objsLeftInBatch); MONGO_DECLARE_JS_FUNCTION(readOnly); }; - static const JSFunctionSpec methods[6]; + static const JSFunctionSpec methods[7]; static const char* const className; static const unsigned classFlags = JSCLASS_HAS_PRIVATE; diff --git a/src/mongo/shell/collection.js b/src/mongo/shell/collection.js index 8d242a347a9..452a2361f30 100644 --- a/src/mongo/shell/collection.js +++ b/src/mongo/shell/collection.js @@ -1641,6 +1641,25 @@ DBCollection.prototype.latencyStats = function(options) { return this.aggregate([{$collStats: {latencyStats: options}}]); }; +DBCollection.prototype.watch = function(pipeline, options) { + pipeline = pipeline || []; + options = options || {}; + assert(pipeline instanceof Array, "'pipeline' argument must be an array"); + assert(options instanceof Object, "'options' argument must be an object"); + + let changeStreamStage = {fullDocument: options.fullDocument || "default"}; + delete options.fullDocument; + + if (options.hasOwnProperty("resumeAfter")) { + changeStreamStage.resumeAfter = options.resumeAfter; + delete options.resumeAfter; + } + + pipeline.unshift({$changeStream: changeStreamStage}); + // Pass options "batchSize", "collation" and "maxAwaitTimeMS" down to aggregate(). + return this.aggregate(pipeline, options); +}; + /** * PlanCache * Holds a reference to the collection. diff --git a/src/mongo/shell/db.js b/src/mongo/shell/db.js index a7e133df116..8ef40760efa 100644 --- a/src/mongo/shell/db.js +++ b/src/mongo/shell/db.js @@ -215,6 +215,9 @@ var DB; delete optcpy['useCursor']; } + const maxAwaitTimeMS = optcpy.maxAwaitTimeMS; + delete optcpy.maxAwaitTimeMS; + // Reassign the cleaned-up options. aggregateOptions = optcpy; @@ -263,7 +266,7 @@ var DB; batchSizeValue = cmdObj["cursor"]["batchSize"]; } - return new DBCommandCursor(this, res, batchSizeValue); + return new DBCommandCursor(this, res, batchSizeValue, maxAwaitTimeMS); } return res; diff --git a/src/mongo/shell/query.js b/src/mongo/shell/query.js index 6de7edb3661..431c3aa9455 100644 --- a/src/mongo/shell/query.js +++ b/src/mongo/shell/query.js @@ -681,6 +681,16 @@ DBQuery.prototype.close = function() { this._cursor.close(); }; +DBQuery.prototype.isClosed = function() { + this._exec(); + return this._cursor.isClosed(); +}; + +DBQuery.prototype.isExhausted = function() { + this._exec(); + return this._cursor.isClosed() && this._cursor.objsLeftInBatch() === 0; +}; + DBQuery.shellBatchSize = 20; /** @@ -697,7 +707,7 @@ DBQuery.Option = { partial: 0x80 }; -function DBCommandCursor(db, cmdResult, batchSize) { +function DBCommandCursor(db, cmdResult, batchSize, maxAwaitTimeMS) { if (cmdResult._mongo) { const newSession = new _DelegatingDriverSession(cmdResult._mongo, db.getSession()); db = newSession.getDatabase(db.getName()); @@ -713,6 +723,7 @@ function DBCommandCursor(db, cmdResult, batchSize) { this._useReadCommands = true; this._cursorid = cmdResult.cursor.id; this._batchSize = batchSize; + this._maxAwaitTimeMS = maxAwaitTimeMS; this._ns = cmdResult.cursor.ns; this._db = db; @@ -732,10 +743,27 @@ function DBCommandCursor(db, cmdResult, batchSize) { DBCommandCursor.prototype = {}; +/** + * Returns whether the cursor id is zero. + */ +DBCommandCursor.prototype.isClosed = function() { + if (this._useReadCommands) { + return bsonWoCompare({_: this._cursorid}, {_: NumberLong(0)}) === 0; + } + return this._cursor.isClosed(); +}; + +/** + * Returns whether the cursor has closed and has nothing in the batch. + */ +DBCommandCursor.prototype.isExhausted = function() { + return this.isClosed() && this.objsLeftInBatch() === 0; +}; + DBCommandCursor.prototype.close = function() { if (!this._useReadCommands) { this._cursor.close(); - } else if (this._cursorid != 0) { + } else if (bsonWoCompare({_: this._cursorid}, {_: NumberLong(0)}) !== 0) { var killCursorCmd = { killCursors: this._collName, cursors: [this._cursorid], @@ -764,6 +792,11 @@ DBCommandCursor.prototype._runGetMoreCommand = function() { getMoreCmd["batchSize"] = this._batchSize; } + // maxAwaitTimeMS is only supported when using read commands. + if (this._maxAwaitTimeMS) { + getMoreCmd.maxTimeMS = this._maxAwaitTimeMS; + } + // Deliver the getMore command, and check for errors in the response. var cmdRes = this._db.runCommand(getMoreCmd); if (cmdRes.ok != 1) { |