diff options
-rw-r--r-- | lib/priorityQueue.js | 2 | ||||
-rw-r--r-- | test/priorityQueue.js | 36 |
2 files changed, 37 insertions, 1 deletions
diff --git a/lib/priorityQueue.js b/lib/priorityQueue.js index ce05805..294e409 100644 --- a/lib/priorityQueue.js +++ b/lib/priorityQueue.js @@ -40,7 +40,7 @@ export default function(worker, concurrency) { if (!Array.isArray(data)) { data = [data]; } - if (data.length === 0) { + if (data.length === 0 && q.idle()) { // call drain immediately if there are no tasks return setImmediate(() => q.drain()); } diff --git a/test/priorityQueue.js b/test/priorityQueue.js index 2d9d4a5..e1c3b29 100644 --- a/test/priorityQueue.js +++ b/test/priorityQueue.js @@ -239,5 +239,41 @@ describe('priorityQueue', () => { q.push('foo4', 1, () => {calls.push('foo4 cb');}); }); }); + + it('should not call the drain callback if receives empty push and tasks are still pending', (done) => { + var call_order = []; + + var q = async.priorityQueue((task, callback) => { + call_order.push('process ' + task); + callback('error', 'arg'); + }, 1); + + q.push(1, 1, (err, arg) => { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + call_order.push('callback ' + 1); + }); + + q.push(2, 1, (err, arg) => { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + call_order.push('callback ' + 2); + }); + + expect(q.length()).to.equal(2); + + q.drain = function () { + expect(call_order).to.eql([ + 'process 1', 'callback 1', + 'process 2', 'callback 2' + ]); + expect(q.concurrency).to.equal(1); + expect(q.length()).to.equal(0); + expect(q.running()).to.equal(0); + done(); + }; + + q.push([], 1, () => {}); + }); }); |