summaryrefslogtreecommitdiff
path: root/lib/priorityQueue.js
diff options
context:
space:
mode:
authorAlex Early <alexander.early@gmail.com>2016-07-02 16:56:05 -0700
committerGitHub <noreply@github.com>2016-07-02 16:56:05 -0700
commit9527b324dae49c92c12beca2b2630b1c36dae084 (patch)
tree86aa8eb4c3ac2c51a09b9bf49ac79e2bffc69a8a /lib/priorityQueue.js
parent7a634cc1304abb21f2e1200d47135c20a3005162 (diff)
parent81e002d132a3ca99f42040d5cd3e184172aeb624 (diff)
downloadasync-9527b324dae49c92c12beca2b2630b1c36dae084.tar.gz
Merge pull request #1205 from caolan/dll
Implment queues using DLLs
Diffstat (limited to 'lib/priorityQueue.js')
-rw-r--r--lib/priorityQueue.js45
1 files changed, 16 insertions, 29 deletions
diff --git a/lib/priorityQueue.js b/lib/priorityQueue.js
index 9ea24bc..e895d8b 100644
--- a/lib/priorityQueue.js
+++ b/lib/priorityQueue.js
@@ -31,25 +31,11 @@ import queue from './queue';
* * The `unshift` method was removed.
*/
export default function(worker, concurrency) {
- function _compareTasks(a, b) {
- return a.priority - b.priority;
- }
-
- function _binarySearch(sequence, item, compare) {
- var beg = -1,
- end = sequence.length - 1;
- while (beg < end) {
- var mid = beg + ((end - beg + 1) >>> 1);
- if (compare(item, sequence[mid]) >= 0) {
- beg = mid;
- } else {
- end = mid - 1;
- }
- }
- return beg;
- }
+ // Start with a normal queue
+ var q = queue(worker, concurrency);
- function _insert(q, data, priority, callback) {
+ // Override push to accept second parameter representing priority
+ q.push = function(data, priority, callback) {
if (callback != null && typeof callback !== 'function') {
throw new Error('task callback must be a function');
}
@@ -63,6 +49,12 @@ export default function(worker, concurrency) {
q.drain();
});
}
+
+ var nextNode = q._tasks.head;
+ while (nextNode && priority >= nextNode.priority) {
+ nextNode = nextNode.next;
+ }
+
arrayEach(data, function(task) {
var item = {
data: task,
@@ -70,18 +62,13 @@ export default function(worker, concurrency) {
callback: typeof callback === 'function' ? callback : noop
};
- q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item);
-
- setImmediate(q.process);
+ if (nextNode) {
+ q._tasks.insertBefore(nextNode, item);
+ } else {
+ q._tasks.push(item);
+ }
});
- }
-
- // Start with a normal queue
- var q = queue(worker, concurrency);
-
- // Override push to accept second parameter representing priority
- q.push = function(data, priority, callback) {
- _insert(q, data, priority, callback);
+ setImmediate(q.process);
};
// Remove unshift function