diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/auto.js | 2 | ||||
-rw-r--r-- | lib/internal/DoublyLinkedList.js | 64 | ||||
-rw-r--r-- | lib/internal/queue.js | 52 | ||||
-rw-r--r-- | lib/priorityQueue.js | 45 |
4 files changed, 107 insertions, 56 deletions
diff --git a/lib/auto.js b/lib/auto.js index bb8ee87..70a8067 100644 --- a/lib/auto.js +++ b/lib/auto.js @@ -4,9 +4,9 @@ import indexOf from 'lodash/_baseIndexOf'; import isArray from 'lodash/isArray'; import okeys from 'lodash/keys'; import noop from 'lodash/noop'; -import once from './internal/once'; import rest from 'lodash/rest'; +import once from './internal/once'; import onlyOnce from './internal/onlyOnce'; /** diff --git a/lib/internal/DoublyLinkedList.js b/lib/internal/DoublyLinkedList.js new file mode 100644 index 0000000..ae46957 --- /dev/null +++ b/lib/internal/DoublyLinkedList.js @@ -0,0 +1,64 @@ +// Simple doubly linked list (https://en.wikipedia.org/wiki/Doubly_linked_list) implementation +// used for queues. This implementation assumes that the node provided by the user can be modified +// to adjust the next and last properties. We implement only the minimal functionality +// for queue support. +export default function DLL() { + this.head = this.tail = null; + this.length = 0; +} + +function setInitial(dll, node) { + dll.length = 1; + dll.head = dll.tail = node; +} + +DLL.prototype.removeLink = function(node) { + if (!node) return node; + if (node.prev) node.prev.next = node.next; + else this.head = node.next + if (node.next) node.next.prev = node.prev; + else this.tail = node.prev; + + node.prev = node.next = null; + this.length -= 1; + return node; +} + +DLL.prototype.empty = DLL; + +DLL.prototype.insertAfter = function(node, newNode) { + newNode.prev = node; + newNode.next = node.next; + if (node.next) node.next.prev = newNode; + else this.tail = newNode; + node.next = newNode; + this.length += 1; +} + +DLL.prototype.insertBefore = function(node, newNode) { + newNode.prev = node.prev; + newNode.next = node; + if (node.prev) node.prev.next = newNode; + else this.head = newNode; + node.prev = newNode; + this.length += 1; +} + +DLL.prototype.unshift = function(node) { + if (this.head) this.insertBefore(this.head, node); + else setInitial(this, node); +}; + +DLL.prototype.push = function(node) { + if (this.tail) this.insertAfter(this.tail, node); + else setInitial(this, node); +}; + + +DLL.prototype.shift = function() { + return this.removeLink(this.head); +}; + +DLL.prototype.pop = function() { + return this.removeLink(this.tail); +}; diff --git a/lib/internal/queue.js b/lib/internal/queue.js index e57ef14..9debd64 100644 --- a/lib/internal/queue.js +++ b/lib/internal/queue.js @@ -1,11 +1,10 @@ import arrayEach from 'lodash/_arrayEach'; -import arrayMap from 'lodash/_arrayMap'; import isArray from 'lodash/isArray'; import noop from 'lodash/noop'; -import property from 'lodash/_baseProperty'; import onlyOnce from './onlyOnce'; import setImmediate from './setImmediate'; +import DLL from './DoublyLinkedList'; export default function queue(worker, concurrency, payload) { if (concurrency == null) { @@ -14,7 +13,8 @@ export default function queue(worker, concurrency, payload) { else if(concurrency === 0) { throw new Error('Concurrency must not be zero'); } - function _insert(q, data, pos, callback) { + + function _insert(data, pos, callback) { if (callback != null && typeof callback !== 'function') { throw new Error('task callback must be a function'); } @@ -35,19 +35,19 @@ export default function queue(worker, concurrency, payload) { }; if (pos) { - q.tasks.unshift(item); + q._tasks.unshift(item); } else { - q.tasks.push(item); + q._tasks.push(item); } }); setImmediate(q.process); } - function _next(q, tasks) { + + function _next(tasks) { return function(){ workers -= 1; - var removed = false; var args = arguments; arrayEach(tasks, function (task) { @@ -69,7 +69,7 @@ export default function queue(worker, concurrency, payload) { q.unsaturated(); } - if (q.tasks.length + workers === 0) { + if (q._tasks.length + workers === 0) { q.drain(); } q.process(); @@ -79,7 +79,7 @@ export default function queue(worker, concurrency, payload) { var workers = 0; var workersList = []; var q = { - tasks: [], + _tasks: new DLL(), concurrency: concurrency, payload: payload, saturated: noop, @@ -91,25 +91,27 @@ export default function queue(worker, concurrency, payload) { started: false, paused: false, push: function (data, callback) { - _insert(q, data, false, callback); + _insert(data, false, callback); }, kill: function () { q.drain = noop; - q.tasks = []; + q._tasks.empty(); }, unshift: function (data, callback) { - _insert(q, data, true, callback); + _insert(data, true, callback); }, process: function () { - while(!q.paused && workers < q.concurrency && q.tasks.length){ - - var tasks = q.payload ? - q.tasks.splice(0, q.payload) : - q.tasks.splice(0, q.tasks.length); - - var data = arrayMap(tasks, property('data')); + while(!q.paused && workers < q.concurrency && q._tasks.length){ + var tasks = [], data = []; + var l = q._tasks.length; + if (q.payload) l = Math.min(l, q.payload); + for (var i = 0; i < l; i++) { + var node = q._tasks.shift(); + tasks.push(node); + data.push(node.data); + } - if (q.tasks.length === 0) { + if (q._tasks.length === 0) { q.empty(); } workers += 1; @@ -119,14 +121,12 @@ export default function queue(worker, concurrency, payload) { q.saturated(); } - var cb = onlyOnce(_next(q, tasks)); + var cb = onlyOnce(_next(tasks)); worker(data, cb); - - } }, length: function () { - return q.tasks.length; + return q._tasks.length; }, running: function () { return workers; @@ -135,7 +135,7 @@ export default function queue(worker, concurrency, payload) { return workersList; }, idle: function() { - return q.tasks.length + workers === 0; + return q._tasks.length + workers === 0; }, pause: function () { q.paused = true; @@ -143,7 +143,7 @@ export default function queue(worker, concurrency, payload) { resume: function () { if (q.paused === false) { return; } q.paused = false; - var resumeCount = Math.min(q.concurrency, q.tasks.length); + var resumeCount = Math.min(q.concurrency, q._tasks.length); // Need to call q.process once per concurrent // worker to preserve full concurrency after pause for (var w = 1; w <= resumeCount; w++) { diff --git a/lib/priorityQueue.js b/lib/priorityQueue.js index 36cacaf..02dbe64 100644 --- a/lib/priorityQueue.js +++ b/lib/priorityQueue.js @@ -30,25 +30,11 @@ import queue from './queue'; * * The `unshift` method was removed. */ export default function(worker, concurrency) { - function _compareTasks(a, b) { - return a.priority - b.priority; - } - - function _binarySearch(sequence, item, compare) { - var beg = -1, - end = sequence.length - 1; - while (beg < end) { - var mid = beg + ((end - beg + 1) >>> 1); - if (compare(item, sequence[mid]) >= 0) { - beg = mid; - } else { - end = mid - 1; - } - } - return beg; - } + // Start with a normal queue + var q = queue(worker, concurrency); - function _insert(q, data, priority, callback) { + // Override push to accept second parameter representing priority + q.push = function(data, priority, callback) { if (callback != null && typeof callback !== 'function') { throw new Error('task callback must be a function'); } @@ -62,6 +48,12 @@ export default function(worker, concurrency) { q.drain(); }); } + + var nextNode = q._tasks.head; + while (nextNode && priority >= nextNode.priority) { + nextNode = nextNode.next; + } + arrayEach(data, function(task) { var item = { data: task, @@ -69,18 +61,13 @@ export default function(worker, concurrency) { callback: typeof callback === 'function' ? callback : noop }; - q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item); - - setImmediate(q.process); + if (nextNode) { + q._tasks.insertBefore(nextNode, item); + } else { + q._tasks.push(item); + } }); - } - - // Start with a normal queue - var q = queue(worker, concurrency); - - // Override push to accept second parameter representing priority - q.push = function(data, priority, callback) { - _insert(q, data, priority, callback); + setImmediate(q.process); }; // Remove unshift function |