summaryrefslogtreecommitdiff
path: root/lib/internal/queue.js
diff options
context:
space:
mode:
Diffstat (limited to 'lib/internal/queue.js')
-rw-r--r--lib/internal/queue.js52
1 files changed, 26 insertions, 26 deletions
diff --git a/lib/internal/queue.js b/lib/internal/queue.js
index e57ef14..9debd64 100644
--- a/lib/internal/queue.js
+++ b/lib/internal/queue.js
@@ -1,11 +1,10 @@
import arrayEach from 'lodash/_arrayEach';
-import arrayMap from 'lodash/_arrayMap';
import isArray from 'lodash/isArray';
import noop from 'lodash/noop';
-import property from 'lodash/_baseProperty';
import onlyOnce from './onlyOnce';
import setImmediate from './setImmediate';
+import DLL from './DoublyLinkedList';
export default function queue(worker, concurrency, payload) {
if (concurrency == null) {
@@ -14,7 +13,8 @@ export default function queue(worker, concurrency, payload) {
else if(concurrency === 0) {
throw new Error('Concurrency must not be zero');
}
- function _insert(q, data, pos, callback) {
+
+ function _insert(data, pos, callback) {
if (callback != null && typeof callback !== 'function') {
throw new Error('task callback must be a function');
}
@@ -35,19 +35,19 @@ export default function queue(worker, concurrency, payload) {
};
if (pos) {
- q.tasks.unshift(item);
+ q._tasks.unshift(item);
} else {
- q.tasks.push(item);
+ q._tasks.push(item);
}
});
setImmediate(q.process);
}
- function _next(q, tasks) {
+
+ function _next(tasks) {
return function(){
workers -= 1;
-
var removed = false;
var args = arguments;
arrayEach(tasks, function (task) {
@@ -69,7 +69,7 @@ export default function queue(worker, concurrency, payload) {
q.unsaturated();
}
- if (q.tasks.length + workers === 0) {
+ if (q._tasks.length + workers === 0) {
q.drain();
}
q.process();
@@ -79,7 +79,7 @@ export default function queue(worker, concurrency, payload) {
var workers = 0;
var workersList = [];
var q = {
- tasks: [],
+ _tasks: new DLL(),
concurrency: concurrency,
payload: payload,
saturated: noop,
@@ -91,25 +91,27 @@ export default function queue(worker, concurrency, payload) {
started: false,
paused: false,
push: function (data, callback) {
- _insert(q, data, false, callback);
+ _insert(data, false, callback);
},
kill: function () {
q.drain = noop;
- q.tasks = [];
+ q._tasks.empty();
},
unshift: function (data, callback) {
- _insert(q, data, true, callback);
+ _insert(data, true, callback);
},
process: function () {
- while(!q.paused && workers < q.concurrency && q.tasks.length){
-
- var tasks = q.payload ?
- q.tasks.splice(0, q.payload) :
- q.tasks.splice(0, q.tasks.length);
-
- var data = arrayMap(tasks, property('data'));
+ while(!q.paused && workers < q.concurrency && q._tasks.length){
+ var tasks = [], data = [];
+ var l = q._tasks.length;
+ if (q.payload) l = Math.min(l, q.payload);
+ for (var i = 0; i < l; i++) {
+ var node = q._tasks.shift();
+ tasks.push(node);
+ data.push(node.data);
+ }
- if (q.tasks.length === 0) {
+ if (q._tasks.length === 0) {
q.empty();
}
workers += 1;
@@ -119,14 +121,12 @@ export default function queue(worker, concurrency, payload) {
q.saturated();
}
- var cb = onlyOnce(_next(q, tasks));
+ var cb = onlyOnce(_next(tasks));
worker(data, cb);
-
-
}
},
length: function () {
- return q.tasks.length;
+ return q._tasks.length;
},
running: function () {
return workers;
@@ -135,7 +135,7 @@ export default function queue(worker, concurrency, payload) {
return workersList;
},
idle: function() {
- return q.tasks.length + workers === 0;
+ return q._tasks.length + workers === 0;
},
pause: function () {
q.paused = true;
@@ -143,7 +143,7 @@ export default function queue(worker, concurrency, payload) {
resume: function () {
if (q.paused === false) { return; }
q.paused = false;
- var resumeCount = Math.min(q.concurrency, q.tasks.length);
+ var resumeCount = Math.min(q.concurrency, q._tasks.length);
// Need to call q.process once per concurrent
// worker to preserve full concurrency after pause
for (var w = 1; w <= resumeCount; w++) {