diff options
Diffstat (limited to 'lib/async.js')
-rw-r--r-- | lib/async.js | 55 |
1 files changed, 29 insertions, 26 deletions
diff --git a/lib/async.js b/lib/async.js index add4219..629b2a5 100644 --- a/lib/async.js +++ b/lib/async.js @@ -838,8 +838,21 @@ if (q.tasks.length === q.concurrency) { q.saturated(); } - async.setImmediate(q.process); }); + async.setImmediate(q.process); + } + function _next(q, tasks) { + return function(){ + workers -= 1; + var args = arguments; + _arrayEach(tasks, function (task) { + task.callback.apply(task, args); + }); + if (q.tasks.length + workers === 0) { + q.drain(); + } + q.process(); + }; } var workers = 0; @@ -863,32 +876,22 @@ }, process: function () { if (!q.paused && workers < q.concurrency && q.tasks.length) { - var tasks = payload ? - q.tasks.splice(0, payload) : - q.tasks.splice(0, q.tasks.length); - - var data = _map(tasks, function (task) { - return task.data; - }); - - if (q.tasks.length === 0) { - q.empty(); - } - workers += 1; - var cb = only_once(next); - worker(data, cb); - } - - function next() { - workers -= 1; - var args = arguments; - _arrayEach(tasks, function (task) { - task.callback.apply(task, args); - }); - if (q.tasks.length + workers === 0) { - q.drain(); + while(workers < q.concurrency && q.tasks.length){ + var tasks = payload ? + q.tasks.splice(0, payload) : + q.tasks.splice(0, q.tasks.length); + + var data = _map(tasks, function (task) { + return task.data; + }); + + if (q.tasks.length === 0) { + q.empty(); + } + workers += 1; + var cb = only_once(_next(q, tasks)); + worker(data, cb); } - q.process(); } }, length: function () { |