summaryrefslogtreecommitdiff
path: root/test/priorityQueue.js
diff options
context:
space:
mode:
Diffstat (limited to 'test/priorityQueue.js')
-rw-r--r--test/priorityQueue.js206
1 files changed, 197 insertions, 9 deletions
diff --git a/test/priorityQueue.js b/test/priorityQueue.js
index 564a0d6..40f7fe6 100644
--- a/test/priorityQueue.js
+++ b/test/priorityQueue.js
@@ -16,13 +16,13 @@ describe('priorityQueue', () => {
q.push(1, 1.4, (err, arg) => {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
- expect(q.length()).to.equal(2);
+ expect(q.length()).to.equal(3);
call_order.push('callback ' + 1);
});
q.push(2, 0.2, (err, arg) => {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
- expect(q.length()).to.equal(3);
+ expect(q.length()).to.equal(4);
call_order.push('callback ' + 2);
});
q.push(3, 3.8, (err, arg) => {
@@ -31,26 +31,25 @@ describe('priorityQueue', () => {
expect(q.length()).to.equal(0);
call_order.push('callback ' + 3);
});
- q.push(4, 2.9, (err, arg) => {
+ q.push(['arr', 'arr'], 2.9, (err, arg) => {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
- expect(q.length()).to.equal(1);
- call_order.push('callback ' + 4);
+ call_order.push('callback arr');
});
- expect(q.length()).to.equal(4);
+ expect(q.length()).to.equal(5);
expect(q.concurrency).to.equal(1);
q.drain(() => {
expect(call_order).to.eql([
'process 2', 'callback 2',
'process 1', 'callback 1',
- 'process 4', 'callback 4',
+ 'process arr', 'callback arr',
+ 'process arr', 'callback arr',
'process 3', 'callback 3'
]);
expect(q.concurrency).to.equal(1);
expect(q.length()).to.equal(0);
- q.push([])
- expect(q.length()).to.equal(0)
+ expect(q.idle()).to.be.equal(true);
done()
});
try {
@@ -79,27 +78,32 @@ describe('priorityQueue', () => {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(2);
+ expect(q.running()).to.equal(1);
call_order.push('callback ' + 1);
});
q.push(2, 0.2, (err, arg) => {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(1);
+ expect(q.running()).to.equal(1);
call_order.push('callback ' + 2);
});
q.push(3, 3.8, (err, arg) => {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(0);
+ expect(q.running()).to.equal(1);
call_order.push('callback ' + 3);
});
q.push(4, 2.9, (err, arg) => {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(0);
+ expect(q.running()).to.equal(0);
call_order.push('callback ' + 4);
});
expect(q.length()).to.equal(4);
+ expect(q.running()).to.equal(0);
expect(q.concurrency).to.equal(2);
q.drain(() => {
@@ -111,10 +115,30 @@ describe('priorityQueue', () => {
]);
expect(q.concurrency).to.equal(2);
expect(q.length()).to.equal(0);
+ expect(q.running()).to.equal(0);
done();
});
});
+ it('pushAsync', done => {
+ const calls = [];
+ const q = async.priorityQueue((task, cb) => {
+ if (task === 2) return cb(new Error('fail'));
+ cb();
+ })
+
+ q.pushAsync(1, 1, () => { throw new Error('should not be called') }).then(() => calls.push(1));
+ q.pushAsync(2, 0).catch(err => {
+ expect(err.message).to.equal('fail');
+ calls.push(2);
+ });
+ q.pushAsync([3, 4], 0).map(p => p.then(() => calls.push('arr')));
+ q.drain(() => setTimeout(() => {
+ expect(calls).to.eql([2, 'arr', 'arr', 1]);
+ done();
+ }));
+ });
+
it('pause in worker with concurrency', (done) => {
var call_order = [];
var q = async.priorityQueue((task, callback) => {
@@ -144,6 +168,104 @@ describe('priorityQueue', () => {
});
});
+ it('kill', (done) => {
+ var q = async.priorityQueue((/*task, callback*/) => {
+ setTimeout(() => {
+ throw new Error("Function should never be called");
+ }, 20);
+ }, 1);
+ q.drain(() => {
+ throw new Error("Function should never be called");
+ });
+
+ q.push(0);
+
+ q.kill();
+
+ setTimeout(() => {
+ expect(q.length()).to.equal(0);
+ done();
+ }, 40);
+ });
+
+ context('q.workersList():', () => {
+ it('should be the same length as running()', (done) => {
+ var q = async.priorityQueue((task, cb) => {
+ async.setImmediate(() => {
+ expect(q.workersList().length).to.equal(q.running());
+ cb();
+ });
+ }, 2);
+
+ q.drain(() => {
+ expect(q.workersList().length).to.equal(0);
+ expect(q.running()).to.equal(0);
+ done();
+ });
+
+ q.push('foo', 2);
+ q.push('bar', 1);
+ q.push('baz', 0);
+ });
+
+ it('should contain the items being processed', (done) => {
+ var itemsBeingProcessed = {
+ 'foo': [
+ {data: 'bar', priority: 1},
+ {data: 'foo', priority: 2}
+ ],
+ 'foo_cb': [
+ {data: 'foo', priority: 2}
+ ],
+ 'bar': [
+ {data: 'baz', priority: 0},
+ {data: 'bar', priority: 1}
+ ],
+ 'bar_cb': [
+ {data: 'bar', priority: 1},
+ {data: 'foo', priority: 2}
+ ],
+ 'baz': [
+ {data: 'baz', priority: 0}
+ ],
+ 'baz_cb': [
+ {data: 'baz', priority: 0},
+ {data: 'bar', priority: 1}
+ ]
+ };
+
+ function getWorkersListData(q) {
+ return q.workersList().map(({data, priority}) => {
+ return {data, priority};
+ });
+ }
+
+ var q = async.priorityQueue((task, cb) => {
+ expect(
+ getWorkersListData(q)
+ ).to.eql(itemsBeingProcessed[task]);
+ expect(q.workersList().length).to.equal(q.running());
+ async.setImmediate(() => {
+ expect(
+ getWorkersListData(q)
+ ).to.eql(itemsBeingProcessed[task+'_cb']);
+ expect(q.workersList().length).to.equal(q.running());
+ cb();
+ });
+ }, 2);
+
+ q.drain(() => {
+ expect(q.workersList()).to.eql([]);
+ expect(q.workersList().length).to.equal(q.running());
+ done();
+ });
+
+ q.push('foo', 2);
+ q.push('bar', 1);
+ q.push('baz', 0);
+ });
+ });
+
context('q.saturated(): ', () => {
it('should call the saturated callback if tasks length is concurrency', (done) => {
var calls = [];
@@ -247,6 +369,27 @@ describe('priorityQueue', () => {
});
});
+ it('should call the drain callback if receives an empty push', (done) => {
+ var call_order = [];
+
+ var q = async.priorityQueue((task, callback) => {
+ call_order.push(task);
+ callback('error', 'arg');
+ }, 1);
+
+ q.drain(() => {
+ call_order.push('drain')
+ expect(call_order).to.eql([
+ 'drain'
+ ]);
+ expect(q.length()).to.equal(0);
+ expect(q.running()).to.equal(0);
+ done();
+ });
+
+ q.push([], 1, () => { throw new Error('should not be called') });
+ });
+
it('should not call the drain callback if receives empty push and tasks are still pending', (done) => {
var call_order = [];
@@ -282,4 +425,49 @@ describe('priorityQueue', () => {
q.push([], 1, () => {});
});
+
+ it('should be iterable', (done) => {
+ var q = async.priorityQueue((data, cb) => {
+ if (data === 3) {
+ q.push(6)
+ expect([...q]).to.eql([4, 5, 6]);
+ }
+ async.setImmediate(cb);
+ });
+
+ q.push([1, 2, 3, 4, 5]);
+
+ expect([...q]).to.eql([1, 2, 3, 4, 5]);
+
+ q.drain(() => {
+ expect([...q]).to.eql([]);
+ done();
+ });
+ });
+
+ it('should error when calling unshift', () => {
+ var q = async.priorityQueue(() => {});
+ expect(() => {
+ q.unshift(1);
+ }).to.throw();
+ });
+
+ it('should error when calling unshiftAsync', () => {
+ var q = async.priorityQueue(() => {});
+ expect(() => {
+ q.unshiftAsync(1);
+ }).to.throw();
+ });
+
+ it('should error when the callback is called more than once', (done) => {
+ var q = async.priorityQueue((task, callback) => {
+ callback();
+ expect(() => {
+ callback();
+ }).to.throw();
+ done();
+ }, 2);
+
+ q.push(1);
+ });
});