summaryrefslogtreecommitdiff
path: root/lib/internal/queue.js
diff options
context:
space:
mode:
authorAlex Early <alexander.early@gmail.com>2019-05-19 18:30:18 -0700
committerGitHub <noreply@github.com>2019-05-19 18:30:18 -0700
commite0446642d70817f4353b4ed12a3c86e5d769cf01 (patch)
treea6a492683ec0550b9dd55edd0ac29e039885b37e /lib/internal/queue.js
parent1d458d980a8bfee8c941061dca364a33cf15fac0 (diff)
downloadasync-e0446642d70817f4353b4ed12a3c86e5d769cf01.tar.gz
BREAKING CHANGE: awaitable queues (#1641)
* BREAKING CHANGE: awaitable queues * fix priorityQueue tests * fix tests in firefox * make the upgrade a bit more user-friendly * clarify docs
Diffstat (limited to 'lib/internal/queue.js')
-rw-r--r--lib/internal/queue.js136
1 files changed, 104 insertions, 32 deletions
diff --git a/lib/internal/queue.js b/lib/internal/queue.js
index 751b562..0db96a8 100644
--- a/lib/internal/queue.js
+++ b/lib/internal/queue.js
@@ -3,8 +3,6 @@ import setImmediate from './setImmediate';
import DLL from './DoublyLinkedList';
import wrapAsync from './wrapAsync';
-const noop = () => {}
-
export default function queue(worker, concurrency, payload) {
if (concurrency == null) {
concurrency = 1;
@@ -16,6 +14,35 @@ export default function queue(worker, concurrency, payload) {
var _worker = wrapAsync(worker);
var numRunning = 0;
var workersList = [];
+ const events = {
+ error: [],
+ drain: [],
+ saturated: [],
+ unsaturated: [],
+ empty: []
+ }
+
+ function on (event, handler) {
+ events[event].push(handler)
+ }
+
+ function once (event, handler) {
+ const handleAndRemove = (...args) => {
+ off(event, handleAndRemove)
+ handler(...args)
+ }
+ events[event].push(handleAndRemove)
+ }
+
+ function off (event, handler) {
+ if (!event) return Object.keys(events).forEach(ev => events[ev] = [])
+ if (!handler) return events[event] = []
+ events[event] = events[event].filter(ev => ev !== handler)
+ }
+
+ function trigger (event, ...args) {
+ events[event].forEach(handler => handler(...args))
+ }
var processingScheduled = false;
function _insert(data, insertAtFront, callback) {
@@ -23,25 +50,32 @@ export default function queue(worker, concurrency, payload) {
throw new Error('task callback must be a function');
}
q.started = true;
- if (!Array.isArray(data)) {
- data = [data];
- }
- if (data.length === 0 && q.idle()) {
- // call drain immediately if there are no tasks
- return setImmediate(() => q.drain());
+ if (Array.isArray(data)) {
+ if (data.length === 0 && q.idle()) {
+ // call drain immediately if there are no tasks
+ return setImmediate(() => trigger('drain'));
+ }
+
+ return data.map(datum => _insert(datum, insertAtFront, callback));
}
- for (var i = 0, l = data.length; i < l; i++) {
- var item = {
- data: data[i],
- callback: callback || noop
- };
+ var res;
- if (insertAtFront) {
- q._tasks.unshift(item);
- } else {
- q._tasks.push(item);
+ var item = {
+ data,
+ callback: callback || function (err, ...args) {
+ // we don't care about the error, let the global error handler
+ // deal with it
+ if (err) return
+ if (args.length <= 1) return res(args[0])
+ res(args)
}
+ };
+
+ if (insertAtFront) {
+ q._tasks.unshift(item);
+ } else {
+ q._tasks.push(item);
}
if (!processingScheduled) {
@@ -51,9 +85,15 @@ export default function queue(worker, concurrency, payload) {
q.process();
});
}
+
+ if (!callback) {
+ return new Promise((resolve) => {
+ res = resolve
+ })
+ }
}
- function _next(tasks) {
+ function _createCB(tasks) {
return function (err, ...args) {
numRunning -= 1;
@@ -70,21 +110,35 @@ export default function queue(worker, concurrency, payload) {
task.callback(err, ...args);
if (err != null) {
- q.error(err, task.data);
+ trigger('error', err, task.data);
}
}
if (numRunning <= (q.concurrency - q.buffer) ) {
- q.unsaturated();
+ trigger('unsaturated')
}
if (q.idle()) {
- q.drain();
+ trigger('drain')
}
q.process();
};
}
+ const eventMethod = (name) => (handler) => {
+ if (!handler) {
+ return new Promise((resolve, reject) => {
+ once(name, (err, data) => {
+ if (err) return reject(err)
+ resolve(data)
+ })
+ })
+ }
+ off(name)
+ on(name, handler)
+
+ }
+
var isProcessing = false;
var q = {
_tasks: new DLL(),
@@ -93,23 +147,18 @@ export default function queue(worker, concurrency, payload) {
},
concurrency,
payload,
- saturated: noop,
- unsaturated:noop,
buffer: concurrency / 4,
- empty: noop,
- drain: noop,
- error: noop,
started: false,
paused: false,
push (data, callback) {
- _insert(data, false, callback);
+ return _insert(data, false, callback);
},
kill () {
- q.drain = noop;
+ off()
q._tasks.empty();
},
unshift (data, callback) {
- _insert(data, true, callback);
+ return _insert(data, true, callback);
},
remove (testFn) {
q._tasks.remove(testFn);
@@ -135,14 +184,14 @@ export default function queue(worker, concurrency, payload) {
numRunning += 1;
if (q._tasks.length === 0) {
- q.empty();
+ trigger('empty');
}
if (numRunning === q.concurrency) {
- q.saturated();
+ trigger('saturated');
}
- var cb = onlyOnce(_next(tasks));
+ var cb = onlyOnce(_createCB(tasks));
_worker(data, cb);
}
isProcessing = false;
@@ -168,5 +217,28 @@ export default function queue(worker, concurrency, payload) {
setImmediate(q.process);
}
};
+ // define these as fixed properties, so people get useful errors when updating
+ Object.defineProperties(q, {
+ saturated: {
+ writable: false,
+ value: eventMethod('saturated')
+ },
+ unsaturated: {
+ writable: false,
+ value: eventMethod('unsaturated')
+ },
+ empty: {
+ writable: false,
+ value: eventMethod('empty')
+ },
+ drain: {
+ writable: false,
+ value: eventMethod('drain')
+ },
+ error: {
+ writable: false,
+ value: eventMethod('error')
+ },
+ })
return q;
}