summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authoryemreinci <18687880+yemreinci@users.noreply.github.com>2018-11-21 07:28:32 +0300
committerAlex Early <alexander.early@gmail.com>2018-11-20 20:28:32 -0800
commita5a9f13ee024e58a3bd074f9c4e0130f0499d27b (patch)
treef88fbadbf2b5a6d0574c19d531b94e520cea3777
parent884b040c22bb27f69d9136bf4391a79a16911c12 (diff)
downloadasync-a5a9f13ee024e58a3bd074f9c4e0130f0499d27b.tar.gz
feat: Use heap tree in priority queue (#1595)
* use heap tree in priority queue * small refactor
-rw-r--r--lib/internal/Heap.js115
-rw-r--r--lib/priorityQueue.js15
-rw-r--r--perf/suites.js31
-rw-r--r--test/heap.js124
4 files changed, 275 insertions, 10 deletions
diff --git a/lib/internal/Heap.js b/lib/internal/Heap.js
new file mode 100644
index 0000000..d9ccacf
--- /dev/null
+++ b/lib/internal/Heap.js
@@ -0,0 +1,115 @@
+// Binary min-heap implementation used for priority queue.
+// Implementation is stable, i.e. push time is considered for equal priorities
+export default class Heap {
+ constructor() {
+ this.heap = [];
+ this.pushCount = Number.MIN_SAFE_INTEGER;
+ }
+
+ get length() {
+ return this.heap.length;
+ }
+
+ empty () {
+ this.heap = [];
+ return this;
+ }
+
+ percUp(index) {
+ let p;
+
+ while (index > 0 && smaller(this.heap[index], this.heap[p=parent(index)])) {
+ let t = this.heap[index];
+ this.heap[index] = this.heap[p];
+ this.heap[p] = t;
+
+ index = p;
+ }
+ }
+
+ percDown(index) {
+ let l;
+
+ while ((l=leftChi(index)) < this.heap.length) {
+ if (l+1 < this.heap.length && smaller(this.heap[l+1], this.heap[l])) {
+ l = l+1;
+ }
+
+ if (smaller(this.heap[index], this.heap[l])) {
+ break;
+ }
+
+ let t = this.heap[index];
+ this.heap[index] = this.heap[l];
+ this.heap[l] = t;
+
+ index = l;
+ }
+ }
+
+ push(node) {
+ node.pushCount = ++this.pushCount;
+ this.heap.push(node);
+ this.percUp(this.heap.length-1);
+ }
+
+ unshift(node) {
+ return this.heap.push(node);
+ }
+
+ shift() {
+ let [top] = this.heap;
+
+ this.heap[0] = this.heap[this.heap.length-1];
+ this.heap.pop();
+ this.percDown(0);
+
+ return top;
+ }
+
+ toArray() {
+ return [...this];
+ }
+
+ *[Symbol.iterator] () {
+ for (let i = 0; i < this.heap.length; i++) {
+ yield this.heap[i].data;
+ }
+ }
+
+ remove (testFn) {
+ let j = 0;
+ for (let i = 0; i < this.heap.length; i++) {
+ if (!testFn(this.heap[i])) {
+ this.heap[j] = this.heap[i];
+ j++;
+ }
+ }
+
+ this.heap.splice(j);
+
+ for (let i = parent(this.heap.length-1); i >= 0; i--) {
+ this.percDown(i);
+ }
+
+ return this;
+ }
+}
+
+function leftChi(i) {
+ return (i<<1)+1;
+}
+
+function parent(i) {
+ return ((i+1)>>1)-1;
+}
+
+function smaller(x, y) {
+ if (x.priority !== y.priority) {
+ return x.priority < y.priority;
+ }
+ else {
+ return x.pushCount < y.pushCount;
+ }
+}
+
diff --git a/lib/priorityQueue.js b/lib/priorityQueue.js
index 4eaea38..ce05805 100644
--- a/lib/priorityQueue.js
+++ b/lib/priorityQueue.js
@@ -1,5 +1,6 @@
import setImmediate from './setImmediate';
import queue from './queue';
+import Heap from './internal/Heap';
/**
* The same as [async.queue]{@link module:ControlFlow.queue} only tasks are assigned a priority and
@@ -28,6 +29,8 @@ export default function(worker, concurrency) {
// Start with a normal queue
var q = queue(worker, concurrency);
+ q._tasks = new Heap();
+
// Override push to accept second parameter representing priority
q.push = function(data, priority = 0, callback = () => {}) {
if (typeof callback !== 'function') {
@@ -42,11 +45,6 @@ export default function(worker, concurrency) {
return setImmediate(() => q.drain());
}
- var nextNode = q._tasks.head;
- while (nextNode && priority >= nextNode.priority) {
- nextNode = nextNode.next;
- }
-
for (var i = 0, l = data.length; i < l; i++) {
var item = {
data: data[i],
@@ -54,12 +52,9 @@ export default function(worker, concurrency) {
callback
};
- if (nextNode) {
- q._tasks.insertBefore(nextNode, item);
- } else {
- q._tasks.push(item);
- }
+ q._tasks.push(item);
}
+
setImmediate(q.process);
};
diff --git a/perf/suites.js b/perf/suites.js
index d3cb4e9..1171939 100644
--- a/perf/suites.js
+++ b/perf/suites.js
@@ -303,6 +303,37 @@ module.exports = [{
}
}
}, {
+ name: "priorityQueue",
+ args: [
+ [10],
+ [100],
+ [1000],
+ [30000],
+ [50000]
+ ],
+ setup: function setup(num) {
+ tasks = num;
+ },
+ fn(async, done) {
+ var numEntries = tasks;
+ var q = async.priorityQueue(worker, 1);
+ for (var i = 1; i <= numEntries; i++) {
+ q.push({
+ num: i
+ }, i);
+ }
+
+ var completedCnt = 0;
+
+ function worker(task, callback) {
+ completedCnt++;
+ if (completedCnt === numEntries) {
+ return done();
+ }
+ setImmediate(callback);
+ }
+ }
+}, {
name: "some - no short circuit- false",
// args lists are passed to the setup function
args: [
diff --git a/test/heap.js b/test/heap.js
new file mode 100644
index 0000000..a048639
--- /dev/null
+++ b/test/heap.js
@@ -0,0 +1,124 @@
+var Heap = require('../lib/internal/Heap').default;
+var {expect} = require('chai');
+
+describe('Heap', () => {
+ it('push', () => {
+ var heap = new Heap();
+
+ expect(heap.length).to.eql(0);
+
+ heap.push({priority: 1, data: 'foo1'});
+ heap.push({priority: 2, data: 'foo2'});
+ heap.push({priority: 9, data: 'foo3'});
+ heap.push({priority: 2, data: 'foo4'});
+ heap.push({priority: 2, data: 'foo5'});
+ heap.push({priority: 5, data: 'foo6'});
+ heap.push({priority: -5, data: 'foo7'});
+ heap.push({priority: 1, data: 'foo8'});
+
+ expect(heap.length).to.eql(8);
+
+ expect(heap.shift().data).to.eql('foo7');
+ expect(heap.shift().data).to.eql('foo1');
+
+ heap.push({priority: -10, data: 'foo9'});
+ heap.push({priority: 12, data: 'foo10'});
+
+ expect(heap.shift().data).to.eql('foo9');
+ expect(heap.shift().data).to.eql('foo8');
+ expect(heap.shift().data).to.eql('foo2');
+ expect(heap.shift().data).to.eql('foo4');
+ expect(heap.shift().data).to.eql('foo5');
+
+ heap.push({priority: -1, data: 'foo11'});
+ heap.push({priority: 7, data: 'foo12'});
+
+ expect(heap.shift().data).to.eql('foo11');
+ expect(heap.shift().data).to.eql('foo6');
+ expect(heap.shift().data).to.eql('foo12');
+ expect(heap.shift().data).to.eql('foo3');
+ expect(heap.shift().data).to.eql('foo10');
+
+ expect(heap.length).to.eql(0);
+ });
+
+ it('toArray', () => {
+ var heap = new Heap();
+ expect(heap.toArray()).to.eql([]);
+
+ for (var i = 0; i < 5; i++) {
+ heap.push({data: i});
+ }
+
+ expect(heap.toArray()).to.eql([0, 1, 2, 3, 4]);
+ });
+
+ it('remove', () => {
+ var heap = new Heap();
+
+ for (var i = 0; i < 5; i++) {
+ heap.push({data: i});
+ }
+
+ heap.remove((node) => {
+ return node.data === 3;
+ })
+
+ expect(heap.toArray()).to.eql([0, 1, 2, 4]);
+ });
+
+ it('remove (head)', () => {
+ var heap = new Heap();
+
+ for (var i = 0; i < 5; i++) {
+ heap.push({data: i});
+ }
+
+ heap.remove((node) => {
+ return node.data === 0;
+ })
+
+ expect(heap.toArray()).to.eql([1, 2, 3, 4]);
+ });
+
+ it('remove (tail)', () => {
+ var heap = new Heap();
+
+ for (var i = 0; i < 5; i++) {
+ heap.push({data: i});
+ }
+
+ heap.remove((node) => {
+ return node.data === 4;
+ })
+
+ expect(heap.toArray()).to.eql([0, 1, 2, 3]);
+ });
+
+ it('remove (all)', () => {
+ var heap = new Heap();
+
+ for (var i = 0; i < 5; i++) {
+ heap.push({data: i});
+ }
+
+ heap.remove((node) => {
+ return node.data < 5;
+ })
+
+ expect(heap.toArray()).to.eql([]);
+ });
+
+ it('empty', () => {
+ var heap = new Heap();
+
+ for (var i = 0; i < 5; i++) {
+ heap.push({data: i});
+ }
+
+ var empty = heap.empty();
+
+ expect(heap).to.equal(empty);
+ expect(heap.toArray()).to.eql([]);
+ });
+});