From 6927a814ad505920179e5dd50e3ccb085f591273 Mon Sep 17 00:00:00 2001 From: Hubert Argasinski Date: Fri, 15 Apr 2022 00:06:27 -0400 Subject: fix: update priorityQueue functionality to match queue (#1790) --- lib/internal/queue.js | 13 ++- lib/priorityQueue.js | 62 ++++++------ test/es2017/awaitableFunctions.js | 60 ++++++++++- test/priorityQueue.js | 206 ++++++++++++++++++++++++++++++++++++-- test/queue.js | 2 - 5 files changed, 293 insertions(+), 50 deletions(-) diff --git a/lib/internal/queue.js b/lib/internal/queue.js index 886fe7a..75712c2 100644 --- a/lib/internal/queue.js +++ b/lib/internal/queue.js @@ -60,12 +60,11 @@ export default function queue(worker, concurrency, payload) { res(args) } - var item = { + var item = q._createTaskItem( data, - callback: rejectOnError ? - promiseCallback : + rejectOnError ? promiseCallback : (callback || promiseCallback) - }; + ); if (insertAtFront) { q._tasks.unshift(item); @@ -147,6 +146,12 @@ export default function queue(worker, concurrency, payload) { var isProcessing = false; var q = { _tasks: new DLL(), + _createTaskItem (data, callback) { + return { + data, + callback + }; + }, *[Symbol.iterator] () { yield* q._tasks[Symbol.iterator]() }, diff --git a/lib/priorityQueue.js b/lib/priorityQueue.js index 53ea017..e418e48 100644 --- a/lib/priorityQueue.js +++ b/lib/priorityQueue.js @@ -1,4 +1,3 @@ -import setImmediate from './setImmediate.js' import queue from './queue.js' import Heap from './internal/Heap.js' @@ -19,54 +18,51 @@ import Heap from './internal/Heap.js' * @param {number} concurrency - An `integer` for determining how many `worker` * functions should be run in parallel. If omitted, the concurrency defaults to * `1`. If the concurrency is `0`, an error is thrown. - * @returns {module:ControlFlow.QueueObject} A priorityQueue object to manage the tasks. There are two + * @returns {module:ControlFlow.QueueObject} A priorityQueue object to manage the tasks. There are three * differences between `queue` and `priorityQueue` objects: * * `push(task, priority, [callback])` - `priority` should be a number. If an * array of `tasks` is given, all tasks will be assigned the same priority. - * * The `unshift` method was removed. + * * `pushAsync(task, priority, [callback])` - the same as `priorityQueue.push`, + * except this returns a promise that rejects if an error occurs. + * * The `unshift` and `unshiftAsync` methods were removed. */ export default function(worker, concurrency) { // Start with a normal queue var q = queue(worker, concurrency); - var processingScheduled = false; + + var { + push, + pushAsync + } = q; q._tasks = new Heap(); + q._createTaskItem = ({data, priority}, callback) => { + return { + data, + priority, + callback + }; + }; - // Override push to accept second parameter representing priority - q.push = function(data, priority = 0, callback = () => {}) { - if (typeof callback !== 'function') { - throw new Error('task callback must be a function'); - } - q.started = true; - if (!Array.isArray(data)) { - data = [data]; - } - if (data.length === 0 && q.idle()) { - // call drain immediately if there are no tasks - return setImmediate(() => q.drain()); + function createDataItems(tasks, priority) { + if (!Array.isArray(tasks)) { + return {data: tasks, priority}; } + return tasks.map(data => { return {data, priority}; }); + } - for (var i = 0, l = data.length; i < l; i++) { - var item = { - data: data[i], - priority, - callback - }; - - q._tasks.push(item); - } + // Override push to accept second parameter representing priority + q.push = function(data, priority = 0, callback) { + return push(createDataItems(data, priority), callback); + }; - if (!processingScheduled) { - processingScheduled = true; - setImmediate(() => { - processingScheduled = false; - q.process(); - }); - } + q.pushAsync = function(data, priority = 0, callback) { + return pushAsync(createDataItems(data, priority), callback); }; - // Remove unshift function + // Remove unshift functions delete q.unshift; + delete q.unshiftAsync; return q; } diff --git a/test/es2017/awaitableFunctions.js b/test/es2017/awaitableFunctions.js index f961857..d68a18f 100644 --- a/test/es2017/awaitableFunctions.js +++ b/test/es2017/awaitableFunctions.js @@ -540,7 +540,7 @@ module.exports = function () { it('should work with queues', async () => { const q = async.queue(async (data) => { if (data === 2) throw new Error('oh noes') - await new Promise(resolve => setTimeout(resolve, 10)) + await new Promise(resolve => setTimeout(() => resolve(data), 10)) return data }, 5) @@ -561,7 +561,7 @@ module.exports = function () { const multiP = Promise.all(q.push([9, 10])) await q.drain() - await multiP + const res = await multiP expect(calls.join()).to.eql([ 'saturated', 'push cb 1', @@ -581,6 +581,62 @@ module.exports = function () { expect(emptyCalls).to.eql([ 'empty', ]) + + expect(res).to.eql([ + 9, + 10 + ]) + }) + + it('should work with priorityQueues', async () => { + const q = async.priorityQueue(async (data) => { + if (data === 2) throw new Error('oh noes') + await new Promise(resolve => setTimeout(() => resolve(data), 10)) + return data + }, 5) + + const calls = [] + const errorCalls = [] + const emptyCalls = [] + q.error().catch(d => errorCalls.push('error ' + d)) + q.saturated().then(() => calls.push('saturated')) + q.unsaturated().then(() => calls.push('unsaturated')) + q.empty().then(() => emptyCalls.push('empty')) + + q.push(1, 1).then(d => calls.push('push cb ' + d)) + q.push(2, 1).then(d => errorCalls.push('push cb ' + d)) + q.push([3, 4, 5, 6], 0).map(p => p.then(d => calls.push('push cb ' + d))) + q.push(7, 3).then(d => calls.push('push cb ' + d)) + q.push(8, 3).then(d => calls.push('push cb ' + d)) + + const multiP = Promise.all(q.push([9, 10], 1)) + + await q.drain() + const res = await multiP + expect(calls.join()).to.eql([ + 'saturated', + 'push cb 3', + 'push cb 4', + 'push cb 5', + 'push cb 6', + 'push cb 1', + 'unsaturated', + 'push cb 7', + 'push cb 8' + ].join()) + + expect(errorCalls).to.eql([ + 'push cb undefined', + 'error Error: oh noes', + ]) + expect(emptyCalls).to.eql([ + 'empty', + ]) + + expect(res).to.eql([ + 9, + 10 + ]) }) /* diff --git a/test/priorityQueue.js b/test/priorityQueue.js index 564a0d6..40f7fe6 100644 --- a/test/priorityQueue.js +++ b/test/priorityQueue.js @@ -16,13 +16,13 @@ describe('priorityQueue', () => { q.push(1, 1.4, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); - expect(q.length()).to.equal(2); + expect(q.length()).to.equal(3); call_order.push('callback ' + 1); }); q.push(2, 0.2, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); - expect(q.length()).to.equal(3); + expect(q.length()).to.equal(4); call_order.push('callback ' + 2); }); q.push(3, 3.8, (err, arg) => { @@ -31,26 +31,25 @@ describe('priorityQueue', () => { expect(q.length()).to.equal(0); call_order.push('callback ' + 3); }); - q.push(4, 2.9, (err, arg) => { + q.push(['arr', 'arr'], 2.9, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); - expect(q.length()).to.equal(1); - call_order.push('callback ' + 4); + call_order.push('callback arr'); }); - expect(q.length()).to.equal(4); + expect(q.length()).to.equal(5); expect(q.concurrency).to.equal(1); q.drain(() => { expect(call_order).to.eql([ 'process 2', 'callback 2', 'process 1', 'callback 1', - 'process 4', 'callback 4', + 'process arr', 'callback arr', + 'process arr', 'callback arr', 'process 3', 'callback 3' ]); expect(q.concurrency).to.equal(1); expect(q.length()).to.equal(0); - q.push([]) - expect(q.length()).to.equal(0) + expect(q.idle()).to.be.equal(true); done() }); try { @@ -79,27 +78,32 @@ describe('priorityQueue', () => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); expect(q.length()).to.equal(2); + expect(q.running()).to.equal(1); call_order.push('callback ' + 1); }); q.push(2, 0.2, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); expect(q.length()).to.equal(1); + expect(q.running()).to.equal(1); call_order.push('callback ' + 2); }); q.push(3, 3.8, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); expect(q.length()).to.equal(0); + expect(q.running()).to.equal(1); call_order.push('callback ' + 3); }); q.push(4, 2.9, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); expect(q.length()).to.equal(0); + expect(q.running()).to.equal(0); call_order.push('callback ' + 4); }); expect(q.length()).to.equal(4); + expect(q.running()).to.equal(0); expect(q.concurrency).to.equal(2); q.drain(() => { @@ -111,10 +115,30 @@ describe('priorityQueue', () => { ]); expect(q.concurrency).to.equal(2); expect(q.length()).to.equal(0); + expect(q.running()).to.equal(0); done(); }); }); + it('pushAsync', done => { + const calls = []; + const q = async.priorityQueue((task, cb) => { + if (task === 2) return cb(new Error('fail')); + cb(); + }) + + q.pushAsync(1, 1, () => { throw new Error('should not be called') }).then(() => calls.push(1)); + q.pushAsync(2, 0).catch(err => { + expect(err.message).to.equal('fail'); + calls.push(2); + }); + q.pushAsync([3, 4], 0).map(p => p.then(() => calls.push('arr'))); + q.drain(() => setTimeout(() => { + expect(calls).to.eql([2, 'arr', 'arr', 1]); + done(); + })); + }); + it('pause in worker with concurrency', (done) => { var call_order = []; var q = async.priorityQueue((task, callback) => { @@ -144,6 +168,104 @@ describe('priorityQueue', () => { }); }); + it('kill', (done) => { + var q = async.priorityQueue((/*task, callback*/) => { + setTimeout(() => { + throw new Error("Function should never be called"); + }, 20); + }, 1); + q.drain(() => { + throw new Error("Function should never be called"); + }); + + q.push(0); + + q.kill(); + + setTimeout(() => { + expect(q.length()).to.equal(0); + done(); + }, 40); + }); + + context('q.workersList():', () => { + it('should be the same length as running()', (done) => { + var q = async.priorityQueue((task, cb) => { + async.setImmediate(() => { + expect(q.workersList().length).to.equal(q.running()); + cb(); + }); + }, 2); + + q.drain(() => { + expect(q.workersList().length).to.equal(0); + expect(q.running()).to.equal(0); + done(); + }); + + q.push('foo', 2); + q.push('bar', 1); + q.push('baz', 0); + }); + + it('should contain the items being processed', (done) => { + var itemsBeingProcessed = { + 'foo': [ + {data: 'bar', priority: 1}, + {data: 'foo', priority: 2} + ], + 'foo_cb': [ + {data: 'foo', priority: 2} + ], + 'bar': [ + {data: 'baz', priority: 0}, + {data: 'bar', priority: 1} + ], + 'bar_cb': [ + {data: 'bar', priority: 1}, + {data: 'foo', priority: 2} + ], + 'baz': [ + {data: 'baz', priority: 0} + ], + 'baz_cb': [ + {data: 'baz', priority: 0}, + {data: 'bar', priority: 1} + ] + }; + + function getWorkersListData(q) { + return q.workersList().map(({data, priority}) => { + return {data, priority}; + }); + } + + var q = async.priorityQueue((task, cb) => { + expect( + getWorkersListData(q) + ).to.eql(itemsBeingProcessed[task]); + expect(q.workersList().length).to.equal(q.running()); + async.setImmediate(() => { + expect( + getWorkersListData(q) + ).to.eql(itemsBeingProcessed[task+'_cb']); + expect(q.workersList().length).to.equal(q.running()); + cb(); + }); + }, 2); + + q.drain(() => { + expect(q.workersList()).to.eql([]); + expect(q.workersList().length).to.equal(q.running()); + done(); + }); + + q.push('foo', 2); + q.push('bar', 1); + q.push('baz', 0); + }); + }); + context('q.saturated(): ', () => { it('should call the saturated callback if tasks length is concurrency', (done) => { var calls = []; @@ -247,6 +369,27 @@ describe('priorityQueue', () => { }); }); + it('should call the drain callback if receives an empty push', (done) => { + var call_order = []; + + var q = async.priorityQueue((task, callback) => { + call_order.push(task); + callback('error', 'arg'); + }, 1); + + q.drain(() => { + call_order.push('drain') + expect(call_order).to.eql([ + 'drain' + ]); + expect(q.length()).to.equal(0); + expect(q.running()).to.equal(0); + done(); + }); + + q.push([], 1, () => { throw new Error('should not be called') }); + }); + it('should not call the drain callback if receives empty push and tasks are still pending', (done) => { var call_order = []; @@ -282,4 +425,49 @@ describe('priorityQueue', () => { q.push([], 1, () => {}); }); + + it('should be iterable', (done) => { + var q = async.priorityQueue((data, cb) => { + if (data === 3) { + q.push(6) + expect([...q]).to.eql([4, 5, 6]); + } + async.setImmediate(cb); + }); + + q.push([1, 2, 3, 4, 5]); + + expect([...q]).to.eql([1, 2, 3, 4, 5]); + + q.drain(() => { + expect([...q]).to.eql([]); + done(); + }); + }); + + it('should error when calling unshift', () => { + var q = async.priorityQueue(() => {}); + expect(() => { + q.unshift(1); + }).to.throw(); + }); + + it('should error when calling unshiftAsync', () => { + var q = async.priorityQueue(() => {}); + expect(() => { + q.unshiftAsync(1); + }).to.throw(); + }); + + it('should error when the callback is called more than once', (done) => { + var q = async.priorityQueue((task, callback) => { + callback(); + expect(() => { + callback(); + }).to.throw(); + done(); + }, 2); + + q.push(1); + }); }); diff --git a/test/queue.js b/test/queue.js index f99e91a..615dd6b 100644 --- a/test/queue.js +++ b/test/queue.js @@ -170,7 +170,6 @@ describe('queue', function(){ }) q.pushAsync([3, 4]).map(p => p.then(() => calls.push('arr'))) q.drain(() => setTimeout(() => { - console.log('drain') expect(calls).to.eql([1, 2, 'arr', 'arr']) done() })) @@ -190,7 +189,6 @@ describe('queue', function(){ }) q.unshiftAsync([3, 4]).map(p => p.then(() => calls.push('arr'))) q.drain(() => setTimeout(() => { - console.log('drain') expect(calls).to.eql(['arr', 'arr', 2, 1]) done() })) -- cgit v1.2.1