summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorzaphod1984 <zaphod84@gmx.de>2012-01-27 10:23:50 +0100
committerCaolan McMahon <caolan@caolanmcmahon.com>2012-02-12 18:13:19 -0800
commit0225f709ed4f4cce4861b193e80ae2320adf3d04 (patch)
tree6a8932cc5ff63677d750704a8444a5c1b863a9d7
parentc89db1fc1b37f015ef6992f5f156e46e44399431 (diff)
downloadasync-0225f709ed4f4cce4861b193e80ae2320adf3d04.tar.gz
added possibility to push taskbulks, added test
Conflicts: lib/async.js
-rw-r--r--README.md7
-rw-r--r--lib/async.js16
-rw-r--r--test/test-async.js36
3 files changed, 56 insertions, 3 deletions
diff --git a/README.md b/README.md
index 94ee562..dbefb34 100644
--- a/README.md
+++ b/README.md
@@ -701,6 +701,7 @@ methods:
alter the concurrency on-the-fly.
* push(task, [callback]) - add a new task to the queue, the callback is called
once the worker has finished processing the task.
+ instead of a single task, an array of tasks can be submitted. the respective callback is used for every task in the list.
* saturated - a callback that is called when the queue length hits the concurrency and further tasks will be queued
* empty - a callback that is called when the last item from the queue is given to a worker
* drain - a callback that is called when the last item from the queue has returned from the worker
@@ -729,6 +730,12 @@ __Example__
console.log('finished processing bar');
});
+ // add some items to the queue (batch-wise)
+
+ q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function (err) {
+ console.log('finished processing bar');
+ });
+
---------------------------------------
diff --git a/lib/async.js b/lib/async.js
index b5bea53..0bcd80d 100644
--- a/lib/async.js
+++ b/lib/async.js
@@ -595,9 +595,19 @@
empty: null,
drain: null,
push: function (data, callback) {
- q.tasks.push({data: data, callback: typeof callback === 'function' ? callback : null});
- if(q.saturated && q.tasks.length == concurrency) q.saturated();
- async.nextTick(q.process);
+ if(data.constructor !== Array) {
+ data = [data];
+ }
+ _forEach(data, function(task) {
+ q.tasks.push({
+ data: task,
+ callback: typeof callback === 'function' ? callback : null
+ });
+ if (q.saturated && q.tasks.length == concurrency) {
+ q.saturated();
+ }
+ async.nextTick(q.process);
+ });
},
process: function () {
if (workers < q.concurrency && q.tasks.length) {
diff --git a/test/test-async.js b/test/test-async.js
index 895d5a6..4505fff 100644
--- a/test/test-async.js
+++ b/test/test-async.js
@@ -1358,6 +1358,42 @@ exports['queue push without callback'] = function (test) {
}, 200);
};
+exports['queue bulk task'] = function (test) {
+ var call_order = [],
+ delays = [40,20,60,20];
+
+ // worker1: --1-4
+ // worker2: -2---3
+ // order of completion: 2,1,4,3
+
+ var q = async.queue(function (task, callback) {
+ setTimeout(function () {
+ call_order.push('process ' + task);
+ callback('error', task);
+ }, delays.splice(0,1)[0]);
+ }, 2);
+
+ q.push( [1,2,3,4], function (err, arg) {
+ test.equal(err, 'error');
+ call_order.push('callback ' + arg);
+ });
+
+ test.equal(q.length(), 4);
+ test.equal(q.concurrency, 2);
+
+ setTimeout(function () {
+ test.same(call_order, [
+ 'process 2', 'callback 2',
+ 'process 1', 'callback 1',
+ 'process 4', 'callback 4',
+ 'process 3', 'callback 3'
+ ]);
+ test.equal(q.concurrency, 2);
+ test.equal(q.length(), 0);
+ test.done();
+ }, 200);
+};
+
exports['memoize'] = function (test) {
test.expect(4);
var call_order = [];