diff options
Diffstat (limited to 'lib/internal/queue.js')
-rw-r--r-- | lib/internal/queue.js | 23 |
1 files changed, 13 insertions, 10 deletions
diff --git a/lib/internal/queue.js b/lib/internal/queue.js index 7a438da..c825e62 100644 --- a/lib/internal/queue.js +++ b/lib/internal/queue.js @@ -6,6 +6,7 @@ import rest from './rest'; import onlyOnce from './onlyOnce'; import setImmediate from './setImmediate'; import DLL from './DoublyLinkedList'; +import wrapAsync from './wrapAsync'; export default function queue(worker, concurrency, payload) { if (concurrency == null) { @@ -15,6 +16,10 @@ export default function queue(worker, concurrency, payload) { throw new Error('Concurrency must not be zero'); } + var _worker = wrapAsync(worker); + var numRunning = 0; + var workersList = []; + function _insert(data, insertAtFront, callback) { if (callback != null && typeof callback !== 'function') { throw new Error('task callback must be a function'); @@ -47,7 +52,7 @@ export default function queue(worker, concurrency, payload) { function _next(tasks) { return rest(function(args){ - workers -= 1; + numRunning -= 1; for (var i = 0, l = tasks.length; i < l; i++) { var task = tasks[i]; @@ -63,7 +68,7 @@ export default function queue(worker, concurrency, payload) { } } - if (workers <= (q.concurrency - q.buffer) ) { + if (numRunning <= (q.concurrency - q.buffer) ) { q.unsaturated(); } @@ -74,8 +79,6 @@ export default function queue(worker, concurrency, payload) { }); } - var workers = 0; - var workersList = []; var isProcessing = false; var q = { _tasks: new DLL(), @@ -106,7 +109,7 @@ export default function queue(worker, concurrency, payload) { return; } isProcessing = true; - while(!q.paused && workers < q.concurrency && q._tasks.length){ + while(!q.paused && numRunning < q.concurrency && q._tasks.length){ var tasks = [], data = []; var l = q._tasks.length; if (q.payload) l = Math.min(l, q.payload); @@ -119,15 +122,15 @@ export default function queue(worker, concurrency, payload) { if (q._tasks.length === 0) { q.empty(); } - workers += 1; + numRunning += 1; workersList.push(tasks[0]); - if (workers === q.concurrency) { + if (numRunning === q.concurrency) { q.saturated(); } var cb = onlyOnce(_next(tasks)); - worker(data, cb); + _worker(data, cb); } isProcessing = false; }, @@ -135,13 +138,13 @@ export default function queue(worker, concurrency, payload) { return q._tasks.length; }, running: function () { - return workers; + return numRunning; }, workersList: function () { return workersList; }, idle: function() { - return q._tasks.length + workers === 0; + return q._tasks.length + numRunning === 0; }, pause: function () { q.paused = true; |