From ce0a7e203e8d3b2a68ca674038dad86e1a221e00 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Andr=C3=A9=20Guedes?= Date: Wed, 20 Feb 2019 15:45:16 -0300 Subject: fix: stop priorityQueue from draining while items still pending (#1623) * fix: priorityQueue drains while items still pending Priority queue should check if the underlying queue is idle before draining. Currently the priorityQueue drains if an empty array of tasks is pushed even if other tasks are processing. * test: adds test for priorityQueue draining before tasks ended * test: lint fix * test: adds running assert to drain on test/priorityQueue.js Co-Authored-By: andrebsguedes --- lib/priorityQueue.js | 2 +- test/priorityQueue.js | 36 ++++++++++++++++++++++++++++++++++++ 2 files changed, 37 insertions(+), 1 deletion(-) diff --git a/lib/priorityQueue.js b/lib/priorityQueue.js index ce05805..294e409 100644 --- a/lib/priorityQueue.js +++ b/lib/priorityQueue.js @@ -40,7 +40,7 @@ export default function(worker, concurrency) { if (!Array.isArray(data)) { data = [data]; } - if (data.length === 0) { + if (data.length === 0 && q.idle()) { // call drain immediately if there are no tasks return setImmediate(() => q.drain()); } diff --git a/test/priorityQueue.js b/test/priorityQueue.js index 2d9d4a5..e1c3b29 100644 --- a/test/priorityQueue.js +++ b/test/priorityQueue.js @@ -239,5 +239,41 @@ describe('priorityQueue', () => { q.push('foo4', 1, () => {calls.push('foo4 cb');}); }); }); + + it('should not call the drain callback if receives empty push and tasks are still pending', (done) => { + var call_order = []; + + var q = async.priorityQueue((task, callback) => { + call_order.push('process ' + task); + callback('error', 'arg'); + }, 1); + + q.push(1, 1, (err, arg) => { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + call_order.push('callback ' + 1); + }); + + q.push(2, 1, (err, arg) => { + expect(err).to.equal('error'); + expect(arg).to.equal('arg'); + call_order.push('callback ' + 2); + }); + + expect(q.length()).to.equal(2); + + q.drain = function () { + expect(call_order).to.eql([ + 'process 1', 'callback 1', + 'process 2', 'callback 2' + ]); + expect(q.concurrency).to.equal(1); + expect(q.length()).to.equal(0); + expect(q.running()).to.equal(0); + done(); + }; + + q.push([], 1, () => {}); + }); }); -- cgit v1.2.1