summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/internal/queue.js71
-rw-r--r--lib/queue.js8
-rw-r--r--test/es2017/awaitableFunctions.js1
-rw-r--r--test/queue.js40
4 files changed, 91 insertions, 29 deletions
diff --git a/lib/internal/queue.js b/lib/internal/queue.js
index 01567aa..bb3e16b 100644
--- a/lib/internal/queue.js
+++ b/lib/internal/queue.js
@@ -45,27 +45,26 @@ export default function queue(worker, concurrency, payload) {
}
var processingScheduled = false;
- function _insert(data, insertAtFront, callback) {
+ function _insert(data, insertAtFront, rejectOnError, callback) {
if (callback != null && typeof callback !== 'function') {
throw new Error('task callback must be a function');
}
q.started = true;
- /*if (Array.isArray(data)) {
- return data.map(datum => _insert(datum, insertAtFront, callback));
- }*/
-
- var res;
+ var res, rej;
+ function promiseCallback (err, ...args) {
+ // we don't care about the error, let the global error handler
+ // deal with it
+ if (err) return rejectOnError ? rej(err) : res()
+ if (args.length <= 1) return res(args[0])
+ res(args)
+ }
var item = {
data,
- callback: callback || function (err, ...args) {
- // we don't care about the error, let the global error handler
- // deal with it
- if (err) return
- if (args.length <= 1) return res(args[0])
- res(args)
- }
+ callback: rejectOnError ?
+ promiseCallback :
+ (callback || promiseCallback)
};
if (insertAtFront) {
@@ -82,9 +81,10 @@ export default function queue(worker, concurrency, payload) {
});
}
- if (!callback) {
- return new Promise((resolve) => {
+ if (rejectOnError || !callback) {
+ return new Promise((resolve, reject) => {
res = resolve
+ rej = reject
})
}
}
@@ -121,6 +121,15 @@ export default function queue(worker, concurrency, payload) {
};
}
+ function _maybeDrain(data) {
+ if (data.length === 0 && q.idle()) {
+ // call drain immediately if there are no tasks
+ setImmediate(() => trigger('drain'));
+ return true
+ }
+ return false
+ }
+
const eventMethod = (name) => (handler) => {
if (!handler) {
return new Promise((resolve, reject) => {
@@ -148,13 +157,17 @@ export default function queue(worker, concurrency, payload) {
paused: false,
push (data, callback) {
if (Array.isArray(data)) {
- if (data.length === 0 && q.idle()) {
- // call drain immediately if there are no tasks
- return setImmediate(() => trigger('drain'));
- }
- return data.map(datum => _insert(datum, false, callback))
+ if (_maybeDrain(data)) return
+ return data.map(datum => _insert(datum, false, false, callback))
}
- return _insert(data, false, callback);
+ return _insert(data, false, false, callback);
+ },
+ pushAsync (data, callback) {
+ if (Array.isArray(data)) {
+ if (_maybeDrain(data)) return
+ return data.map(datum => _insert(datum, false, true, callback))
+ }
+ return _insert(data, false, true, callback);
},
kill () {
off()
@@ -162,13 +175,17 @@ export default function queue(worker, concurrency, payload) {
},
unshift (data, callback) {
if (Array.isArray(data)) {
- if (data.length === 0 && q.idle()) {
- // call drain immediately if there are no tasks
- return setImmediate(() => trigger('drain'));
- }
- return data.map(datum => _insert(datum, true, callback))
+ if (_maybeDrain(data)) return
+ return data.map(datum => _insert(datum, true, false, callback))
+ }
+ return _insert(data, true, false, callback);
+ },
+ unshiftAsync (data, callback) {
+ if (Array.isArray(data)) {
+ if (_maybeDrain(data)) return
+ return data.map(datum => _insert(datum, true, true, callback))
}
- return _insert(data, true, callback);
+ return _insert(data, true, true, callback);
},
remove (testFn) {
q._tasks.remove(testFn);
diff --git a/lib/queue.js b/lib/queue.js
index 53cf4ad..dc55547 100644
--- a/lib/queue.js
+++ b/lib/queue.js
@@ -21,12 +21,16 @@ import wrapAsync from './internal/wrapAsync';
* @property {number} payload - an integer that specifies how many items are
* passed to the worker function at a time. only applies if this is a
* [cargo]{@link module:ControlFlow.cargo} object
- * @property {Function} push - add a new task to the `queue`. Calls `callback`
+ * @property {AsyncFunction} push - add a new task to the `queue`. Calls `callback`
* once the `worker` has finished processing the task. Instead of a single task,
* a `tasks` array can be submitted. The respective callback is used for every
* task in the list. Invoke with `queue.push(task, [callback])`,
- * @property {Function} unshift - add a new task to the front of the `queue`.
+ * @property {AsyncFunction} unshift - add a new task to the front of the `queue`.
* Invoke with `queue.unshift(task, [callback])`.
+ * @property {AsyncFunction} pushAsync - the same as `q.push`, except this returns
+ * a promise that rejects if an error occurs.
+ * @property {AsyncFunction} unshirtAsync - the same as `q.unshift`, except this returns
+ * a promise that rejects if an error occurs.
* @property {Function} remove - remove items from the queue that match a test
* function. The test function will be passed an object with a `data` property,
* and a `priority` property, if this is a
diff --git a/test/es2017/awaitableFunctions.js b/test/es2017/awaitableFunctions.js
index e8adf7b..eb2663c 100644
--- a/test/es2017/awaitableFunctions.js
+++ b/test/es2017/awaitableFunctions.js
@@ -618,6 +618,7 @@ module.exports = function () {
'push cb 3',
'push cb 4',
'push cb 5',
+ 'push cb undefined',
'push cb 7',
'unsaturated',
'push cb 8'
diff --git a/test/queue.js b/test/queue.js
index e9f3a78..f99e91a 100644
--- a/test/queue.js
+++ b/test/queue.js
@@ -156,6 +156,46 @@ describe('queue', function(){
});
});
+ it('pushAsync', done => {
+ const calls = []
+ var q = async.queue((task, cb) => {
+ if (task === 2) return cb(new Error('fail'))
+ cb()
+ })
+
+ q.pushAsync(1, () => { throw new Error('should not be called') }).then(() => calls.push(1))
+ q.pushAsync(2).catch(err => {
+ expect(err.message).to.equal('fail')
+ calls.push(2)
+ })
+ q.pushAsync([3, 4]).map(p => p.then(() => calls.push('arr')))
+ q.drain(() => setTimeout(() => {
+ console.log('drain')
+ expect(calls).to.eql([1, 2, 'arr', 'arr'])
+ done()
+ }))
+ })
+
+ it('unshiftAsync', done => {
+ const calls = []
+ var q = async.queue((task, cb) => {
+ if (task === 2) return cb(new Error('fail'))
+ cb()
+ })
+
+ q.unshiftAsync(1).then(() => calls.push(1))
+ q.unshiftAsync(2).catch(err => {
+ expect(err.message).to.equal('fail')
+ calls.push(2)
+ })
+ q.unshiftAsync([3, 4]).map(p => p.then(() => calls.push('arr')))
+ q.drain(() => setTimeout(() => {
+ console.log('drain')
+ expect(calls).to.eql(['arr', 'arr', 2, 1])
+ done()
+ }))
+ })
+
it('global error handler', (done) => {
var results = [];