summaryrefslogtreecommitdiff
path: root/lib/internal/queue.js
diff options
context:
space:
mode:
Diffstat (limited to 'lib/internal/queue.js')
-rw-r--r--lib/internal/queue.js33
1 files changed, 23 insertions, 10 deletions
diff --git a/lib/internal/queue.js b/lib/internal/queue.js
index 479eaf8..1a3f4cb 100644
--- a/lib/internal/queue.js
+++ b/lib/internal/queue.js
@@ -1,7 +1,6 @@
import indexOf from 'lodash/_baseIndexOf';
import isArray from 'lodash/isArray';
import noop from 'lodash/noop';
-import rest from './rest';
import onlyOnce from './onlyOnce';
import setImmediate from './setImmediate';
@@ -20,6 +19,7 @@ export default function queue(worker, concurrency, payload) {
var numRunning = 0;
var workersList = [];
+ var processingScheduled = false;
function _insert(data, insertAtFront, callback) {
if (callback != null && typeof callback !== 'function') {
throw new Error('task callback must be a function');
@@ -47,24 +47,34 @@ export default function queue(worker, concurrency, payload) {
q._tasks.push(item);
}
}
- setImmediate(q.process);
+
+ if (!processingScheduled) {
+ processingScheduled = true;
+ setImmediate(function() {
+ processingScheduled = false;
+ q.process();
+ });
+ }
}
function _next(tasks) {
- return rest(function(args){
+ return function(err){
numRunning -= 1;
for (var i = 0, l = tasks.length; i < l; i++) {
var task = tasks[i];
+
var index = indexOf(workersList, task, 0);
- if (index >= 0) {
- workersList.splice(index)
+ if (index === 0) {
+ workersList.shift();
+ } else if (index > 0) {
+ workersList.splice(index, 1);
}
- task.callback.apply(task, args);
+ task.callback.apply(task, arguments);
- if (args[0] != null) {
- q.error(args[0], task.data);
+ if (err != null) {
+ q.error(err, task.data);
}
}
@@ -76,7 +86,7 @@ export default function queue(worker, concurrency, payload) {
q.drain();
}
q.process();
- });
+ };
}
var isProcessing = false;
@@ -102,6 +112,9 @@ export default function queue(worker, concurrency, payload) {
unshift: function (data, callback) {
_insert(data, true, callback);
},
+ remove: function (testFn) {
+ q._tasks.remove(testFn);
+ },
process: function () {
// Avoid trying to start too many processing operations. This can occur
// when callbacks resolve synchronously (#1267).
@@ -116,11 +129,11 @@ export default function queue(worker, concurrency, payload) {
for (var i = 0; i < l; i++) {
var node = q._tasks.shift();
tasks.push(node);
+ workersList.push(node);
data.push(node.data);
}
numRunning += 1;
- workersList.push(tasks[0]);
if (q._tasks.length === 0) {
q.empty();