diff options
Diffstat (limited to 'test/priorityQueue.js')
-rw-r--r-- | test/priorityQueue.js | 206 |
1 files changed, 197 insertions, 9 deletions
diff --git a/test/priorityQueue.js b/test/priorityQueue.js index 564a0d6..40f7fe6 100644 --- a/test/priorityQueue.js +++ b/test/priorityQueue.js @@ -16,13 +16,13 @@ describe('priorityQueue', () => { q.push(1, 1.4, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); - expect(q.length()).to.equal(2); + expect(q.length()).to.equal(3); call_order.push('callback ' + 1); }); q.push(2, 0.2, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); - expect(q.length()).to.equal(3); + expect(q.length()).to.equal(4); call_order.push('callback ' + 2); }); q.push(3, 3.8, (err, arg) => { @@ -31,26 +31,25 @@ describe('priorityQueue', () => { expect(q.length()).to.equal(0); call_order.push('callback ' + 3); }); - q.push(4, 2.9, (err, arg) => { + q.push(['arr', 'arr'], 2.9, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); - expect(q.length()).to.equal(1); - call_order.push('callback ' + 4); + call_order.push('callback arr'); }); - expect(q.length()).to.equal(4); + expect(q.length()).to.equal(5); expect(q.concurrency).to.equal(1); q.drain(() => { expect(call_order).to.eql([ 'process 2', 'callback 2', 'process 1', 'callback 1', - 'process 4', 'callback 4', + 'process arr', 'callback arr', + 'process arr', 'callback arr', 'process 3', 'callback 3' ]); expect(q.concurrency).to.equal(1); expect(q.length()).to.equal(0); - q.push([]) - expect(q.length()).to.equal(0) + expect(q.idle()).to.be.equal(true); done() }); try { @@ -79,27 +78,32 @@ describe('priorityQueue', () => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); expect(q.length()).to.equal(2); + expect(q.running()).to.equal(1); call_order.push('callback ' + 1); }); q.push(2, 0.2, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); expect(q.length()).to.equal(1); + expect(q.running()).to.equal(1); call_order.push('callback ' + 2); }); q.push(3, 3.8, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); expect(q.length()).to.equal(0); + expect(q.running()).to.equal(1); call_order.push('callback ' + 3); }); q.push(4, 2.9, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); expect(q.length()).to.equal(0); + expect(q.running()).to.equal(0); call_order.push('callback ' + 4); }); expect(q.length()).to.equal(4); + expect(q.running()).to.equal(0); expect(q.concurrency).to.equal(2); q.drain(() => { @@ -111,10 +115,30 @@ describe('priorityQueue', () => { ]); expect(q.concurrency).to.equal(2); expect(q.length()).to.equal(0); + expect(q.running()).to.equal(0); done(); }); }); + it('pushAsync', done => { + const calls = []; + const q = async.priorityQueue((task, cb) => { + if (task === 2) return cb(new Error('fail')); + cb(); + }) + + q.pushAsync(1, 1, () => { throw new Error('should not be called') }).then(() => calls.push(1)); + q.pushAsync(2, 0).catch(err => { + expect(err.message).to.equal('fail'); + calls.push(2); + }); + q.pushAsync([3, 4], 0).map(p => p.then(() => calls.push('arr'))); + q.drain(() => setTimeout(() => { + expect(calls).to.eql([2, 'arr', 'arr', 1]); + done(); + })); + }); + it('pause in worker with concurrency', (done) => { var call_order = []; var q = async.priorityQueue((task, callback) => { @@ -144,6 +168,104 @@ describe('priorityQueue', () => { }); }); + it('kill', (done) => { + var q = async.priorityQueue((/*task, callback*/) => { + setTimeout(() => { + throw new Error("Function should never be called"); + }, 20); + }, 1); + q.drain(() => { + throw new Error("Function should never be called"); + }); + + q.push(0); + + q.kill(); + + setTimeout(() => { + expect(q.length()).to.equal(0); + done(); + }, 40); + }); + + context('q.workersList():', () => { + it('should be the same length as running()', (done) => { + var q = async.priorityQueue((task, cb) => { + async.setImmediate(() => { + expect(q.workersList().length).to.equal(q.running()); + cb(); + }); + }, 2); + + q.drain(() => { + expect(q.workersList().length).to.equal(0); + expect(q.running()).to.equal(0); + done(); + }); + + q.push('foo', 2); + q.push('bar', 1); + q.push('baz', 0); + }); + + it('should contain the items being processed', (done) => { + var itemsBeingProcessed = { + 'foo': [ + {data: 'bar', priority: 1}, + {data: 'foo', priority: 2} + ], + 'foo_cb': [ + {data: 'foo', priority: 2} + ], + 'bar': [ + {data: 'baz', priority: 0}, + {data: 'bar', priority: 1} + ], + 'bar_cb': [ + {data: 'bar', priority: 1}, + {data: 'foo', priority: 2} + ], + 'baz': [ + {data: 'baz', priority: 0} + ], + 'baz_cb': [ + {data: 'baz', priority: 0}, + {data: 'bar', priority: 1} + ] + }; + + function getWorkersListData(q) { + return q.workersList().map(({data, priority}) => { + return {data, priority}; + }); + } + + var q = async.priorityQueue((task, cb) => { + expect( + getWorkersListData(q) + ).to.eql(itemsBeingProcessed[task]); + expect(q.workersList().length).to.equal(q.running()); + async.setImmediate(() => { + expect( + getWorkersListData(q) + ).to.eql(itemsBeingProcessed[task+'_cb']); + expect(q.workersList().length).to.equal(q.running()); + cb(); + }); + }, 2); + + q.drain(() => { + expect(q.workersList()).to.eql([]); + expect(q.workersList().length).to.equal(q.running()); + done(); + }); + + q.push('foo', 2); + q.push('bar', 1); + q.push('baz', 0); + }); + }); + context('q.saturated(): ', () => { it('should call the saturated callback if tasks length is concurrency', (done) => { var calls = []; @@ -247,6 +369,27 @@ describe('priorityQueue', () => { }); }); + it('should call the drain callback if receives an empty push', (done) => { + var call_order = []; + + var q = async.priorityQueue((task, callback) => { + call_order.push(task); + callback('error', 'arg'); + }, 1); + + q.drain(() => { + call_order.push('drain') + expect(call_order).to.eql([ + 'drain' + ]); + expect(q.length()).to.equal(0); + expect(q.running()).to.equal(0); + done(); + }); + + q.push([], 1, () => { throw new Error('should not be called') }); + }); + it('should not call the drain callback if receives empty push and tasks are still pending', (done) => { var call_order = []; @@ -282,4 +425,49 @@ describe('priorityQueue', () => { q.push([], 1, () => {}); }); + + it('should be iterable', (done) => { + var q = async.priorityQueue((data, cb) => { + if (data === 3) { + q.push(6) + expect([...q]).to.eql([4, 5, 6]); + } + async.setImmediate(cb); + }); + + q.push([1, 2, 3, 4, 5]); + + expect([...q]).to.eql([1, 2, 3, 4, 5]); + + q.drain(() => { + expect([...q]).to.eql([]); + done(); + }); + }); + + it('should error when calling unshift', () => { + var q = async.priorityQueue(() => {}); + expect(() => { + q.unshift(1); + }).to.throw(); + }); + + it('should error when calling unshiftAsync', () => { + var q = async.priorityQueue(() => {}); + expect(() => { + q.unshiftAsync(1); + }).to.throw(); + }); + + it('should error when the callback is called more than once', (done) => { + var q = async.priorityQueue((task, callback) => { + callback(); + expect(() => { + callback(); + }).to.throw(); + done(); + }, 2); + + q.push(1); + }); }); |