diff options
Diffstat (limited to 'jstests/change_streams/shell_helper.js')
-rw-r--r-- | jstests/change_streams/shell_helper.js | 394 |
1 files changed, 197 insertions, 197 deletions
diff --git a/jstests/change_streams/shell_helper.js b/jstests/change_streams/shell_helper.js index a044ba76e50..29330a433e9 100644 --- a/jstests/change_streams/shell_helper.js +++ b/jstests/change_streams/shell_helper.js @@ -7,215 +7,215 @@ // based on the commit oplog entry, which would cause this test to fail. // @tags: [change_stream_does_not_expect_txns] (function() { - "use strict"; +"use strict"; - load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. - load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. - load("jstests/libs/change_stream_util.js"); // For assertInvalidateOp. +load("jstests/libs/collection_drop_recreate.js"); // For assert[Drop|Create]Collection. +load("jstests/libs/fixture_helpers.js"); // For FixtureHelpers. +load("jstests/libs/change_stream_util.js"); // For assertInvalidateOp. - const coll = assertDropAndRecreateCollection(db, "change_stream_shell_helper"); +const coll = assertDropAndRecreateCollection(db, "change_stream_shell_helper"); - function checkNextChange(cursor, expected) { - assert.soon(() => cursor.hasNext()); - const nextObj = cursor.next(); - assertChangeStreamEventEq(nextObj, expected); - return nextObj; - } - - function testCommandIsCalled(testFunc, checkFunc) { - const mongoRunCommandOriginal = Mongo.prototype.runCommand; +function checkNextChange(cursor, expected) { + assert.soon(() => cursor.hasNext()); + const nextObj = cursor.next(); + assertChangeStreamEventEq(nextObj, expected); + return nextObj; +} - const sentinel = {}; - let cmdObjSeen = sentinel; +function testCommandIsCalled(testFunc, checkFunc) { + const mongoRunCommandOriginal = Mongo.prototype.runCommand; - Mongo.prototype.runCommand = function runCommandSpy(dbName, cmdObj, options) { - cmdObjSeen = cmdObj; - return mongoRunCommandOriginal.apply(this, arguments); - }; + const sentinel = {}; + let cmdObjSeen = sentinel; - try { - assert.doesNotThrow(testFunc); - } finally { - Mongo.prototype.runCommand = mongoRunCommandOriginal; - } - - if (cmdObjSeen === sentinel) { - throw new Error("Mongo.prototype.runCommand() was never called: " + - testFunc.toString()); - } + Mongo.prototype.runCommand = function runCommandSpy(dbName, cmdObj, options) { + cmdObjSeen = cmdObj; + return mongoRunCommandOriginal.apply(this, arguments); + }; - checkFunc(cmdObjSeen); + try { + assert.doesNotThrow(testFunc); + } finally { + Mongo.prototype.runCommand = mongoRunCommandOriginal; } - jsTestLog("Testing watch() without options"); - let changeStreamCursor = coll.watch(); - - assert(!changeStreamCursor.hasNext()); - - // Write the first document into the collection. We will save the resume token from this change. - assert.writeOK(coll.insert({_id: 0, x: 1})); - let resumeToken; - - // Test that each of the change stream cursors picks up the change. - assert.soon(() => changeStreamCursor.hasNext()); - let change = changeStreamCursor.next(); - assert(!changeStreamCursor.hasNext()); - let expected = { - documentKey: {_id: 0}, - fullDocument: {_id: 0, x: 1}, - ns: {db: "test", coll: coll.getName()}, - operationType: "insert", - }; - assert("_id" in change, "Got unexpected change: " + tojson(change)); - // Remember the _id of the first op to resume the stream. - resumeToken = change._id; - // Remove the fields we cannot predict, then test that the change is as expected. - delete change._id; - delete change.clusterTime; - assert.docEq(change, expected); - - jsTestLog("Testing watch() with pipeline"); - changeStreamCursor = coll.watch([{$project: {clusterTime: 1, docId: "$documentKey._id"}}]); - - // Store the cluster time of the insert as the timestamp to start from. - const resumeTime = - assert.commandWorked(db.runCommand({insert: coll.getName(), documents: [{_id: 1, x: 1}]})) - .operationTime; - jsTestLog("Insert of document with _id 1 got operationTime " + tojson(resumeTime)); - - const changeForInsert = checkNextChange(changeStreamCursor, {docId: 1}); - jsTestLog("Change stream event for document with _id 1 reports clusterTime " + - tojson(changeForInsert.clusterTime)); - - // We expect the clusterTime returned by the change stream event and the operationTime returned - // by the insert to be the same. - assert.eq(changeForInsert.clusterTime, resumeTime); - - jsTestLog("Testing watch() with pipeline and resumeAfter"); - changeStreamCursor = - coll.watch([{$project: {docId: "$documentKey._id"}}], {resumeAfter: resumeToken}); - checkNextChange(changeStreamCursor, {docId: 1}); - - jsTestLog("Testing watch() with pipeline and startAfter"); - changeStreamCursor = - coll.watch([{$project: {docId: "$documentKey._id"}}], {startAfter: resumeToken}); - checkNextChange(changeStreamCursor, {docId: 1}); - - jsTestLog("Testing watch() with pipeline and startAtOperationTime"); - changeStreamCursor = - coll.watch([{$project: {docId: "$documentKey._id"}}], {startAtOperationTime: resumeTime}); - checkNextChange(changeStreamCursor, {docId: 1}); - - jsTestLog("Testing watch() with updateLookup"); - changeStreamCursor = coll.watch([], {fullDocument: "updateLookup"}); - - assert.writeOK(coll.update({_id: 0}, {$set: {x: 10}})); - expected = { - documentKey: {_id: 0}, - fullDocument: {_id: 0, x: 10}, - ns: {db: "test", coll: coll.getName()}, - operationType: "update", - updateDescription: {removedFields: [], updatedFields: {x: 10}}, - }; - checkNextChange(changeStreamCursor, expected); - - jsTestLog("Testing watch() with batchSize"); - // Only test mongod because mongos uses batch size 0 for aggregate commands internally to - // establish cursors quickly. GetMore on mongos doesn't respect batch size due to SERVER-31992. - const isMongos = FixtureHelpers.isMongos(db); - if (!isMongos) { - // Increase a field by 5 times and verify the batch size is respected. - for (let i = 0; i < 5; i++) { - assert.writeOK(coll.update({_id: 1}, {$inc: {x: 1}})); - } + if (cmdObjSeen === sentinel) { + throw new Error("Mongo.prototype.runCommand() was never called: " + testFunc.toString()); + } - // Only watch the "update" changes of the specific doc since the beginning. - changeStreamCursor = - coll.watch([{$match: {documentKey: {_id: 1}, operationType: "update"}}], - {resumeAfter: resumeToken, batchSize: 2}); - - // Check the first batch. - assert.eq(changeStreamCursor.objsLeftInBatch(), 2); - // Consume the first batch. - assert(changeStreamCursor.hasNext()); - changeStreamCursor.next(); - assert(changeStreamCursor.hasNext()); - changeStreamCursor.next(); - // Confirm that the batch is empty. - assert.eq(changeStreamCursor.objsLeftInBatch(), 0); - - // Check the batch returned by getMore. - assert(changeStreamCursor.hasNext()); - assert.eq(changeStreamCursor.objsLeftInBatch(), 2); - changeStreamCursor.next(); - assert(changeStreamCursor.hasNext()); - changeStreamCursor.next(); - assert.eq(changeStreamCursor.objsLeftInBatch(), 0); - // There are more changes coming, just not in the batch. - assert(changeStreamCursor.hasNext()); + checkFunc(cmdObjSeen); +} + +jsTestLog("Testing watch() without options"); +let changeStreamCursor = coll.watch(); + +assert(!changeStreamCursor.hasNext()); + +// Write the first document into the collection. We will save the resume token from this change. +assert.writeOK(coll.insert({_id: 0, x: 1})); +let resumeToken; + +// Test that each of the change stream cursors picks up the change. +assert.soon(() => changeStreamCursor.hasNext()); +let change = changeStreamCursor.next(); +assert(!changeStreamCursor.hasNext()); +let expected = { + documentKey: {_id: 0}, + fullDocument: {_id: 0, x: 1}, + ns: {db: "test", coll: coll.getName()}, + operationType: "insert", +}; +assert("_id" in change, "Got unexpected change: " + tojson(change)); +// Remember the _id of the first op to resume the stream. +resumeToken = change._id; +// Remove the fields we cannot predict, then test that the change is as expected. +delete change._id; +delete change.clusterTime; +assert.docEq(change, expected); + +jsTestLog("Testing watch() with pipeline"); +changeStreamCursor = coll.watch([{$project: {clusterTime: 1, docId: "$documentKey._id"}}]); + +// Store the cluster time of the insert as the timestamp to start from. +const resumeTime = + assert.commandWorked(db.runCommand({insert: coll.getName(), documents: [{_id: 1, x: 1}]})) + .operationTime; +jsTestLog("Insert of document with _id 1 got operationTime " + tojson(resumeTime)); + +const changeForInsert = checkNextChange(changeStreamCursor, {docId: 1}); +jsTestLog("Change stream event for document with _id 1 reports clusterTime " + + tojson(changeForInsert.clusterTime)); + +// We expect the clusterTime returned by the change stream event and the operationTime returned +// by the insert to be the same. +assert.eq(changeForInsert.clusterTime, resumeTime); + +jsTestLog("Testing watch() with pipeline and resumeAfter"); +changeStreamCursor = + coll.watch([{$project: {docId: "$documentKey._id"}}], {resumeAfter: resumeToken}); +checkNextChange(changeStreamCursor, {docId: 1}); + +jsTestLog("Testing watch() with pipeline and startAfter"); +changeStreamCursor = + coll.watch([{$project: {docId: "$documentKey._id"}}], {startAfter: resumeToken}); +checkNextChange(changeStreamCursor, {docId: 1}); + +jsTestLog("Testing watch() with pipeline and startAtOperationTime"); +changeStreamCursor = + coll.watch([{$project: {docId: "$documentKey._id"}}], {startAtOperationTime: resumeTime}); +checkNextChange(changeStreamCursor, {docId: 1}); + +jsTestLog("Testing watch() with updateLookup"); +changeStreamCursor = coll.watch([], {fullDocument: "updateLookup"}); + +assert.writeOK(coll.update({_id: 0}, {$set: {x: 10}})); +expected = { + documentKey: {_id: 0}, + fullDocument: {_id: 0, x: 10}, + ns: {db: "test", coll: coll.getName()}, + operationType: "update", + updateDescription: {removedFields: [], updatedFields: {x: 10}}, +}; +checkNextChange(changeStreamCursor, expected); + +jsTestLog("Testing watch() with batchSize"); +// Only test mongod because mongos uses batch size 0 for aggregate commands internally to +// establish cursors quickly. GetMore on mongos doesn't respect batch size due to SERVER-31992. +const isMongos = FixtureHelpers.isMongos(db); +if (!isMongos) { + // Increase a field by 5 times and verify the batch size is respected. + for (let i = 0; i < 5; i++) { + assert.writeOK(coll.update({_id: 1}, {$inc: {x: 1}})); } - jsTestLog("Testing watch() with maxAwaitTimeMS"); - changeStreamCursor = coll.watch([], {maxAwaitTimeMS: 500}); - testCommandIsCalled(() => assert(!changeStreamCursor.hasNext()), (cmdObj) => { - assert.eq("getMore", - Object.keys(cmdObj)[0], - "expected getMore command, but was: " + tojson(cmdObj)); - assert(cmdObj.hasOwnProperty("maxTimeMS"), "unexpected getMore command: " + tojson(cmdObj)); - assert.eq(500, cmdObj.maxTimeMS, "unexpected getMore command: " + tojson(cmdObj)); + // Only watch the "update" changes of the specific doc since the beginning. + changeStreamCursor = coll.watch([{$match: {documentKey: {_id: 1}, operationType: "update"}}], + {resumeAfter: resumeToken, batchSize: 2}); + + // Check the first batch. + assert.eq(changeStreamCursor.objsLeftInBatch(), 2); + // Consume the first batch. + assert(changeStreamCursor.hasNext()); + changeStreamCursor.next(); + assert(changeStreamCursor.hasNext()); + changeStreamCursor.next(); + // Confirm that the batch is empty. + assert.eq(changeStreamCursor.objsLeftInBatch(), 0); + + // Check the batch returned by getMore. + assert(changeStreamCursor.hasNext()); + assert.eq(changeStreamCursor.objsLeftInBatch(), 2); + changeStreamCursor.next(); + assert(changeStreamCursor.hasNext()); + changeStreamCursor.next(); + assert.eq(changeStreamCursor.objsLeftInBatch(), 0); + // There are more changes coming, just not in the batch. + assert(changeStreamCursor.hasNext()); +} + +jsTestLog("Testing watch() with maxAwaitTimeMS"); +changeStreamCursor = coll.watch([], {maxAwaitTimeMS: 500}); +testCommandIsCalled(() => assert(!changeStreamCursor.hasNext()), (cmdObj) => { + assert.eq( + "getMore", Object.keys(cmdObj)[0], "expected getMore command, but was: " + tojson(cmdObj)); + assert(cmdObj.hasOwnProperty("maxTimeMS"), "unexpected getMore command: " + tojson(cmdObj)); + assert.eq(500, cmdObj.maxTimeMS, "unexpected getMore command: " + tojson(cmdObj)); +}); + +jsTestLog("Testing the cursor gets closed when the collection gets dropped"); +changeStreamCursor = coll.watch([{$project: {clusterTime: 0}}]); +assert.writeOK(coll.insert({_id: 2, x: 1})); +expected = { + documentKey: {_id: 2}, + fullDocument: {_id: 2, x: 1}, + ns: {db: "test", coll: coll.getName()}, + operationType: "insert", +}; +checkNextChange(changeStreamCursor, expected); +assert(!changeStreamCursor.hasNext()); +assert(!changeStreamCursor.isClosed()); +assert(!changeStreamCursor.isExhausted()); + +// Dropping the collection should trigger a drop notification. +assertDropCollection(db, coll.getName()); +assert.soon(() => changeStreamCursor.hasNext()); +assert(!changeStreamCursor.isExhausted()); +expected = { + operationType: "drop", + ns: {db: db.getName(), coll: coll.getName()} +}; +checkNextChange(changeStreamCursor, expected); +// For single collection change streams, the drop should invalidate the stream. +const invalidateDoc = assertInvalidateOp({cursor: changeStreamCursor, opType: "drop"}); + +if (invalidateDoc) { + jsTestLog("Testing using the 'startAfter' option from the invalidate entry"); + assert.commandWorked(coll.insert({_id: "After drop"})); + let resumedFromInvalidate = + coll.watch([], {startAfter: invalidateDoc._id, collation: {locale: "simple"}}); + + // We should see the new insert after starting over. However, in sharded cluster + // passthroughs we may see more drop and invalidate notifications before we see the insert. + let firstChangeAfterDrop; + assert.soon(() => { + if (!resumedFromInvalidate.hasNext()) { + return false; + } + const next = resumedFromInvalidate.next(); + if (next.operationType == "invalidate") { + // Start again! + resumedFromInvalidate = + coll.watch([], {startAfter: next._id, collation: {locale: "simple"}}); + return false; + } + if (next.operationType == "drop") { + return false; + } + // THIS is the change we wanted. + firstChangeAfterDrop = next; + return true; }); - jsTestLog("Testing the cursor gets closed when the collection gets dropped"); - changeStreamCursor = coll.watch([{$project: {clusterTime: 0}}]); - assert.writeOK(coll.insert({_id: 2, x: 1})); - expected = { - documentKey: {_id: 2}, - fullDocument: {_id: 2, x: 1}, - ns: {db: "test", coll: coll.getName()}, - operationType: "insert", - }; - checkNextChange(changeStreamCursor, expected); - assert(!changeStreamCursor.hasNext()); - assert(!changeStreamCursor.isClosed()); - assert(!changeStreamCursor.isExhausted()); - - // Dropping the collection should trigger a drop notification. - assertDropCollection(db, coll.getName()); - assert.soon(() => changeStreamCursor.hasNext()); - assert(!changeStreamCursor.isExhausted()); - expected = {operationType: "drop", ns: {db: db.getName(), coll: coll.getName()}}; - checkNextChange(changeStreamCursor, expected); - // For single collection change streams, the drop should invalidate the stream. - const invalidateDoc = assertInvalidateOp({cursor: changeStreamCursor, opType: "drop"}); - - if (invalidateDoc) { - jsTestLog("Testing using the 'startAfter' option from the invalidate entry"); - assert.commandWorked(coll.insert({_id: "After drop"})); - let resumedFromInvalidate = - coll.watch([], {startAfter: invalidateDoc._id, collation: {locale: "simple"}}); - - // We should see the new insert after starting over. However, in sharded cluster - // passthroughs we may see more drop and invalidate notifications before we see the insert. - let firstChangeAfterDrop; - assert.soon(() => { - if (!resumedFromInvalidate.hasNext()) { - return false; - } - const next = resumedFromInvalidate.next(); - if (next.operationType == "invalidate") { - // Start again! - resumedFromInvalidate = - coll.watch([], {startAfter: next._id, collation: {locale: "simple"}}); - return false; - } - if (next.operationType == "drop") { - return false; - } - // THIS is the change we wanted. - firstChangeAfterDrop = next; - return true; - }); - - assert.eq(firstChangeAfterDrop.documentKey._id, "After drop", tojson(change)); - } + assert.eq(firstChangeAfterDrop.documentKey._id, "After drop", tojson(change)); +} }()); |