diff options
author | yemreinci <18687880+yemreinci@users.noreply.github.com> | 2018-11-21 07:28:32 +0300 |
---|---|---|
committer | Alex Early <alexander.early@gmail.com> | 2018-11-20 20:28:32 -0800 |
commit | a5a9f13ee024e58a3bd074f9c4e0130f0499d27b (patch) | |
tree | f88fbadbf2b5a6d0574c19d531b94e520cea3777 | |
parent | 884b040c22bb27f69d9136bf4391a79a16911c12 (diff) | |
download | async-a5a9f13ee024e58a3bd074f9c4e0130f0499d27b.tar.gz |
feat: Use heap tree in priority queue (#1595)
* use heap tree in priority queue
* small refactor
-rw-r--r-- | lib/internal/Heap.js | 115 | ||||
-rw-r--r-- | lib/priorityQueue.js | 15 | ||||
-rw-r--r-- | perf/suites.js | 31 | ||||
-rw-r--r-- | test/heap.js | 124 |
4 files changed, 275 insertions, 10 deletions
diff --git a/lib/internal/Heap.js b/lib/internal/Heap.js new file mode 100644 index 0000000..d9ccacf --- /dev/null +++ b/lib/internal/Heap.js @@ -0,0 +1,115 @@ +// Binary min-heap implementation used for priority queue. +// Implementation is stable, i.e. push time is considered for equal priorities +export default class Heap { + constructor() { + this.heap = []; + this.pushCount = Number.MIN_SAFE_INTEGER; + } + + get length() { + return this.heap.length; + } + + empty () { + this.heap = []; + return this; + } + + percUp(index) { + let p; + + while (index > 0 && smaller(this.heap[index], this.heap[p=parent(index)])) { + let t = this.heap[index]; + this.heap[index] = this.heap[p]; + this.heap[p] = t; + + index = p; + } + } + + percDown(index) { + let l; + + while ((l=leftChi(index)) < this.heap.length) { + if (l+1 < this.heap.length && smaller(this.heap[l+1], this.heap[l])) { + l = l+1; + } + + if (smaller(this.heap[index], this.heap[l])) { + break; + } + + let t = this.heap[index]; + this.heap[index] = this.heap[l]; + this.heap[l] = t; + + index = l; + } + } + + push(node) { + node.pushCount = ++this.pushCount; + this.heap.push(node); + this.percUp(this.heap.length-1); + } + + unshift(node) { + return this.heap.push(node); + } + + shift() { + let [top] = this.heap; + + this.heap[0] = this.heap[this.heap.length-1]; + this.heap.pop(); + this.percDown(0); + + return top; + } + + toArray() { + return [...this]; + } + + *[Symbol.iterator] () { + for (let i = 0; i < this.heap.length; i++) { + yield this.heap[i].data; + } + } + + remove (testFn) { + let j = 0; + for (let i = 0; i < this.heap.length; i++) { + if (!testFn(this.heap[i])) { + this.heap[j] = this.heap[i]; + j++; + } + } + + this.heap.splice(j); + + for (let i = parent(this.heap.length-1); i >= 0; i--) { + this.percDown(i); + } + + return this; + } +} + +function leftChi(i) { + return (i<<1)+1; +} + +function parent(i) { + return ((i+1)>>1)-1; +} + +function smaller(x, y) { + if (x.priority !== y.priority) { + return x.priority < y.priority; + } + else { + return x.pushCount < y.pushCount; + } +} + diff --git a/lib/priorityQueue.js b/lib/priorityQueue.js index 4eaea38..ce05805 100644 --- a/lib/priorityQueue.js +++ b/lib/priorityQueue.js @@ -1,5 +1,6 @@ import setImmediate from './setImmediate'; import queue from './queue'; +import Heap from './internal/Heap'; /** * The same as [async.queue]{@link module:ControlFlow.queue} only tasks are assigned a priority and @@ -28,6 +29,8 @@ export default function(worker, concurrency) { // Start with a normal queue var q = queue(worker, concurrency); + q._tasks = new Heap(); + // Override push to accept second parameter representing priority q.push = function(data, priority = 0, callback = () => {}) { if (typeof callback !== 'function') { @@ -42,11 +45,6 @@ export default function(worker, concurrency) { return setImmediate(() => q.drain()); } - var nextNode = q._tasks.head; - while (nextNode && priority >= nextNode.priority) { - nextNode = nextNode.next; - } - for (var i = 0, l = data.length; i < l; i++) { var item = { data: data[i], @@ -54,12 +52,9 @@ export default function(worker, concurrency) { callback }; - if (nextNode) { - q._tasks.insertBefore(nextNode, item); - } else { - q._tasks.push(item); - } + q._tasks.push(item); } + setImmediate(q.process); }; diff --git a/perf/suites.js b/perf/suites.js index d3cb4e9..1171939 100644 --- a/perf/suites.js +++ b/perf/suites.js @@ -303,6 +303,37 @@ module.exports = [{ } } }, { + name: "priorityQueue", + args: [ + [10], + [100], + [1000], + [30000], + [50000] + ], + setup: function setup(num) { + tasks = num; + }, + fn(async, done) { + var numEntries = tasks; + var q = async.priorityQueue(worker, 1); + for (var i = 1; i <= numEntries; i++) { + q.push({ + num: i + }, i); + } + + var completedCnt = 0; + + function worker(task, callback) { + completedCnt++; + if (completedCnt === numEntries) { + return done(); + } + setImmediate(callback); + } + } +}, { name: "some - no short circuit- false", // args lists are passed to the setup function args: [ diff --git a/test/heap.js b/test/heap.js new file mode 100644 index 0000000..a048639 --- /dev/null +++ b/test/heap.js @@ -0,0 +1,124 @@ +var Heap = require('../lib/internal/Heap').default; +var {expect} = require('chai'); + +describe('Heap', () => { + it('push', () => { + var heap = new Heap(); + + expect(heap.length).to.eql(0); + + heap.push({priority: 1, data: 'foo1'}); + heap.push({priority: 2, data: 'foo2'}); + heap.push({priority: 9, data: 'foo3'}); + heap.push({priority: 2, data: 'foo4'}); + heap.push({priority: 2, data: 'foo5'}); + heap.push({priority: 5, data: 'foo6'}); + heap.push({priority: -5, data: 'foo7'}); + heap.push({priority: 1, data: 'foo8'}); + + expect(heap.length).to.eql(8); + + expect(heap.shift().data).to.eql('foo7'); + expect(heap.shift().data).to.eql('foo1'); + + heap.push({priority: -10, data: 'foo9'}); + heap.push({priority: 12, data: 'foo10'}); + + expect(heap.shift().data).to.eql('foo9'); + expect(heap.shift().data).to.eql('foo8'); + expect(heap.shift().data).to.eql('foo2'); + expect(heap.shift().data).to.eql('foo4'); + expect(heap.shift().data).to.eql('foo5'); + + heap.push({priority: -1, data: 'foo11'}); + heap.push({priority: 7, data: 'foo12'}); + + expect(heap.shift().data).to.eql('foo11'); + expect(heap.shift().data).to.eql('foo6'); + expect(heap.shift().data).to.eql('foo12'); + expect(heap.shift().data).to.eql('foo3'); + expect(heap.shift().data).to.eql('foo10'); + + expect(heap.length).to.eql(0); + }); + + it('toArray', () => { + var heap = new Heap(); + expect(heap.toArray()).to.eql([]); + + for (var i = 0; i < 5; i++) { + heap.push({data: i}); + } + + expect(heap.toArray()).to.eql([0, 1, 2, 3, 4]); + }); + + it('remove', () => { + var heap = new Heap(); + + for (var i = 0; i < 5; i++) { + heap.push({data: i}); + } + + heap.remove((node) => { + return node.data === 3; + }) + + expect(heap.toArray()).to.eql([0, 1, 2, 4]); + }); + + it('remove (head)', () => { + var heap = new Heap(); + + for (var i = 0; i < 5; i++) { + heap.push({data: i}); + } + + heap.remove((node) => { + return node.data === 0; + }) + + expect(heap.toArray()).to.eql([1, 2, 3, 4]); + }); + + it('remove (tail)', () => { + var heap = new Heap(); + + for (var i = 0; i < 5; i++) { + heap.push({data: i}); + } + + heap.remove((node) => { + return node.data === 4; + }) + + expect(heap.toArray()).to.eql([0, 1, 2, 3]); + }); + + it('remove (all)', () => { + var heap = new Heap(); + + for (var i = 0; i < 5; i++) { + heap.push({data: i}); + } + + heap.remove((node) => { + return node.data < 5; + }) + + expect(heap.toArray()).to.eql([]); + }); + + it('empty', () => { + var heap = new Heap(); + + for (var i = 0; i < 5; i++) { + heap.push({data: i}); + } + + var empty = heap.empty(); + + expect(heap).to.equal(empty); + expect(heap.toArray()).to.eql([]); + }); +}); |