diff options
author | zaphod1984 <zaphod84@gmx.de> | 2012-01-27 10:23:50 +0100 |
---|---|---|
committer | Caolan McMahon <caolan@caolanmcmahon.com> | 2012-02-12 18:13:19 -0800 |
commit | 0225f709ed4f4cce4861b193e80ae2320adf3d04 (patch) | |
tree | 6a8932cc5ff63677d750704a8444a5c1b863a9d7 | |
parent | c89db1fc1b37f015ef6992f5f156e46e44399431 (diff) | |
download | async-0225f709ed4f4cce4861b193e80ae2320adf3d04.tar.gz |
added possibility to push taskbulks, added test
Conflicts:
lib/async.js
-rw-r--r-- | README.md | 7 | ||||
-rw-r--r-- | lib/async.js | 16 | ||||
-rw-r--r-- | test/test-async.js | 36 |
3 files changed, 56 insertions, 3 deletions
@@ -701,6 +701,7 @@ methods: alter the concurrency on-the-fly. * push(task, [callback]) - add a new task to the queue, the callback is called once the worker has finished processing the task. + instead of a single task, an array of tasks can be submitted. the respective callback is used for every task in the list. * saturated - a callback that is called when the queue length hits the concurrency and further tasks will be queued * empty - a callback that is called when the last item from the queue is given to a worker * drain - a callback that is called when the last item from the queue has returned from the worker @@ -729,6 +730,12 @@ __Example__ console.log('finished processing bar'); }); + // add some items to the queue (batch-wise) + + q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function (err) { + console.log('finished processing bar'); + }); + --------------------------------------- diff --git a/lib/async.js b/lib/async.js index b5bea53..0bcd80d 100644 --- a/lib/async.js +++ b/lib/async.js @@ -595,9 +595,19 @@ empty: null, drain: null, push: function (data, callback) { - q.tasks.push({data: data, callback: typeof callback === 'function' ? callback : null}); - if(q.saturated && q.tasks.length == concurrency) q.saturated(); - async.nextTick(q.process); + if(data.constructor !== Array) { + data = [data]; + } + _forEach(data, function(task) { + q.tasks.push({ + data: task, + callback: typeof callback === 'function' ? callback : null + }); + if (q.saturated && q.tasks.length == concurrency) { + q.saturated(); + } + async.nextTick(q.process); + }); }, process: function () { if (workers < q.concurrency && q.tasks.length) { diff --git a/test/test-async.js b/test/test-async.js index 895d5a6..4505fff 100644 --- a/test/test-async.js +++ b/test/test-async.js @@ -1358,6 +1358,42 @@ exports['queue push without callback'] = function (test) { }, 200); }; +exports['queue bulk task'] = function (test) { + var call_order = [], + delays = [40,20,60,20]; + + // worker1: --1-4 + // worker2: -2---3 + // order of completion: 2,1,4,3 + + var q = async.queue(function (task, callback) { + setTimeout(function () { + call_order.push('process ' + task); + callback('error', task); + }, delays.splice(0,1)[0]); + }, 2); + + q.push( [1,2,3,4], function (err, arg) { + test.equal(err, 'error'); + call_order.push('callback ' + arg); + }); + + test.equal(q.length(), 4); + test.equal(q.concurrency, 2); + + setTimeout(function () { + test.same(call_order, [ + 'process 2', 'callback 2', + 'process 1', 'callback 1', + 'process 4', 'callback 4', + 'process 3', 'callback 3' + ]); + test.equal(q.concurrency, 2); + test.equal(q.length(), 0); + test.done(); + }, 200); +}; + exports['memoize'] = function (test) { test.expect(4); var call_order = []; |