diff options
Diffstat (limited to 'jstests/aggregation/sources/changeNotification/change_notification.js')
-rw-r--r-- | jstests/aggregation/sources/changeNotification/change_notification.js | 158 |
1 files changed, 5 insertions, 153 deletions
diff --git a/jstests/aggregation/sources/changeNotification/change_notification.js b/jstests/aggregation/sources/changeNotification/change_notification.js index e0df38950d6..7636b02fd53 100644 --- a/jstests/aggregation/sources/changeNotification/change_notification.js +++ b/jstests/aggregation/sources/changeNotification/change_notification.js @@ -4,28 +4,20 @@ (function() { "use strict"; - var oplogProjection = {$project: {"_id.ts": 0}}; - // Helper for testing that pipeline returns correct set of results. function testPipeline(pipeline, expectedResult, collection) { - // Limit to the last N documents from the end of the oplog, because currently - // $changeNotification always comes from the start of the oplog. - pipeline.push({$sort: {"_id.ts": -1}}); - if (expectedResult.length > 0) { - pipeline.push({$limit: expectedResult.length}); - } // Strip the oplog fields we aren't testing. - pipeline.push(oplogProjection); - assert.docEq(collection.aggregate(pipeline).toArray().reverse(), expectedResult); + pipeline.push({$limit: 1}); + pipeline.push({$project: {"_id.ts": 0}}); + assert.docEq(collection.aggregate(pipeline).toArray(), expectedResult); } - let replTest = new ReplSetTest({name: 'changeNotificationTest', nodes: 1}); - let nodes = replTest.startSet(); + var replTest = new ReplSetTest({name: 'changeNotificationTest', nodes: 1}); + var nodes = replTest.startSet(); replTest.initiate(); replTest.awaitReplication(); db = replTest.getPrimary().getDB('test'); - db.getMongo().forceReadMode('commands'); jsTestLog("Testing single insert"); assert.writeOK(db.t1.insert({_id: 0, a: 1})); @@ -137,145 +129,5 @@ assert.writeOK(db.t3.insert({_id: 101, renameCollection: "test.dne1", to: "test.dne2"})); testPipeline([{$changeNotification: {}}], [], db.dne1); testPipeline([{$changeNotification: {}}], [], db.dne2); - - // Now make sure the cursor behaves like a tailable awaitData cursor. - jsTestLog("Testing tailability"); - let tailableCursor = db.tailable1.aggregate([{$changeNotification: {}}, oplogProjection]); - assert(!tailableCursor.hasNext()); - assert.writeOK(db.tailable1.insert({_id: 101, a: 1})); - assert(tailableCursor.hasNext()); - assert.docEq(tailableCursor.next(), { - "_id": { - "_id": 101, - "ns": "test.tailable1", - }, - "documentKey": {"_id": 101}, - "newDocument": {"_id": 101, "a": 1}, - "ns": {"coll": "tailable1", "db": "test"}, - "operationType": "insert" - }); - - jsTestLog("Testing awaitdata"); - let res = assert.commandWorked(db.runCommand({ - aggregate: "tailable2", - pipeline: [{$changeNotification: {}}, oplogProjection], - cursor: {} - })); - let aggcursor = res.cursor; - - // We should get a valid cursor. - assert.neq(aggcursor.id, 0); - - // Initial batch size should be zero as there should be no data. - assert.eq(aggcursor.firstBatch.length, 0); - - // No data, so should return no results, but cursor should remain valid. - res = assert.commandWorked( - db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 50})); - aggcursor = res.cursor; - assert.neq(aggcursor.id, 0); - assert.eq(aggcursor.nextBatch.length, 0); - - // Now insert something in parallel while waiting for it. - let insertshell = startParallelShell(function() { - // Wait for the getMore to appear in currentop. - assert.soon(function() { - return db.currentOp({op: "getmore", "command.collection": "tailable2"}).inprog.length == - 1; - }); - assert.writeOK(db.tailable2.insert({_id: 102, a: 2})); - }); - res = assert.commandWorked( - db.runCommand({getMore: aggcursor.id, collection: "tailable2", maxTimeMS: 5 * 60 * 1000})); - aggcursor = res.cursor; - assert.eq(aggcursor.nextBatch.length, 1); - assert.docEq(aggcursor.nextBatch[0], { - "_id": { - "_id": 102, - "ns": "test.tailable2", - }, - "documentKey": {"_id": 102}, - "newDocument": {"_id": 102, "a": 2}, - "ns": {"coll": "tailable2", "db": "test"}, - "operationType": "insert" - }); - - // Wait for insert shell to terminate. - insertshell(); - - jsTestLog("Testing awaitdata - no wake on insert to another collection"); - res = assert.commandWorked(db.runCommand({ - aggregate: "tailable3", - pipeline: [{$changeNotification: {}}, oplogProjection], - cursor: {} - })); - aggcursor = res.cursor; - // We should get a valid cursor. - assert.neq(aggcursor.id, 0); - - // Initial batch size should be zero as there should be no data. - assert.eq(aggcursor.firstBatch.length, 0); - - // Now insert something in a different collection in parallel while waiting. - insertshell = startParallelShell(function() { - // Wait for the getMore to appear in currentop. - assert.soon(function() { - return db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length == - 1; - }); - assert.writeOK(db.tailable3a.insert({_id: 103, a: 2})); - }); - let start = new Date(); - res = assert.commandWorked( - db.runCommand({getMore: aggcursor.id, collection: "tailable3", maxTimeMS: 1000})); - let diff = (new Date()).getTime() - start.getTime(); - assert.gt(diff, 900, "AwaitData returned prematurely on insert to unrelated collection."); - aggcursor = res.cursor; - // Cursor should be valid with no data. - assert.neq(aggcursor.id, 0); - assert.eq(aggcursor.nextBatch.length, 0); - - // Wait for insert shell to terminate. - insertshell(); - - // This time, put something in a different collection, then in the correct collection. - // We should wake up with just the correct data. - insertshell = startParallelShell(function() { - // Wait for the getMore to appear in currentop. - assert.soon(function() { - return db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length == - 1; - }); - assert.writeOK(db.tailable3a.insert({_id: 104, a: 2})); - assert(db.currentOp({op: "getmore", "command.collection": "tailable3"}).inprog.length == 1); - assert.writeOK(db.tailable3.insert({_id: 105, a: 3})); - }); - res = assert.commandWorked( - db.runCommand({getMore: aggcursor.id, collection: "tailable3", maxTimeMS: 5 * 60 * 1000})); - aggcursor = res.cursor; - assert.neq(aggcursor.id, 0); - assert.eq(aggcursor.nextBatch.length, 1); - assert.docEq(aggcursor.nextBatch[0], { - "_id": { - "_id": 105, - "ns": "test.tailable3", - }, - "documentKey": {"_id": 105}, - "newDocument": {"_id": 105, "a": 3}, - "ns": {"coll": "tailable3", "db": "test"}, - "operationType": "insert" - }); - - // Wait for insert shell to terminate. - insertshell(); - - jsTestLog("Ensuring attempt to read with legacy operations fails."); - db.getMongo().forceReadMode('legacy'); - tailableCursor = db.tailable2.aggregate([{$changeNotification: {}}, oplogProjection], - {cursor: {batchSize: 0}}); - assert.throws(function() { - tailableCursor.next(); - }, [], "Legacy getMore expected to fail on changeNotification cursor."); - replTest.stopSet(); }()); |