summaryrefslogtreecommitdiff
path: root/lib/async.js
diff options
context:
space:
mode:
Diffstat (limited to 'lib/async.js')
-rw-r--r--lib/async.js55
1 files changed, 29 insertions, 26 deletions
diff --git a/lib/async.js b/lib/async.js
index add4219..629b2a5 100644
--- a/lib/async.js
+++ b/lib/async.js
@@ -838,8 +838,21 @@
if (q.tasks.length === q.concurrency) {
q.saturated();
}
- async.setImmediate(q.process);
});
+ async.setImmediate(q.process);
+ }
+ function _next(q, tasks) {
+ return function(){
+ workers -= 1;
+ var args = arguments;
+ _arrayEach(tasks, function (task) {
+ task.callback.apply(task, args);
+ });
+ if (q.tasks.length + workers === 0) {
+ q.drain();
+ }
+ q.process();
+ };
}
var workers = 0;
@@ -863,32 +876,22 @@
},
process: function () {
if (!q.paused && workers < q.concurrency && q.tasks.length) {
- var tasks = payload ?
- q.tasks.splice(0, payload) :
- q.tasks.splice(0, q.tasks.length);
-
- var data = _map(tasks, function (task) {
- return task.data;
- });
-
- if (q.tasks.length === 0) {
- q.empty();
- }
- workers += 1;
- var cb = only_once(next);
- worker(data, cb);
- }
-
- function next() {
- workers -= 1;
- var args = arguments;
- _arrayEach(tasks, function (task) {
- task.callback.apply(task, args);
- });
- if (q.tasks.length + workers === 0) {
- q.drain();
+ while(workers < q.concurrency && q.tasks.length){
+ var tasks = payload ?
+ q.tasks.splice(0, payload) :
+ q.tasks.splice(0, q.tasks.length);
+
+ var data = _map(tasks, function (task) {
+ return task.data;
+ });
+
+ if (q.tasks.length === 0) {
+ q.empty();
+ }
+ workers += 1;
+ var cb = only_once(_next(q, tasks));
+ worker(data, cb);
}
- q.process();
}
},
length: function () {