diff options
Diffstat (limited to 'test')
-rw-r--r-- | test/es2017/awaitableFunctions.js | 60 | ||||
-rw-r--r-- | test/priorityQueue.js | 206 | ||||
-rw-r--r-- | test/queue.js | 2 |
3 files changed, 255 insertions, 13 deletions
diff --git a/test/es2017/awaitableFunctions.js b/test/es2017/awaitableFunctions.js index f961857..d68a18f 100644 --- a/test/es2017/awaitableFunctions.js +++ b/test/es2017/awaitableFunctions.js @@ -540,7 +540,7 @@ module.exports = function () { it('should work with queues', async () => { const q = async.queue(async (data) => { if (data === 2) throw new Error('oh noes') - await new Promise(resolve => setTimeout(resolve, 10)) + await new Promise(resolve => setTimeout(() => resolve(data), 10)) return data }, 5) @@ -561,7 +561,7 @@ module.exports = function () { const multiP = Promise.all(q.push([9, 10])) await q.drain() - await multiP + const res = await multiP expect(calls.join()).to.eql([ 'saturated', 'push cb 1', @@ -581,6 +581,62 @@ module.exports = function () { expect(emptyCalls).to.eql([ 'empty', ]) + + expect(res).to.eql([ + 9, + 10 + ]) + }) + + it('should work with priorityQueues', async () => { + const q = async.priorityQueue(async (data) => { + if (data === 2) throw new Error('oh noes') + await new Promise(resolve => setTimeout(() => resolve(data), 10)) + return data + }, 5) + + const calls = [] + const errorCalls = [] + const emptyCalls = [] + q.error().catch(d => errorCalls.push('error ' + d)) + q.saturated().then(() => calls.push('saturated')) + q.unsaturated().then(() => calls.push('unsaturated')) + q.empty().then(() => emptyCalls.push('empty')) + + q.push(1, 1).then(d => calls.push('push cb ' + d)) + q.push(2, 1).then(d => errorCalls.push('push cb ' + d)) + q.push([3, 4, 5, 6], 0).map(p => p.then(d => calls.push('push cb ' + d))) + q.push(7, 3).then(d => calls.push('push cb ' + d)) + q.push(8, 3).then(d => calls.push('push cb ' + d)) + + const multiP = Promise.all(q.push([9, 10], 1)) + + await q.drain() + const res = await multiP + expect(calls.join()).to.eql([ + 'saturated', + 'push cb 3', + 'push cb 4', + 'push cb 5', + 'push cb 6', + 'push cb 1', + 'unsaturated', + 'push cb 7', + 'push cb 8' + ].join()) + + expect(errorCalls).to.eql([ + 'push cb undefined', + 'error Error: oh noes', + ]) + expect(emptyCalls).to.eql([ + 'empty', + ]) + + expect(res).to.eql([ + 9, + 10 + ]) }) /* diff --git a/test/priorityQueue.js b/test/priorityQueue.js index 564a0d6..40f7fe6 100644 --- a/test/priorityQueue.js +++ b/test/priorityQueue.js @@ -16,13 +16,13 @@ describe('priorityQueue', () => { q.push(1, 1.4, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); - expect(q.length()).to.equal(2); + expect(q.length()).to.equal(3); call_order.push('callback ' + 1); }); q.push(2, 0.2, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); - expect(q.length()).to.equal(3); + expect(q.length()).to.equal(4); call_order.push('callback ' + 2); }); q.push(3, 3.8, (err, arg) => { @@ -31,26 +31,25 @@ describe('priorityQueue', () => { expect(q.length()).to.equal(0); call_order.push('callback ' + 3); }); - q.push(4, 2.9, (err, arg) => { + q.push(['arr', 'arr'], 2.9, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); - expect(q.length()).to.equal(1); - call_order.push('callback ' + 4); + call_order.push('callback arr'); }); - expect(q.length()).to.equal(4); + expect(q.length()).to.equal(5); expect(q.concurrency).to.equal(1); q.drain(() => { expect(call_order).to.eql([ 'process 2', 'callback 2', 'process 1', 'callback 1', - 'process 4', 'callback 4', + 'process arr', 'callback arr', + 'process arr', 'callback arr', 'process 3', 'callback 3' ]); expect(q.concurrency).to.equal(1); expect(q.length()).to.equal(0); - q.push([]) - expect(q.length()).to.equal(0) + expect(q.idle()).to.be.equal(true); done() }); try { @@ -79,27 +78,32 @@ describe('priorityQueue', () => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); expect(q.length()).to.equal(2); + expect(q.running()).to.equal(1); call_order.push('callback ' + 1); }); q.push(2, 0.2, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); expect(q.length()).to.equal(1); + expect(q.running()).to.equal(1); call_order.push('callback ' + 2); }); q.push(3, 3.8, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); expect(q.length()).to.equal(0); + expect(q.running()).to.equal(1); call_order.push('callback ' + 3); }); q.push(4, 2.9, (err, arg) => { expect(err).to.equal('error'); expect(arg).to.equal('arg'); expect(q.length()).to.equal(0); + expect(q.running()).to.equal(0); call_order.push('callback ' + 4); }); expect(q.length()).to.equal(4); + expect(q.running()).to.equal(0); expect(q.concurrency).to.equal(2); q.drain(() => { @@ -111,10 +115,30 @@ describe('priorityQueue', () => { ]); expect(q.concurrency).to.equal(2); expect(q.length()).to.equal(0); + expect(q.running()).to.equal(0); done(); }); }); + it('pushAsync', done => { + const calls = []; + const q = async.priorityQueue((task, cb) => { + if (task === 2) return cb(new Error('fail')); + cb(); + }) + + q.pushAsync(1, 1, () => { throw new Error('should not be called') }).then(() => calls.push(1)); + q.pushAsync(2, 0).catch(err => { + expect(err.message).to.equal('fail'); + calls.push(2); + }); + q.pushAsync([3, 4], 0).map(p => p.then(() => calls.push('arr'))); + q.drain(() => setTimeout(() => { + expect(calls).to.eql([2, 'arr', 'arr', 1]); + done(); + })); + }); + it('pause in worker with concurrency', (done) => { var call_order = []; var q = async.priorityQueue((task, callback) => { @@ -144,6 +168,104 @@ describe('priorityQueue', () => { }); }); + it('kill', (done) => { + var q = async.priorityQueue((/*task, callback*/) => { + setTimeout(() => { + throw new Error("Function should never be called"); + }, 20); + }, 1); + q.drain(() => { + throw new Error("Function should never be called"); + }); + + q.push(0); + + q.kill(); + + setTimeout(() => { + expect(q.length()).to.equal(0); + done(); + }, 40); + }); + + context('q.workersList():', () => { + it('should be the same length as running()', (done) => { + var q = async.priorityQueue((task, cb) => { + async.setImmediate(() => { + expect(q.workersList().length).to.equal(q.running()); + cb(); + }); + }, 2); + + q.drain(() => { + expect(q.workersList().length).to.equal(0); + expect(q.running()).to.equal(0); + done(); + }); + + q.push('foo', 2); + q.push('bar', 1); + q.push('baz', 0); + }); + + it('should contain the items being processed', (done) => { + var itemsBeingProcessed = { + 'foo': [ + {data: 'bar', priority: 1}, + {data: 'foo', priority: 2} + ], + 'foo_cb': [ + {data: 'foo', priority: 2} + ], + 'bar': [ + {data: 'baz', priority: 0}, + {data: 'bar', priority: 1} + ], + 'bar_cb': [ + {data: 'bar', priority: 1}, + {data: 'foo', priority: 2} + ], + 'baz': [ + {data: 'baz', priority: 0} + ], + 'baz_cb': [ + {data: 'baz', priority: 0}, + {data: 'bar', priority: 1} + ] + }; + + function getWorkersListData(q) { + return q.workersList().map(({data, priority}) => { + return {data, priority}; + }); + } + + var q = async.priorityQueue((task, cb) => { + expect( + getWorkersListData(q) + ).to.eql(itemsBeingProcessed[task]); + expect(q.workersList().length).to.equal(q.running()); + async.setImmediate(() => { + expect( + getWorkersListData(q) + ).to.eql(itemsBeingProcessed[task+'_cb']); + expect(q.workersList().length).to.equal(q.running()); + cb(); + }); + }, 2); + + q.drain(() => { + expect(q.workersList()).to.eql([]); + expect(q.workersList().length).to.equal(q.running()); + done(); + }); + + q.push('foo', 2); + q.push('bar', 1); + q.push('baz', 0); + }); + }); + context('q.saturated(): ', () => { it('should call the saturated callback if tasks length is concurrency', (done) => { var calls = []; @@ -247,6 +369,27 @@ describe('priorityQueue', () => { }); }); + it('should call the drain callback if receives an empty push', (done) => { + var call_order = []; + + var q = async.priorityQueue((task, callback) => { + call_order.push(task); + callback('error', 'arg'); + }, 1); + + q.drain(() => { + call_order.push('drain') + expect(call_order).to.eql([ + 'drain' + ]); + expect(q.length()).to.equal(0); + expect(q.running()).to.equal(0); + done(); + }); + + q.push([], 1, () => { throw new Error('should not be called') }); + }); + it('should not call the drain callback if receives empty push and tasks are still pending', (done) => { var call_order = []; @@ -282,4 +425,49 @@ describe('priorityQueue', () => { q.push([], 1, () => {}); }); + + it('should be iterable', (done) => { + var q = async.priorityQueue((data, cb) => { + if (data === 3) { + q.push(6) + expect([...q]).to.eql([4, 5, 6]); + } + async.setImmediate(cb); + }); + + q.push([1, 2, 3, 4, 5]); + + expect([...q]).to.eql([1, 2, 3, 4, 5]); + + q.drain(() => { + expect([...q]).to.eql([]); + done(); + }); + }); + + it('should error when calling unshift', () => { + var q = async.priorityQueue(() => {}); + expect(() => { + q.unshift(1); + }).to.throw(); + }); + + it('should error when calling unshiftAsync', () => { + var q = async.priorityQueue(() => {}); + expect(() => { + q.unshiftAsync(1); + }).to.throw(); + }); + + it('should error when the callback is called more than once', (done) => { + var q = async.priorityQueue((task, callback) => { + callback(); + expect(() => { + callback(); + }).to.throw(); + done(); + }, 2); + + q.push(1); + }); }); diff --git a/test/queue.js b/test/queue.js index f99e91a..615dd6b 100644 --- a/test/queue.js +++ b/test/queue.js @@ -170,7 +170,6 @@ describe('queue', function(){ }) q.pushAsync([3, 4]).map(p => p.then(() => calls.push('arr'))) q.drain(() => setTimeout(() => { - console.log('drain') expect(calls).to.eql([1, 2, 'arr', 'arr']) done() })) @@ -190,7 +189,6 @@ describe('queue', function(){ }) q.unshiftAsync([3, 4]).map(p => p.then(() => calls.push('arr'))) q.drain(() => setTimeout(() => { - console.log('drain') expect(calls).to.eql(['arr', 'arr', 2, 1]) done() })) |