summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGraeme Yeates <yeatesgraeme@gmail.com>2016-06-30 13:44:52 -0400
committerGraeme Yeates <yeatesgraeme@gmail.com>2016-06-30 13:44:52 -0400
commit592f95cf1c90d71dfdb68f4e4c9ca0cdd30a91bc (patch)
tree839bddf39bec6be2f3b9a59969f4e3d83aa6f297
parentb7081631c373bf459f1108824f870c62328ed8d6 (diff)
downloadasync-592f95cf1c90d71dfdb68f4e4c9ca0cdd30a91bc.tar.gz
Implment queues using DLLs
-rw-r--r--lib/auto.js2
-rw-r--r--lib/internal/DoublyLinkedList.js64
-rw-r--r--lib/internal/queue.js52
-rw-r--r--lib/priorityQueue.js45
-rw-r--r--mocha_test/queue.js2
5 files changed, 108 insertions, 57 deletions
diff --git a/lib/auto.js b/lib/auto.js
index bb8ee87..70a8067 100644
--- a/lib/auto.js
+++ b/lib/auto.js
@@ -4,9 +4,9 @@ import indexOf from 'lodash/_baseIndexOf';
import isArray from 'lodash/isArray';
import okeys from 'lodash/keys';
import noop from 'lodash/noop';
-import once from './internal/once';
import rest from 'lodash/rest';
+import once from './internal/once';
import onlyOnce from './internal/onlyOnce';
/**
diff --git a/lib/internal/DoublyLinkedList.js b/lib/internal/DoublyLinkedList.js
new file mode 100644
index 0000000..ae46957
--- /dev/null
+++ b/lib/internal/DoublyLinkedList.js
@@ -0,0 +1,64 @@
+// Simple doubly linked list (https://en.wikipedia.org/wiki/Doubly_linked_list) implementation
+// used for queues. This implementation assumes that the node provided by the user can be modified
+// to adjust the next and last properties. We implement only the minimal functionality
+// for queue support.
+export default function DLL() {
+ this.head = this.tail = null;
+ this.length = 0;
+}
+
+function setInitial(dll, node) {
+ dll.length = 1;
+ dll.head = dll.tail = node;
+}
+
+DLL.prototype.removeLink = function(node) {
+ if (!node) return node;
+ if (node.prev) node.prev.next = node.next;
+ else this.head = node.next
+ if (node.next) node.next.prev = node.prev;
+ else this.tail = node.prev;
+
+ node.prev = node.next = null;
+ this.length -= 1;
+ return node;
+}
+
+DLL.prototype.empty = DLL;
+
+DLL.prototype.insertAfter = function(node, newNode) {
+ newNode.prev = node;
+ newNode.next = node.next;
+ if (node.next) node.next.prev = newNode;
+ else this.tail = newNode;
+ node.next = newNode;
+ this.length += 1;
+}
+
+DLL.prototype.insertBefore = function(node, newNode) {
+ newNode.prev = node.prev;
+ newNode.next = node;
+ if (node.prev) node.prev.next = newNode;
+ else this.head = newNode;
+ node.prev = newNode;
+ this.length += 1;
+}
+
+DLL.prototype.unshift = function(node) {
+ if (this.head) this.insertBefore(this.head, node);
+ else setInitial(this, node);
+};
+
+DLL.prototype.push = function(node) {
+ if (this.tail) this.insertAfter(this.tail, node);
+ else setInitial(this, node);
+};
+
+
+DLL.prototype.shift = function() {
+ return this.removeLink(this.head);
+};
+
+DLL.prototype.pop = function() {
+ return this.removeLink(this.tail);
+};
diff --git a/lib/internal/queue.js b/lib/internal/queue.js
index e57ef14..9debd64 100644
--- a/lib/internal/queue.js
+++ b/lib/internal/queue.js
@@ -1,11 +1,10 @@
import arrayEach from 'lodash/_arrayEach';
-import arrayMap from 'lodash/_arrayMap';
import isArray from 'lodash/isArray';
import noop from 'lodash/noop';
-import property from 'lodash/_baseProperty';
import onlyOnce from './onlyOnce';
import setImmediate from './setImmediate';
+import DLL from './DoublyLinkedList';
export default function queue(worker, concurrency, payload) {
if (concurrency == null) {
@@ -14,7 +13,8 @@ export default function queue(worker, concurrency, payload) {
else if(concurrency === 0) {
throw new Error('Concurrency must not be zero');
}
- function _insert(q, data, pos, callback) {
+
+ function _insert(data, pos, callback) {
if (callback != null && typeof callback !== 'function') {
throw new Error('task callback must be a function');
}
@@ -35,19 +35,19 @@ export default function queue(worker, concurrency, payload) {
};
if (pos) {
- q.tasks.unshift(item);
+ q._tasks.unshift(item);
} else {
- q.tasks.push(item);
+ q._tasks.push(item);
}
});
setImmediate(q.process);
}
- function _next(q, tasks) {
+
+ function _next(tasks) {
return function(){
workers -= 1;
-
var removed = false;
var args = arguments;
arrayEach(tasks, function (task) {
@@ -69,7 +69,7 @@ export default function queue(worker, concurrency, payload) {
q.unsaturated();
}
- if (q.tasks.length + workers === 0) {
+ if (q._tasks.length + workers === 0) {
q.drain();
}
q.process();
@@ -79,7 +79,7 @@ export default function queue(worker, concurrency, payload) {
var workers = 0;
var workersList = [];
var q = {
- tasks: [],
+ _tasks: new DLL(),
concurrency: concurrency,
payload: payload,
saturated: noop,
@@ -91,25 +91,27 @@ export default function queue(worker, concurrency, payload) {
started: false,
paused: false,
push: function (data, callback) {
- _insert(q, data, false, callback);
+ _insert(data, false, callback);
},
kill: function () {
q.drain = noop;
- q.tasks = [];
+ q._tasks.empty();
},
unshift: function (data, callback) {
- _insert(q, data, true, callback);
+ _insert(data, true, callback);
},
process: function () {
- while(!q.paused && workers < q.concurrency && q.tasks.length){
-
- var tasks = q.payload ?
- q.tasks.splice(0, q.payload) :
- q.tasks.splice(0, q.tasks.length);
-
- var data = arrayMap(tasks, property('data'));
+ while(!q.paused && workers < q.concurrency && q._tasks.length){
+ var tasks = [], data = [];
+ var l = q._tasks.length;
+ if (q.payload) l = Math.min(l, q.payload);
+ for (var i = 0; i < l; i++) {
+ var node = q._tasks.shift();
+ tasks.push(node);
+ data.push(node.data);
+ }
- if (q.tasks.length === 0) {
+ if (q._tasks.length === 0) {
q.empty();
}
workers += 1;
@@ -119,14 +121,12 @@ export default function queue(worker, concurrency, payload) {
q.saturated();
}
- var cb = onlyOnce(_next(q, tasks));
+ var cb = onlyOnce(_next(tasks));
worker(data, cb);
-
-
}
},
length: function () {
- return q.tasks.length;
+ return q._tasks.length;
},
running: function () {
return workers;
@@ -135,7 +135,7 @@ export default function queue(worker, concurrency, payload) {
return workersList;
},
idle: function() {
- return q.tasks.length + workers === 0;
+ return q._tasks.length + workers === 0;
},
pause: function () {
q.paused = true;
@@ -143,7 +143,7 @@ export default function queue(worker, concurrency, payload) {
resume: function () {
if (q.paused === false) { return; }
q.paused = false;
- var resumeCount = Math.min(q.concurrency, q.tasks.length);
+ var resumeCount = Math.min(q.concurrency, q._tasks.length);
// Need to call q.process once per concurrent
// worker to preserve full concurrency after pause
for (var w = 1; w <= resumeCount; w++) {
diff --git a/lib/priorityQueue.js b/lib/priorityQueue.js
index 36cacaf..02dbe64 100644
--- a/lib/priorityQueue.js
+++ b/lib/priorityQueue.js
@@ -30,25 +30,11 @@ import queue from './queue';
* * The `unshift` method was removed.
*/
export default function(worker, concurrency) {
- function _compareTasks(a, b) {
- return a.priority - b.priority;
- }
-
- function _binarySearch(sequence, item, compare) {
- var beg = -1,
- end = sequence.length - 1;
- while (beg < end) {
- var mid = beg + ((end - beg + 1) >>> 1);
- if (compare(item, sequence[mid]) >= 0) {
- beg = mid;
- } else {
- end = mid - 1;
- }
- }
- return beg;
- }
+ // Start with a normal queue
+ var q = queue(worker, concurrency);
- function _insert(q, data, priority, callback) {
+ // Override push to accept second parameter representing priority
+ q.push = function(data, priority, callback) {
if (callback != null && typeof callback !== 'function') {
throw new Error('task callback must be a function');
}
@@ -62,6 +48,12 @@ export default function(worker, concurrency) {
q.drain();
});
}
+
+ var nextNode = q._tasks.head;
+ while (nextNode && priority >= nextNode.priority) {
+ nextNode = nextNode.next;
+ }
+
arrayEach(data, function(task) {
var item = {
data: task,
@@ -69,18 +61,13 @@ export default function(worker, concurrency) {
callback: typeof callback === 'function' ? callback : noop
};
- q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item);
-
- setImmediate(q.process);
+ if (nextNode) {
+ q._tasks.insertBefore(nextNode, item);
+ } else {
+ q._tasks.push(item);
+ }
});
- }
-
- // Start with a normal queue
- var q = queue(worker, concurrency);
-
- // Override push to accept second parameter representing priority
- q.push = function(data, priority, callback) {
- _insert(q, data, priority, callback);
+ setImmediate(q.process);
};
// Remove unshift function
diff --git a/mocha_test/queue.js b/mocha_test/queue.js
index d514ce7..e4a29e4 100644
--- a/mocha_test/queue.js
+++ b/mocha_test/queue.js
@@ -496,7 +496,7 @@ describe('queue', function(){
}, 5);
setTimeout(function () {
- expect(q.tasks.length).to.equal(1);
+ expect(q._tasks.length).to.equal(1);
expect(q.running()).to.equal(2);
q.resume();
}, 15);