summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/cargo.js37
-rw-r--r--lib/internal/queue.js136
-rw-r--r--lib/queue.js54
-rw-r--r--test/cargo.js40
-rw-r--r--test/cargoQueue.js44
-rw-r--r--test/es2017/asyncFunctions.js12
-rw-r--r--test/es2017/awaitableFunctions.js45
-rw-r--r--test/priorityQueue.js32
-rw-r--r--test/queue.js121
9 files changed, 310 insertions, 211 deletions
diff --git a/lib/cargo.js b/lib/cargo.js
index 5060223..1afdc0e 100644
--- a/lib/cargo.js
+++ b/lib/cargo.js
@@ -1,36 +1,6 @@
import queue from './internal/queue';
/**
- * A cargo of tasks for the worker function to complete. Cargo inherits all of
- * the same methods and event callbacks as [`queue`]{@link module:ControlFlow.queue}.
- * @typedef {Object} CargoObject
- * @memberOf module:ControlFlow
- * @property {Function} length - A function returning the number of items
- * waiting to be processed. Invoke like `cargo.length()`.
- * @property {number} payload - An `integer` for determining how many tasks
- * should be process per round. This property can be changed after a `cargo` is
- * created to alter the payload on-the-fly.
- * @property {Function} push - Adds `task` to the `queue`. The callback is
- * called once the `worker` has finished processing the task. Instead of a
- * single task, an array of `tasks` can be submitted. The respective callback is
- * used for every task in the list. Invoke like `cargo.push(task, [callback])`.
- * @property {Function} saturated - A callback that is called when the
- * `queue.length()` hits the concurrency and further tasks will be queued.
- * @property {Function} empty - A callback that is called when the last item
- * from the `queue` is given to a `worker`.
- * @property {Function} drain - A callback that is called when the last item
- * from the `queue` has returned from the `worker`.
- * @property {Function} idle - a function returning false if there are items
- * waiting or being processed, or true if not. Invoke like `cargo.idle()`.
- * @property {Function} pause - a function that pauses the processing of tasks
- * until `resume()` is called. Invoke like `cargo.pause()`.
- * @property {Function} resume - a function that resumes the processing of
- * queued tasks when the queue is paused. Invoke like `cargo.resume()`.
- * @property {Function} kill - a function that removes the `drain` callback and
- * empties remaining tasks from the queue forcing it to go idle. Invoke like `cargo.kill()`.
- */
-
-/**
* Creates a `cargo` object with the specified payload. Tasks added to the
* cargo will be processed altogether (up to the `payload` limit). If the
* `worker` is in progress, the task is queued until it becomes available. Once
@@ -53,7 +23,7 @@ import queue from './internal/queue';
* @param {number} [payload=Infinity] - An optional `integer` for determining
* how many tasks should be processed per round; if omitted, the default is
* unlimited.
- * @returns {module:ControlFlow.CargoObject} A cargo object to manage the tasks. Callbacks can
+ * @returns {module:ControlFlow.QueueObject} A cargo object to manage the tasks. Callbacks can
* attached as certain properties to listen for specific events during the
* lifecycle of the cargo and inner queue.
* @example
@@ -73,9 +43,8 @@ import queue from './internal/queue';
* cargo.push({name: 'bar'}, function(err) {
* console.log('finished processing bar');
* });
- * cargo.push({name: 'baz'}, function(err) {
- * console.log('finished processing baz');
- * });
+ * await cargo.push({name: 'baz'});
+ * console.log('finished processing baz');
*/
export default function cargo(worker, payload) {
return queue(worker, 1, payload);
diff --git a/lib/internal/queue.js b/lib/internal/queue.js
index 751b562..0db96a8 100644
--- a/lib/internal/queue.js
+++ b/lib/internal/queue.js
@@ -3,8 +3,6 @@ import setImmediate from './setImmediate';
import DLL from './DoublyLinkedList';
import wrapAsync from './wrapAsync';
-const noop = () => {}
-
export default function queue(worker, concurrency, payload) {
if (concurrency == null) {
concurrency = 1;
@@ -16,6 +14,35 @@ export default function queue(worker, concurrency, payload) {
var _worker = wrapAsync(worker);
var numRunning = 0;
var workersList = [];
+ const events = {
+ error: [],
+ drain: [],
+ saturated: [],
+ unsaturated: [],
+ empty: []
+ }
+
+ function on (event, handler) {
+ events[event].push(handler)
+ }
+
+ function once (event, handler) {
+ const handleAndRemove = (...args) => {
+ off(event, handleAndRemove)
+ handler(...args)
+ }
+ events[event].push(handleAndRemove)
+ }
+
+ function off (event, handler) {
+ if (!event) return Object.keys(events).forEach(ev => events[ev] = [])
+ if (!handler) return events[event] = []
+ events[event] = events[event].filter(ev => ev !== handler)
+ }
+
+ function trigger (event, ...args) {
+ events[event].forEach(handler => handler(...args))
+ }
var processingScheduled = false;
function _insert(data, insertAtFront, callback) {
@@ -23,25 +50,32 @@ export default function queue(worker, concurrency, payload) {
throw new Error('task callback must be a function');
}
q.started = true;
- if (!Array.isArray(data)) {
- data = [data];
- }
- if (data.length === 0 && q.idle()) {
- // call drain immediately if there are no tasks
- return setImmediate(() => q.drain());
+ if (Array.isArray(data)) {
+ if (data.length === 0 && q.idle()) {
+ // call drain immediately if there are no tasks
+ return setImmediate(() => trigger('drain'));
+ }
+
+ return data.map(datum => _insert(datum, insertAtFront, callback));
}
- for (var i = 0, l = data.length; i < l; i++) {
- var item = {
- data: data[i],
- callback: callback || noop
- };
+ var res;
- if (insertAtFront) {
- q._tasks.unshift(item);
- } else {
- q._tasks.push(item);
+ var item = {
+ data,
+ callback: callback || function (err, ...args) {
+ // we don't care about the error, let the global error handler
+ // deal with it
+ if (err) return
+ if (args.length <= 1) return res(args[0])
+ res(args)
}
+ };
+
+ if (insertAtFront) {
+ q._tasks.unshift(item);
+ } else {
+ q._tasks.push(item);
}
if (!processingScheduled) {
@@ -51,9 +85,15 @@ export default function queue(worker, concurrency, payload) {
q.process();
});
}
+
+ if (!callback) {
+ return new Promise((resolve) => {
+ res = resolve
+ })
+ }
}
- function _next(tasks) {
+ function _createCB(tasks) {
return function (err, ...args) {
numRunning -= 1;
@@ -70,21 +110,35 @@ export default function queue(worker, concurrency, payload) {
task.callback(err, ...args);
if (err != null) {
- q.error(err, task.data);
+ trigger('error', err, task.data);
}
}
if (numRunning <= (q.concurrency - q.buffer) ) {
- q.unsaturated();
+ trigger('unsaturated')
}
if (q.idle()) {
- q.drain();
+ trigger('drain')
}
q.process();
};
}
+ const eventMethod = (name) => (handler) => {
+ if (!handler) {
+ return new Promise((resolve, reject) => {
+ once(name, (err, data) => {
+ if (err) return reject(err)
+ resolve(data)
+ })
+ })
+ }
+ off(name)
+ on(name, handler)
+
+ }
+
var isProcessing = false;
var q = {
_tasks: new DLL(),
@@ -93,23 +147,18 @@ export default function queue(worker, concurrency, payload) {
},
concurrency,
payload,
- saturated: noop,
- unsaturated:noop,
buffer: concurrency / 4,
- empty: noop,
- drain: noop,
- error: noop,
started: false,
paused: false,
push (data, callback) {
- _insert(data, false, callback);
+ return _insert(data, false, callback);
},
kill () {
- q.drain = noop;
+ off()
q._tasks.empty();
},
unshift (data, callback) {
- _insert(data, true, callback);
+ return _insert(data, true, callback);
},
remove (testFn) {
q._tasks.remove(testFn);
@@ -135,14 +184,14 @@ export default function queue(worker, concurrency, payload) {
numRunning += 1;
if (q._tasks.length === 0) {
- q.empty();
+ trigger('empty');
}
if (numRunning === q.concurrency) {
- q.saturated();
+ trigger('saturated');
}
- var cb = onlyOnce(_next(tasks));
+ var cb = onlyOnce(_createCB(tasks));
_worker(data, cb);
}
isProcessing = false;
@@ -168,5 +217,28 @@ export default function queue(worker, concurrency, payload) {
setImmediate(q.process);
}
};
+ // define these as fixed properties, so people get useful errors when updating
+ Object.defineProperties(q, {
+ saturated: {
+ writable: false,
+ value: eventMethod('saturated')
+ },
+ unsaturated: {
+ writable: false,
+ value: eventMethod('unsaturated')
+ },
+ empty: {
+ writable: false,
+ value: eventMethod('empty')
+ },
+ drain: {
+ writable: false,
+ value: eventMethod('drain')
+ },
+ error: {
+ writable: false,
+ value: eventMethod('error')
+ },
+ })
return q;
}
diff --git a/lib/queue.js b/lib/queue.js
index 4462f80..53cf4ad 100644
--- a/lib/queue.js
+++ b/lib/queue.js
@@ -18,6 +18,9 @@ import wrapAsync from './internal/wrapAsync';
* @property {number} concurrency - an integer for determining how many `worker`
* functions should be run in parallel. This property can be changed after a
* `queue` is created to alter the concurrency on-the-fly.
+ * @property {number} payload - an integer that specifies how many items are
+ * passed to the worker function at a time. only applies if this is a
+ * [cargo]{@link module:ControlFlow.cargo} object
* @property {Function} push - add a new task to the `queue`. Calls `callback`
* once the `worker` has finished processing the task. Instead of a single task,
* a `tasks` array can be submitted. The respective callback is used for every
@@ -30,20 +33,26 @@ import wrapAsync from './internal/wrapAsync';
* [priorityQueue]{@link module:ControlFlow.priorityQueue} object.
* Invoked with `queue.remove(testFn)`, where `testFn` is of the form
* `function ({data, priority}) {}` and returns a Boolean.
- * @property {Function} saturated - a callback that is called when the number of
- * running workers hits the `concurrency` limit, and further tasks will be
- * queued.
- * @property {Function} unsaturated - a callback that is called when the number
- * of running workers is less than the `concurrency` & `buffer` limits, and
- * further tasks will not be queued.
+ * @property {Function} saturated - a function that sets a callback that is
+ * called when the number of running workers hits the `concurrency` limit, and
+ * further tasks will be queued. If the callback is omitted, `q.saturated()`
+ * returns a promise for the next occurrence.
+ * @property {Function} unsaturated - a function that sets a callback that is
+ * called when the number of running workers is less than the `concurrency` &
+ * `buffer` limits, and further tasks will not be queued. If the callback is
+ * omitted, `q.unsaturated()` returns a promise for the next occurrence.
* @property {number} buffer - A minimum threshold buffer in order to say that
* the `queue` is `unsaturated`.
- * @property {Function} empty - a callback that is called when the last item
- * from the `queue` is given to a `worker`.
- * @property {Function} drain - a callback that is called when the last item
- * from the `queue` has returned from the `worker`.
- * @property {Function} error - a callback that is called when a task errors.
- * Has the signature `function(error, task)`.
+ * @property {Function} empty - a function that sets a callback that is called
+ * when the last item from the `queue` is given to a `worker`. If the callback
+ * is omitted, `q.empty()` returns a promise for the next occurrence.
+ * @property {Function} drain - a function that sets a callback that is called
+ * when the last item from the `queue` has returned from the `worker`. If the
+ * callback is omitted, `q.drain()` returns a promise for the next occurrence.
+ * @property {Function} error - a function that sets a callback that is called
+ * when a task errors. Has the signature `function(error, task)`. If the
+ * callback is omitted, `error()` returns a promise that rejects on the next
+ * error.
* @property {boolean} paused - a boolean for determining whether the queue is
* in a paused state.
* @property {Function} pause - a function that pauses the processing of tasks
@@ -65,6 +74,12 @@ import wrapAsync from './internal/wrapAsync';
* for (let item of q) {
* console.log(item)
* }
+ *
+ * q.drain(() => {
+ * console.log('all done')
+ * })
+ * // or
+ * await q.drain()
*/
/**
@@ -96,22 +111,23 @@ import wrapAsync from './internal/wrapAsync';
* }, 2);
*
* // assign a callback
- * q.drain = function() {
+ * q.drain(function() {
* console.log('all items have been processed');
- * };
+ * });
+ * // or await the end
+ * await q.drain()
*
* // assign an error callback
- * q.error = function(err, task) {
+ * q.error(function(err, task) {
* console.error('task experienced an error');
- * };
+ * });
*
* // add some items to the queue
* q.push({name: 'foo'}, function(err) {
* console.log('finished processing foo');
* });
- * q.push({name: 'bar'}, function (err) {
- * console.log('finished processing bar');
- * });
+ * // callback is optional
+ * q.push({name: 'bar'});
*
* // add some items to the queue (batch-wise)
* q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function(err) {
diff --git a/test/cargo.js b/test/cargo.js
index 2191eb8..59e28f5 100644
--- a/test/cargo.js
+++ b/test/cargo.js
@@ -59,7 +59,7 @@ describe('cargo', () => {
}, 30);
- c.drain = function () {
+ c.drain(() => {
expect(call_order).to.eql([
'process 1 2', 'callback 1', 'callback 2',
'process 3 4', 'callback 3', 'callback 4',
@@ -67,7 +67,7 @@ describe('cargo', () => {
]);
expect(c.length()).to.equal(0);
done();
- };
+ });
});
it('without callback', (done) => {
@@ -95,7 +95,7 @@ describe('cargo', () => {
c.push(5);
}, 80);
- c.drain = function() {
+ c.drain(() => {
expect(call_order).to.eql([
'process 1',
'process 2',
@@ -103,7 +103,7 @@ describe('cargo', () => {
'process 5'
]);
done();
- }
+ })
});
it('bulk task', (done) => {
@@ -145,9 +145,9 @@ describe('cargo', () => {
}, 3);
var drainCounter = 0;
- c.drain = function () {
+ c.drain(() => {
drainCounter++;
- };
+ });
for(var i = 0; i < 10; i++){
c.push(i);
@@ -172,7 +172,7 @@ describe('cargo', () => {
}
var drainCounter = 0;
- c.drain = function () {
+ c.drain(() => {
drainCounter++;
if (drainCounter === 1) {
@@ -181,7 +181,7 @@ describe('cargo', () => {
expect(drainCounter).to.equal(2);
done();
}
- };
+ });
loadCargo();
});
@@ -195,15 +195,15 @@ describe('cargo', () => {
}, 1);
q.concurrency = 3;
- q.saturated = function() {
+ q.saturated(() => {
assert(q.running() == 3, 'cargo should be saturated now');
calls.push('saturated');
- };
- q.empty = function() {
+ });
+ q.empty(() => {
assert(q.length() === 0, 'cargo should be empty now');
calls.push('empty');
- };
- q.drain = function() {
+ });
+ q.drain(() => {
assert(
q.length() === 0 && q.running() === 0,
'cargo should be empty now and no more workers should be running'
@@ -227,7 +227,7 @@ describe('cargo', () => {
'drain'
]);
done();
- };
+ });
q.push('foo', () => {calls.push('foo cb');});
q.push('bar', () => {calls.push('bar cb');});
q.push('zoo', () => {calls.push('zoo cb');});
@@ -249,9 +249,7 @@ describe('cargo', () => {
setTimeout(cb, 25);
}, 1);
- cargo.drain = function () {
- done();
- };
+ cargo.drain(done);
expect(cargo.payload).to.equal(1);
@@ -286,11 +284,11 @@ describe('cargo', () => {
});
}, 2);
- cargo.drain = function() {
+ cargo.drain(() => {
expect(cargo.workersList()).to.eql([]);
expect(cargo.running()).to.equal(0);
done();
- };
+ });
cargo.push('foo');
cargo.push('bar');
@@ -306,10 +304,10 @@ describe('cargo', () => {
});
}, 2);
- cargo.drain = function() {
+ cargo.drain(() => {
expect(cargo.running()).to.equal(0);
done();
- };
+ });
cargo.push('foo');
cargo.push('bar');
diff --git a/test/cargoQueue.js b/test/cargoQueue.js
index 517422c..46801c7 100644
--- a/test/cargoQueue.js
+++ b/test/cargoQueue.js
@@ -61,7 +61,7 @@ describe('cargoQueue', () => {
expect(c.length()).to.equal(2);
- c.drain = function () {
+ c.drain(() => {
expect(call_order).to.eql([
'process 1 2', 'callback 1', 'callback 2',
'process 3', 'callback 3',
@@ -69,7 +69,7 @@ describe('cargoQueue', () => {
]);
expect(c.length()).to.equal(0);
done();
- };
+ });
});
it('without callback', (done) => {
@@ -83,7 +83,7 @@ describe('cargoQueue', () => {
c.push(4);
setImmediate(() => {
c.push(5);
- c.drain = function complete () {
+ c.drain(() => {
expect(call_order).to.eql([
'process 1',
'process 2',
@@ -91,7 +91,7 @@ describe('cargoQueue', () => {
'process 5'
]);
done();
- }
+ })
})
})
})
@@ -135,9 +135,9 @@ describe('cargoQueue', () => {
}, 3, 2);
var drainCounter = 0;
- c.drain = function () {
+ c.drain(() => {
drainCounter++;
- };
+ });
for(var i = 0; i < 10; i++){
c.push(i);
@@ -162,7 +162,7 @@ describe('cargoQueue', () => {
}
var drainCounter = 0;
- c.drain = function () {
+ c.drain(() => {
drainCounter++;
if (drainCounter === 1) {
@@ -171,7 +171,7 @@ describe('cargoQueue', () => {
expect(drainCounter).to.equal(2);
done();
}
- };
+ });
loadCargo();
});
@@ -184,15 +184,15 @@ describe('cargoQueue', () => {
async.setImmediate(cb);
}, 3, 1);
- q.saturated = function() {
+ q.saturated(() => {
assert(q.running() == 3, 'cargoQueue should be saturated now');
calls.push('saturated');
- };
- q.empty = function() {
+ });
+ q.empty(() => {
assert(q.length() === 0, 'cargoQueue should be empty now');
calls.push('empty');
- };
- q.drain = function() {
+ });
+ q.drain(() => {
assert(
q.length() === 0 && q.running() === 0,
'cargoQueue should be empty now and no more workers should be running'
@@ -216,7 +216,7 @@ describe('cargoQueue', () => {
'drain'
]);
done();
- };
+ });
q.push('foo', () => {calls.push('foo cb');});
q.push('bar', () => {calls.push('bar cb');});
q.push('zoo', () => {calls.push('zoo cb');});
@@ -238,9 +238,7 @@ describe('cargoQueue', () => {
setTimeout(cb, 25);
}, 1, 1);
- cargo.drain = function () {
- done();
- };
+ cargo.drain(done);
expect(cargo.payload).to.equal(1);
@@ -264,9 +262,7 @@ describe('cargoQueue', () => {
setTimeout(cb, 25);
}, 1, 1);
- cargo.drain = function () {
- done();
- };
+ cargo.drain(done);
expect(cargo.concurrency).to.equal(1);
@@ -301,11 +297,11 @@ describe('cargoQueue', () => {
});
}, 1, 2);
- cargo.drain = function() {
+ cargo.drain(() => {
expect(cargo.workersList()).to.eql([]);
expect(cargo.running()).to.equal(0);
done();
- };
+ });
cargo.push('foo');
cargo.push('bar');
@@ -321,10 +317,10 @@ describe('cargoQueue', () => {
});
}, 1, 1);
- cargo.drain = function() {
+ cargo.drain(() => {
expect(cargo.running()).to.equal(0);
done();
- };
+ });
cargo.push(['foo', 'bar', 'baz', 'boo']);
})
diff --git a/test/es2017/asyncFunctions.js b/test/es2017/asyncFunctions.js
index 553b7f4..bf0d708 100644
--- a/test/es2017/asyncFunctions.js
+++ b/test/es2017/asyncFunctions.js
@@ -350,10 +350,10 @@ module.exports = function () {
result.push(await Promise.resolve(val));
}, 2)
- q.drain = () => {
+ q.drain(() => {
expect(result).to.eql([[1, 2], [3]]);
done();
- };
+ });
q.push(1);
q.push(2);
@@ -366,10 +366,10 @@ module.exports = function () {
result.push(await Promise.resolve(val));
}, 2)
- q.drain = () => {
+ q.drain(() => {
expect(result).to.eql([1, 2, 3]);
done();
- };
+ });
q.push(1);
q.push(2);
@@ -382,10 +382,10 @@ module.exports = function () {
result.push(await Promise.resolve(val));
}, 2)
- q.drain = () => {
+ q.drain(() => {
expect(result).to.eql([1, 2, 3]);
done();
- };
+ });
q.push(1);
q.push(2);
diff --git a/test/es2017/awaitableFunctions.js b/test/es2017/awaitableFunctions.js
index 55bcc9e..e8adf7b 100644
--- a/test/es2017/awaitableFunctions.js
+++ b/test/es2017/awaitableFunctions.js
@@ -586,6 +586,51 @@ module.exports = function () {
expect(calls).to.eql([1, 2, 3, 4])
});
+ it('should work with queues', async () => {
+ const q = async.queue(async (data) => {
+ if (data === 6) throw new Error('oh noes')
+ await new Promise(resolve => setTimeout(resolve, 10))
+ return data
+ }, 5)
+
+ const calls = []
+ const errorCalls = []
+ const emptyCalls = []
+ q.error().catch(d => errorCalls.push('error ' + d))
+ q.saturated().then(() => calls.push('saturated'))
+ q.unsaturated().then(() => calls.push('unsaturated'))
+ q.empty().then(() => emptyCalls.push('empty'))
+
+ q.push(1).then(d => calls.push('push cb ' + d))
+ q.push(2).then(d => calls.push('push cb ' + d))
+ q.push([3, 4, 5, 6]).map(p => p.then(d => calls.push('push cb ' + d)))
+ q.push(7).then(d => calls.push('push cb ' + d))
+ q.push(8).then(d => calls.push('push cb ' + d))
+
+ const multiP = Promise.all(q.push([9, 10]))
+
+ await q.drain()
+ await multiP
+ expect(calls).to.eql([
+ 'saturated',
+ 'push cb 1',
+ 'push cb 2',
+ 'push cb 3',
+ 'push cb 4',
+ 'push cb 5',
+ 'push cb 7',
+ 'unsaturated',
+ 'push cb 8'
+ ])
+
+ expect(errorCalls).to.eql([
+ 'error Error: oh noes',
+ ])
+ expect(emptyCalls).to.eql([
+ 'empty',
+ ])
+ })
+
/*
* Util
*/
diff --git a/test/priorityQueue.js b/test/priorityQueue.js
index c3ece02..96936fd 100644
--- a/test/priorityQueue.js
+++ b/test/priorityQueue.js
@@ -40,7 +40,7 @@ describe('priorityQueue', () => {
expect(q.length()).to.equal(4);
expect(q.concurrency).to.equal(1);
- q.drain = function () {
+ q.drain(() => {
expect(call_order).to.eql([
'process 2', 'callback 2',
'process 1', 'callback 1',
@@ -50,7 +50,7 @@ describe('priorityQueue', () => {
expect(q.concurrency).to.equal(1);
expect(q.length()).to.equal(0);
done();
- };
+ });
});
it('concurrency', (done) => {
@@ -95,7 +95,7 @@ describe('priorityQueue', () => {
expect(q.length()).to.equal(4);
expect(q.concurrency).to.equal(2);
- q.drain = function () {
+ q.drain(() => {
expect(call_order).to.eql([
'process 1', 'callback 1',
'process 2', 'callback 2',
@@ -105,7 +105,7 @@ describe('priorityQueue', () => {
expect(q.concurrency).to.equal(2);
expect(q.length()).to.equal(0);
done();
- };
+ });
});
it('pause in worker with concurrency', (done) => {
@@ -131,10 +131,10 @@ describe('priorityQueue', () => {
q.push({ id: 4 });
q.push({ id: 5 });
- q.drain = function () {
+ q.drain(() => {
expect(call_order).to.eql([1, 2, 3, 4, 5]);
done();
- };
+ });
});
context('q.saturated(): ', () => {
@@ -144,10 +144,10 @@ describe('priorityQueue', () => {
calls.push('process ' + task);
async.setImmediate(cb);
}, 4);
- q.saturated = function() {
+ q.saturated(() => {
calls.push('saturated');
- };
- q.empty = function() {
+ });
+ q.empty(() => {
expect(calls.indexOf('saturated')).to.be.above(-1);
setTimeout(() => {
expect(calls).eql([
@@ -166,7 +166,7 @@ describe('priorityQueue', () => {
]);
done();
}, 50);
- };
+ });
q.push('foo0', 5, () => {calls.push('foo0 cb');});
q.push('foo1', 4, () => {calls.push('foo1 cb');});
q.push('foo2', 3, () => {calls.push('foo2 cb');});
@@ -206,10 +206,10 @@ describe('priorityQueue', () => {
calls.push('process ' + task);
setTimeout(cb, 10);
}, 4);
- q.unsaturated = function() {
+ q.unsaturated(() => {
calls.push('unsaturated');
- };
- q.empty = function() {
+ });
+ q.empty(() => {
expect(calls.indexOf('unsaturated')).to.be.above(-1);
setTimeout(() => {
expect(calls).eql([
@@ -231,7 +231,7 @@ describe('priorityQueue', () => {
]);
done();
}, 50);
- };
+ });
q.push('foo0', 5, () => {calls.push('foo0 cb');});
q.push('foo1', 4, () => {calls.push('foo1 cb');});
q.push('foo2', 3, () => {calls.push('foo2 cb');});
@@ -262,7 +262,7 @@ describe('priorityQueue', () => {
expect(q.length()).to.equal(2);
- q.drain = function () {
+ q.drain(() => {
expect(call_order).to.eql([
'process 1', 'callback 1',
'process 2', 'callback 2'
@@ -271,7 +271,7 @@ describe('priorityQueue', () => {
expect(q.length()).to.equal(0);
expect(q.running()).to.equal(0);
done();
- };
+ });
q.push([], 1, () => {});
});
diff --git a/test/queue.js b/test/queue.js
index f103aaa..5c31116 100644
--- a/test/queue.js
+++ b/test/queue.js
@@ -50,7 +50,7 @@ describe('queue', function(){
expect(q.length()).to.equal(4);
expect(q.concurrency).to.equal(2);
- q.drain = function () {
+ q.drain(() => {
expect(call_order).to.eql([
'process 2', 'callback 2',
'process 1', 'callback 1',
@@ -60,7 +60,7 @@ describe('queue', function(){
expect(q.concurrency).to.equal(2);
expect(q.length()).to.equal(0);
done();
- };
+ });
});
it('default concurrency', (done) => {
@@ -103,7 +103,7 @@ describe('queue', function(){
expect(q.length()).to.equal(4);
expect(q.concurrency).to.equal(1);
- q.drain = function () {
+ q.drain(() => {
expect(call_order).to.eql([
'process 1', 'callback 1',
'process 2', 'callback 2',
@@ -113,7 +113,7 @@ describe('queue', function(){
expect(q.concurrency).to.equal(1);
expect(q.length()).to.equal(0);
done();
- };
+ });
});
it('zero concurrency', (done) => {
@@ -132,10 +132,10 @@ describe('queue', function(){
callback(task.name === 'foo' ? new Error('fooError') : null);
}, 2);
- q.drain = function() {
+ q.drain(() => {
expect(results).to.eql(['bar', 'fooError']);
done();
- };
+ });
q.push({name: 'bar'}, (err) => {
if(err) {
@@ -163,17 +163,17 @@ describe('queue', function(){
callback(task.name === 'foo' ? new Error('fooError') : null);
}, 2);
- q.error = function(error, task) {
+ q.error((error, task) => {
expect(error).to.exist;
expect(error.message).to.equal('fooError');
expect(task.name).to.equal('foo');
results.push('fooError');
- };
+ });
- q.drain = function() {
+ q.drain (() => {
expect(results).to.eql(['fooError', 'bar']);
done();
- };
+ });
q.push({name: 'foo'});
@@ -201,9 +201,7 @@ describe('queue', function(){
q.push('');
}
- q.drain = function(){
- done();
- };
+ q.drain(done);
setTimeout(() => {
expect(q.concurrency).to.equal(1);
@@ -245,7 +243,7 @@ describe('queue', function(){
q.push(3);
q.push(4);
- q.drain = function () {
+ q.drain(() => {
expect(running).to.eql(0);
expect(concurrencyList).to.eql([1, 2, 2, 2]);
expect(call_order).to.eql([
@@ -255,7 +253,7 @@ describe('queue', function(){
'process 3'
]);
done();
- };
+ });
});
it('push with non-function', (done) => {
@@ -320,7 +318,7 @@ describe('queue', function(){
expect(q.length()).to.equal(4);
expect(q.concurrency).to.equal(2);
- q.drain = function () {
+ q.drain(() => {
expect(call_order).to.eql([
'process 2', 'callback 2',
'process 1', 'callback 1',
@@ -330,7 +328,7 @@ describe('queue', function(){
expect(q.concurrency).to.equal(2);
expect(q.length()).to.equal(0);
done();
- };
+ });
});
it('idle', (done) => {
@@ -351,11 +349,11 @@ describe('queue', function(){
// Queue is busy when tasks added
expect(q.idle()).to.equal(false);
- q.drain = function() {
+ q.drain(() => {
// Queue is idle after drain
expect(q.idle()).to.equal(true);
done();
- };
+ });
});
it('pause', (done) => {
@@ -397,7 +395,7 @@ describe('queue', function(){
q.resume();
q.push(5);
q.push(6);
- q.drain = drain;
+ q.drain(drain);
}
function drain () {
expect(concurrencyList).to.eql([1, 2, 2, 1, 2, 2]);
@@ -436,10 +434,10 @@ describe('queue', function(){
q.push({ id: 4 });
q.push({ id: 5 });
- q.drain = function () {
+ q.drain(() => {
expect(call_order).to.eql([1, 2, 3, 4, 5]);
done();
- };
+ });
});
it('start paused', (done) => {
@@ -462,9 +460,7 @@ describe('queue', function(){
q.resume();
}, 5);
- q.drain = function () {
- done();
- };
+ q.drain(done);
});
it('kill', (done) => {
@@ -473,9 +469,9 @@ describe('queue', function(){
throw new Error("Function should never be called");
}, 20);
}, 1);
- q.drain = function() {
+ q.drain(() => {
throw new Error("Function should never be called");
- };
+ });
q.push(0);
@@ -496,15 +492,15 @@ describe('queue', function(){
}, 3);
q.concurrency = 3;
- q.saturated = function() {
+ q.saturated(() => {
assert(q.running() == 3, 'queue should be saturated now');
calls.push('saturated');
- };
- q.empty = function() {
+ });
+ q.empty(() => {
assert(q.length() === 0, 'queue should be empty now');
calls.push('empty');
- };
- q.drain = function() {
+ });
+ q.drain(() => {
assert(
q.length() === 0 && q.running() === 0,
'queue should be empty now and no more workers should be running'
@@ -528,7 +524,7 @@ describe('queue', function(){
'drain'
]);
done();
- };
+ });
q.push('foo', () => {calls.push('foo cb');});
q.push('bar', () => {calls.push('bar cb');});
q.push('zoo', () => {calls.push('zoo cb');});
@@ -544,7 +540,7 @@ describe('queue', function(){
async.setImmediate(cb);
}, 3);
- q.drain = function() {
+ q.drain(() => {
assert(
q.length() === 0 && q.running() === 0,
'queue should be empty now and no more workers should be running'
@@ -554,7 +550,7 @@ describe('queue', function(){
'drain'
]);
done();
- };
+ });
q.push([]);
});
@@ -568,14 +564,14 @@ describe('queue', function(){
async.setImmediate(cb);
}, 1);
- q.empty = function () {
+ q.empty(() => {
calls.push('empty');
assert(q.idle() === false,
'tasks should be running when empty is called')
expect(q.running()).to.equal(1);
- }
+ })
- q.drain = function() {
+ q.drain(() => {
calls.push('drain');
expect(calls).to.eql([
'empty',
@@ -583,7 +579,7 @@ describe('queue', function(){
'drain'
]);
done();
- };
+ });
q.push(1);
});
@@ -593,13 +589,13 @@ describe('queue', function(){
async.setImmediate(cb);
}, 2);
- q.saturated = function () {
+ q.saturated(() => {
saturatedCalled = true;
- };
- q.drain = function () {
+ })
+ q.drain(() => {
assert(saturatedCalled, "saturated not called");
done();
- };
+ })
q.push(['foo', 'bar', 'baz', 'moo']);
});
@@ -623,10 +619,10 @@ describe('queue', function(){
calls.push('process ' + task);
async.setImmediate(cb);
}, 4);
- q.saturated = function() {
+ q.saturated(() => {
calls.push('saturated');
- };
- q.empty = function() {
+ });
+ q.empty(() => {
expect(calls.indexOf('saturated')).to.be.above(-1);
setTimeout(() => {
expect(calls).eql([
@@ -645,7 +641,7 @@ describe('queue', function(){
]);
done();
}, 50);
- };
+ });
q.push('foo0', () => {calls.push('foo0 cb');});
q.push('foo1', () => {calls.push('foo1 cb');});
q.push('foo2', () => {calls.push('foo2 cb');});
@@ -683,10 +679,10 @@ describe('queue', function(){
calls.push('process ' + task);
async.setImmediate(cb);
}, 4);
- q.unsaturated = function() {
+ q.unsaturated(() => {
calls.push('unsaturated');
- };
- q.empty = function() {
+ });
+ q.empty(() => {
expect(calls.indexOf('unsaturated')).to.be.above(-1);
setTimeout(() => {
expect(calls).eql([
@@ -708,7 +704,7 @@ describe('queue', function(){
]);
done();
}, 50);
- };
+ });
q.push('foo0', () => {calls.push('foo0 cb');});
q.push('foo1', () => {calls.push('foo1 cb');});
q.push('foo2', () => {calls.push('foo2 cb');});
@@ -726,11 +722,11 @@ describe('queue', function(){
});
}, 2);
- q.drain = function() {
+ q.drain(() => {
expect(q.workersList().length).to.equal(0);
expect(q.running()).to.equal(0);
done();
- };
+ });
q.push('foo');
q.push('bar');
@@ -767,11 +763,11 @@ describe('queue', function(){
});
}, 2);
- q.drain = function() {
+ q.drain(() => {
expect(q.workersList()).to.eql([]);
expect(q.workersList().length).to.equal(q.running());
done();
- };
+ });
q.push('foo');
q.push('bar');
@@ -792,10 +788,10 @@ describe('queue', function(){
return node.data === 3;
});
- q.drain = function () {
+ q.drain(() => {
expect(result).to.eql([1, 2, 4, 5]);
done();
- }
+ })
});
it('should be iterable', (done) => {
@@ -811,9 +807,16 @@ describe('queue', function(){
expect([...q]).to.eql([1, 2, 3, 4, 5]);
- q.drain = function () {
+ q.drain(() => {
expect([...q]).to.eql([]);
done();
- }
+ })
+ })
+
+ it('should error when re-assigning event methods', () => {
+ var q = async.queue(() => {})
+ expect(() => {
+ q.drain = () => {}
+ }).to.throw()
})
});