summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/priorityQueue.js2
-rw-r--r--test/priorityQueue.js36
2 files changed, 37 insertions, 1 deletions
diff --git a/lib/priorityQueue.js b/lib/priorityQueue.js
index ce05805..294e409 100644
--- a/lib/priorityQueue.js
+++ b/lib/priorityQueue.js
@@ -40,7 +40,7 @@ export default function(worker, concurrency) {
if (!Array.isArray(data)) {
data = [data];
}
- if (data.length === 0) {
+ if (data.length === 0 && q.idle()) {
// call drain immediately if there are no tasks
return setImmediate(() => q.drain());
}
diff --git a/test/priorityQueue.js b/test/priorityQueue.js
index 2d9d4a5..e1c3b29 100644
--- a/test/priorityQueue.js
+++ b/test/priorityQueue.js
@@ -239,5 +239,41 @@ describe('priorityQueue', () => {
q.push('foo4', 1, () => {calls.push('foo4 cb');});
});
});
+
+ it('should not call the drain callback if receives empty push and tasks are still pending', (done) => {
+ var call_order = [];
+
+ var q = async.priorityQueue((task, callback) => {
+ call_order.push('process ' + task);
+ callback('error', 'arg');
+ }, 1);
+
+ q.push(1, 1, (err, arg) => {
+ expect(err).to.equal('error');
+ expect(arg).to.equal('arg');
+ call_order.push('callback ' + 1);
+ });
+
+ q.push(2, 1, (err, arg) => {
+ expect(err).to.equal('error');
+ expect(arg).to.equal('arg');
+ call_order.push('callback ' + 2);
+ });
+
+ expect(q.length()).to.equal(2);
+
+ q.drain = function () {
+ expect(call_order).to.eql([
+ 'process 1', 'callback 1',
+ 'process 2', 'callback 2'
+ ]);
+ expect(q.concurrency).to.equal(1);
+ expect(q.length()).to.equal(0);
+ expect(q.running()).to.equal(0);
+ done();
+ };
+
+ q.push([], 1, () => {});
+ });
});