summaryrefslogtreecommitdiff
path: root/lib/internal/queue.js
diff options
context:
space:
mode:
Diffstat (limited to 'lib/internal/queue.js')
-rw-r--r--lib/internal/queue.js23
1 files changed, 13 insertions, 10 deletions
diff --git a/lib/internal/queue.js b/lib/internal/queue.js
index 7a438da..c825e62 100644
--- a/lib/internal/queue.js
+++ b/lib/internal/queue.js
@@ -6,6 +6,7 @@ import rest from './rest';
import onlyOnce from './onlyOnce';
import setImmediate from './setImmediate';
import DLL from './DoublyLinkedList';
+import wrapAsync from './wrapAsync';
export default function queue(worker, concurrency, payload) {
if (concurrency == null) {
@@ -15,6 +16,10 @@ export default function queue(worker, concurrency, payload) {
throw new Error('Concurrency must not be zero');
}
+ var _worker = wrapAsync(worker);
+ var numRunning = 0;
+ var workersList = [];
+
function _insert(data, insertAtFront, callback) {
if (callback != null && typeof callback !== 'function') {
throw new Error('task callback must be a function');
@@ -47,7 +52,7 @@ export default function queue(worker, concurrency, payload) {
function _next(tasks) {
return rest(function(args){
- workers -= 1;
+ numRunning -= 1;
for (var i = 0, l = tasks.length; i < l; i++) {
var task = tasks[i];
@@ -63,7 +68,7 @@ export default function queue(worker, concurrency, payload) {
}
}
- if (workers <= (q.concurrency - q.buffer) ) {
+ if (numRunning <= (q.concurrency - q.buffer) ) {
q.unsaturated();
}
@@ -74,8 +79,6 @@ export default function queue(worker, concurrency, payload) {
});
}
- var workers = 0;
- var workersList = [];
var isProcessing = false;
var q = {
_tasks: new DLL(),
@@ -106,7 +109,7 @@ export default function queue(worker, concurrency, payload) {
return;
}
isProcessing = true;
- while(!q.paused && workers < q.concurrency && q._tasks.length){
+ while(!q.paused && numRunning < q.concurrency && q._tasks.length){
var tasks = [], data = [];
var l = q._tasks.length;
if (q.payload) l = Math.min(l, q.payload);
@@ -119,15 +122,15 @@ export default function queue(worker, concurrency, payload) {
if (q._tasks.length === 0) {
q.empty();
}
- workers += 1;
+ numRunning += 1;
workersList.push(tasks[0]);
- if (workers === q.concurrency) {
+ if (numRunning === q.concurrency) {
q.saturated();
}
var cb = onlyOnce(_next(tasks));
- worker(data, cb);
+ _worker(data, cb);
}
isProcessing = false;
},
@@ -135,13 +138,13 @@ export default function queue(worker, concurrency, payload) {
return q._tasks.length;
},
running: function () {
- return workers;
+ return numRunning;
},
workersList: function () {
return workersList;
},
idle: function() {
- return q._tasks.length + workers === 0;
+ return q._tasks.length + numRunning === 0;
},
pause: function () {
q.paused = true;