diff options
Diffstat (limited to 'lib/internal/queue.js')
-rw-r--r-- | lib/internal/queue.js | 52 |
1 files changed, 26 insertions, 26 deletions
diff --git a/lib/internal/queue.js b/lib/internal/queue.js index e57ef14..9debd64 100644 --- a/lib/internal/queue.js +++ b/lib/internal/queue.js @@ -1,11 +1,10 @@ import arrayEach from 'lodash/_arrayEach'; -import arrayMap from 'lodash/_arrayMap'; import isArray from 'lodash/isArray'; import noop from 'lodash/noop'; -import property from 'lodash/_baseProperty'; import onlyOnce from './onlyOnce'; import setImmediate from './setImmediate'; +import DLL from './DoublyLinkedList'; export default function queue(worker, concurrency, payload) { if (concurrency == null) { @@ -14,7 +13,8 @@ export default function queue(worker, concurrency, payload) { else if(concurrency === 0) { throw new Error('Concurrency must not be zero'); } - function _insert(q, data, pos, callback) { + + function _insert(data, pos, callback) { if (callback != null && typeof callback !== 'function') { throw new Error('task callback must be a function'); } @@ -35,19 +35,19 @@ export default function queue(worker, concurrency, payload) { }; if (pos) { - q.tasks.unshift(item); + q._tasks.unshift(item); } else { - q.tasks.push(item); + q._tasks.push(item); } }); setImmediate(q.process); } - function _next(q, tasks) { + + function _next(tasks) { return function(){ workers -= 1; - var removed = false; var args = arguments; arrayEach(tasks, function (task) { @@ -69,7 +69,7 @@ export default function queue(worker, concurrency, payload) { q.unsaturated(); } - if (q.tasks.length + workers === 0) { + if (q._tasks.length + workers === 0) { q.drain(); } q.process(); @@ -79,7 +79,7 @@ export default function queue(worker, concurrency, payload) { var workers = 0; var workersList = []; var q = { - tasks: [], + _tasks: new DLL(), concurrency: concurrency, payload: payload, saturated: noop, @@ -91,25 +91,27 @@ export default function queue(worker, concurrency, payload) { started: false, paused: false, push: function (data, callback) { - _insert(q, data, false, callback); + _insert(data, false, callback); }, kill: function () { q.drain = noop; - q.tasks = []; + q._tasks.empty(); }, unshift: function (data, callback) { - _insert(q, data, true, callback); + _insert(data, true, callback); }, process: function () { - while(!q.paused && workers < q.concurrency && q.tasks.length){ - - var tasks = q.payload ? - q.tasks.splice(0, q.payload) : - q.tasks.splice(0, q.tasks.length); - - var data = arrayMap(tasks, property('data')); + while(!q.paused && workers < q.concurrency && q._tasks.length){ + var tasks = [], data = []; + var l = q._tasks.length; + if (q.payload) l = Math.min(l, q.payload); + for (var i = 0; i < l; i++) { + var node = q._tasks.shift(); + tasks.push(node); + data.push(node.data); + } - if (q.tasks.length === 0) { + if (q._tasks.length === 0) { q.empty(); } workers += 1; @@ -119,14 +121,12 @@ export default function queue(worker, concurrency, payload) { q.saturated(); } - var cb = onlyOnce(_next(q, tasks)); + var cb = onlyOnce(_next(tasks)); worker(data, cb); - - } }, length: function () { - return q.tasks.length; + return q._tasks.length; }, running: function () { return workers; @@ -135,7 +135,7 @@ export default function queue(worker, concurrency, payload) { return workersList; }, idle: function() { - return q.tasks.length + workers === 0; + return q._tasks.length + workers === 0; }, pause: function () { q.paused = true; @@ -143,7 +143,7 @@ export default function queue(worker, concurrency, payload) { resume: function () { if (q.paused === false) { return; } q.paused = false; - var resumeCount = Math.min(q.concurrency, q.tasks.length); + var resumeCount = Math.min(q.concurrency, q._tasks.length); // Need to call q.process once per concurrent // worker to preserve full concurrency after pause for (var w = 1; w <= resumeCount; w++) { |