diff options
Diffstat (limited to 'lib/internal/queue.js')
-rw-r--r-- | lib/internal/queue.js | 33 |
1 files changed, 23 insertions, 10 deletions
diff --git a/lib/internal/queue.js b/lib/internal/queue.js index 479eaf8..1a3f4cb 100644 --- a/lib/internal/queue.js +++ b/lib/internal/queue.js @@ -1,7 +1,6 @@ import indexOf from 'lodash/_baseIndexOf'; import isArray from 'lodash/isArray'; import noop from 'lodash/noop'; -import rest from './rest'; import onlyOnce from './onlyOnce'; import setImmediate from './setImmediate'; @@ -20,6 +19,7 @@ export default function queue(worker, concurrency, payload) { var numRunning = 0; var workersList = []; + var processingScheduled = false; function _insert(data, insertAtFront, callback) { if (callback != null && typeof callback !== 'function') { throw new Error('task callback must be a function'); @@ -47,24 +47,34 @@ export default function queue(worker, concurrency, payload) { q._tasks.push(item); } } - setImmediate(q.process); + + if (!processingScheduled) { + processingScheduled = true; + setImmediate(function() { + processingScheduled = false; + q.process(); + }); + } } function _next(tasks) { - return rest(function(args){ + return function(err){ numRunning -= 1; for (var i = 0, l = tasks.length; i < l; i++) { var task = tasks[i]; + var index = indexOf(workersList, task, 0); - if (index >= 0) { - workersList.splice(index) + if (index === 0) { + workersList.shift(); + } else if (index > 0) { + workersList.splice(index, 1); } - task.callback.apply(task, args); + task.callback.apply(task, arguments); - if (args[0] != null) { - q.error(args[0], task.data); + if (err != null) { + q.error(err, task.data); } } @@ -76,7 +86,7 @@ export default function queue(worker, concurrency, payload) { q.drain(); } q.process(); - }); + }; } var isProcessing = false; @@ -102,6 +112,9 @@ export default function queue(worker, concurrency, payload) { unshift: function (data, callback) { _insert(data, true, callback); }, + remove: function (testFn) { + q._tasks.remove(testFn); + }, process: function () { // Avoid trying to start too many processing operations. This can occur // when callbacks resolve synchronously (#1267). @@ -116,11 +129,11 @@ export default function queue(worker, concurrency, payload) { for (var i = 0; i < l; i++) { var node = q._tasks.shift(); tasks.push(node); + workersList.push(node); data.push(node.data); } numRunning += 1; - workersList.push(tasks[0]); if (q._tasks.length === 0) { q.empty(); |