summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAndré Guedes <andrebsguedes@gmail.com>2019-02-20 15:45:16 -0300
committerHubert Argasinski <argasinski.hubert@gmail.com>2019-02-20 13:45:16 -0500
commitce0a7e203e8d3b2a68ca674038dad86e1a221e00 (patch)
tree0dfe64b1b04ee328c602fde99031c4247b37a0be
parent59a8662da2d8a3f26e19b484627a87d45382006f (diff)
downloadasync-ce0a7e203e8d3b2a68ca674038dad86e1a221e00.tar.gz
fix: stop priorityQueue from draining while items still pending (#1623)
* fix: priorityQueue drains while items still pending Priority queue should check if the underlying queue is idle before draining. Currently the priorityQueue drains if an empty array of tasks is pushed even if other tasks are processing. * test: adds test for priorityQueue draining before tasks ended * test: lint fix * test: adds running assert to drain on test/priorityQueue.js Co-Authored-By: andrebsguedes <andrebsguedes@gmail.com>
-rw-r--r--lib/priorityQueue.js2
-rw-r--r--test/priorityQueue.js36
2 files changed, 37 insertions, 1 deletions
diff --git a/lib/priorityQueue.js b/lib/priorityQueue.js
index ce05805..294e409 100644
--- a/lib/priorityQueue.js
+++ b/lib/priorityQueue.js
@@ -40,7 +40,7 @@ export default function(worker, concurrency) {
if (!Array.isArray(data)) {
data = [data];
}
- if (data.length === 0) {
+ if (data.length === 0 && q.idle()) {
// call drain immediately if there are no tasks
return setImmediate(() => q.drain());
}
diff --git a/test/priorityQueue.js b/test/priorityQueue.js
index 2d9d4a5..e1c3b29 100644
--- a/test/priorityQueue.js
+++ b/test/priorityQueue.js
@@ -239,5 +239,41 @@ describe('priorityQueue', () => {
q.push('foo4', 1, () => {calls.push('foo4 cb');});
});
});
+
+ it('should not call the drain callback if receives empty push and tasks are still pending', (done) => {
+ var call_order = [];
+
+ var q = async.priorityQueue((task, callback) => {
+ call_order.push('process ' + task);
+ callback('error', 'arg');
+ }, 1);
+
+ q.push(1, 1, (err, arg) => {
+ expect(err).to.equal('error');
+ expect(arg).to.equal('arg');
+ call_order.push('callback ' + 1);
+ });
+
+ q.push(2, 1, (err, arg) => {
+ expect(err).to.equal('error');
+ expect(arg).to.equal('arg');
+ call_order.push('callback ' + 2);
+ });
+
+ expect(q.length()).to.equal(2);
+
+ q.drain = function () {
+ expect(call_order).to.eql([
+ 'process 1', 'callback 1',
+ 'process 2', 'callback 2'
+ ]);
+ expect(q.concurrency).to.equal(1);
+ expect(q.length()).to.equal(0);
+ expect(q.running()).to.equal(0);
+ done();
+ };
+
+ q.push([], 1, () => {});
+ });
});