diff options
author | André Guedes <andrebsguedes@gmail.com> | 2019-02-20 15:45:16 -0300 |
---|---|---|
committer | Hubert Argasinski <argasinski.hubert@gmail.com> | 2019-02-20 13:45:16 -0500 |
commit | ce0a7e203e8d3b2a68ca674038dad86e1a221e00 (patch) | |
tree | 0dfe64b1b04ee328c602fde99031c4247b37a0be | |
parent | 59a8662da2d8a3f26e19b484627a87d45382006f (diff) | |
download | async-ce0a7e203e8d3b2a68ca674038dad86e1a221e00.tar.gz |
fix: stop priorityQueue from draining while items still pending (#1623)
* fix: priorityQueue drains while items still pending
Priority queue should check if the underlying queue is idle before draining.
Currently the priorityQueue drains if an empty array of tasks is pushed even if other tasks are processing.
* test: adds test for priorityQueue draining before tasks ended
* test: lint fix
* test: adds running assert to drain on test/priorityQueue.js
Co-Authored-By: andrebsguedes <andrebsguedes@gmail.com>
-rw-r--r-- | lib/priorityQueue.js | 2 | ||||
-rw-r--r-- | test/priorityQueue.js | 36 |
2 files changed, 37 insertions, 1 deletions
diff --git a/lib/priorityQueue.js b/lib/priorityQueue.js index ce05805..294e409 100644 --- a/lib/priorityQueue.js +++ b/lib/priorityQueue.js @@ -40,7 +40,7 @@ export default function(worker, concurrency) { if (!Array.isArray(data)) { data = [data]; } - if (data.length === 0) { + if (data.length === 0 && q.idle()) { // call drain immediately if there are no tasks return setImmediate(() => q.drain()); } diff --git a/test/priorityQueue.js b/test/priorityQueue.js index 2d9d4a5..e1c3b29 100644 --- a/test/priorityQueue.js +++ b/test/priorityQueue.js @@ -239,5 +239,41 @@ describe('priorityQueue', () => { q.push('foo4', 1, () => {calls.push('foo4 cb');}); }); }); + + it('should not call the drain callback if receives empty push and tasks are still pending', (done) => { + var call_order = []; + + var q = async.priorityQueue((task, callback) => { + call_order.push('process ' + task); + callback('error', 'arg'); + }, 1); + + q.push(1, 1, (err, arg) => { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + call_order.push('callback ' + 1); + }); + + q.push(2, 1, (err, arg) => { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + call_order.push('callback ' + 2); + }); + + expect(q.length()).to.equal(2); + + q.drain = function () { + expect(call_order).to.eql([ + 'process 1', 'callback 1', + 'process 2', 'callback 2' + ]); + expect(q.concurrency).to.equal(1); + expect(q.length()).to.equal(0); + expect(q.running()).to.equal(0); + done(); + }; + + q.push([], 1, () => {}); + }); }); |