diff options
author | Alex Early <alexander.early@gmail.com> | 2019-05-19 18:30:18 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-05-19 18:30:18 -0700 |
commit | e0446642d70817f4353b4ed12a3c86e5d769cf01 (patch) | |
tree | a6a492683ec0550b9dd55edd0ac29e039885b37e /lib/internal/queue.js | |
parent | 1d458d980a8bfee8c941061dca364a33cf15fac0 (diff) | |
download | async-e0446642d70817f4353b4ed12a3c86e5d769cf01.tar.gz |
BREAKING CHANGE: awaitable queues (#1641)
* BREAKING CHANGE: awaitable queues
* fix priorityQueue tests
* fix tests in firefox
* make the upgrade a bit more user-friendly
* clarify docs
Diffstat (limited to 'lib/internal/queue.js')
-rw-r--r-- | lib/internal/queue.js | 136 |
1 files changed, 104 insertions, 32 deletions
diff --git a/lib/internal/queue.js b/lib/internal/queue.js index 751b562..0db96a8 100644 --- a/lib/internal/queue.js +++ b/lib/internal/queue.js @@ -3,8 +3,6 @@ import setImmediate from './setImmediate'; import DLL from './DoublyLinkedList'; import wrapAsync from './wrapAsync'; -const noop = () => {} - export default function queue(worker, concurrency, payload) { if (concurrency == null) { concurrency = 1; @@ -16,6 +14,35 @@ export default function queue(worker, concurrency, payload) { var _worker = wrapAsync(worker); var numRunning = 0; var workersList = []; + const events = { + error: [], + drain: [], + saturated: [], + unsaturated: [], + empty: [] + } + + function on (event, handler) { + events[event].push(handler) + } + + function once (event, handler) { + const handleAndRemove = (...args) => { + off(event, handleAndRemove) + handler(...args) + } + events[event].push(handleAndRemove) + } + + function off (event, handler) { + if (!event) return Object.keys(events).forEach(ev => events[ev] = []) + if (!handler) return events[event] = [] + events[event] = events[event].filter(ev => ev !== handler) + } + + function trigger (event, ...args) { + events[event].forEach(handler => handler(...args)) + } var processingScheduled = false; function _insert(data, insertAtFront, callback) { @@ -23,25 +50,32 @@ export default function queue(worker, concurrency, payload) { throw new Error('task callback must be a function'); } q.started = true; - if (!Array.isArray(data)) { - data = [data]; - } - if (data.length === 0 && q.idle()) { - // call drain immediately if there are no tasks - return setImmediate(() => q.drain()); + if (Array.isArray(data)) { + if (data.length === 0 && q.idle()) { + // call drain immediately if there are no tasks + return setImmediate(() => trigger('drain')); + } + + return data.map(datum => _insert(datum, insertAtFront, callback)); } - for (var i = 0, l = data.length; i < l; i++) { - var item = { - data: data[i], - callback: callback || noop - }; + var res; - if (insertAtFront) { - q._tasks.unshift(item); - } else { - q._tasks.push(item); + var item = { + data, + callback: callback || function (err, ...args) { + // we don't care about the error, let the global error handler + // deal with it + if (err) return + if (args.length <= 1) return res(args[0]) + res(args) } + }; + + if (insertAtFront) { + q._tasks.unshift(item); + } else { + q._tasks.push(item); } if (!processingScheduled) { @@ -51,9 +85,15 @@ export default function queue(worker, concurrency, payload) { q.process(); }); } + + if (!callback) { + return new Promise((resolve) => { + res = resolve + }) + } } - function _next(tasks) { + function _createCB(tasks) { return function (err, ...args) { numRunning -= 1; @@ -70,21 +110,35 @@ export default function queue(worker, concurrency, payload) { task.callback(err, ...args); if (err != null) { - q.error(err, task.data); + trigger('error', err, task.data); } } if (numRunning <= (q.concurrency - q.buffer) ) { - q.unsaturated(); + trigger('unsaturated') } if (q.idle()) { - q.drain(); + trigger('drain') } q.process(); }; } + const eventMethod = (name) => (handler) => { + if (!handler) { + return new Promise((resolve, reject) => { + once(name, (err, data) => { + if (err) return reject(err) + resolve(data) + }) + }) + } + off(name) + on(name, handler) + + } + var isProcessing = false; var q = { _tasks: new DLL(), @@ -93,23 +147,18 @@ export default function queue(worker, concurrency, payload) { }, concurrency, payload, - saturated: noop, - unsaturated:noop, buffer: concurrency / 4, - empty: noop, - drain: noop, - error: noop, started: false, paused: false, push (data, callback) { - _insert(data, false, callback); + return _insert(data, false, callback); }, kill () { - q.drain = noop; + off() q._tasks.empty(); }, unshift (data, callback) { - _insert(data, true, callback); + return _insert(data, true, callback); }, remove (testFn) { q._tasks.remove(testFn); @@ -135,14 +184,14 @@ export default function queue(worker, concurrency, payload) { numRunning += 1; if (q._tasks.length === 0) { - q.empty(); + trigger('empty'); } if (numRunning === q.concurrency) { - q.saturated(); + trigger('saturated'); } - var cb = onlyOnce(_next(tasks)); + var cb = onlyOnce(_createCB(tasks)); _worker(data, cb); } isProcessing = false; @@ -168,5 +217,28 @@ export default function queue(worker, concurrency, payload) { setImmediate(q.process); } }; + // define these as fixed properties, so people get useful errors when updating + Object.defineProperties(q, { + saturated: { + writable: false, + value: eventMethod('saturated') + }, + unsaturated: { + writable: false, + value: eventMethod('unsaturated') + }, + empty: { + writable: false, + value: eventMethod('empty') + }, + drain: { + writable: false, + value: eventMethod('drain') + }, + error: { + writable: false, + value: eventMethod('error') + }, + }) return q; } |