diff options
author | Caolan McMahon <caolan.mcmahon@gmail.com> | 2014-05-23 16:20:40 +0100 |
---|---|---|
committer | Caolan McMahon <caolan.mcmahon@gmail.com> | 2014-05-23 16:20:40 +0100 |
commit | 66ca10e52795707ca436154bddc38c560864e91c (patch) | |
tree | 7490b5bdef5feb34b82bed15c4870d77a4e242e4 | |
parent | 6a6615e987d1e42a9223688e84b722ac64f4d816 (diff) | |
parent | 2f8680c6bd96a0ba7b38f052d200f6364a02de69 (diff) | |
download | async-66ca10e52795707ca436154bddc38c560864e91c.tar.gz |
Merge pull request #536 from vsivsi/queue-pause-concurrency-issue
async.queue() can lose concurrency after .pause() / .resume()
-rwxr-xr-x | lib/async.js | 23 | ||||
-rwxr-xr-x | test/test-async.js | 45 |
2 files changed, 58 insertions, 10 deletions
diff --git a/lib/async.js b/lib/async.js index 01e8afc..a13f835 100755 --- a/lib/async.js +++ b/lib/async.js @@ -821,23 +821,26 @@ pause: function () { if (q.paused === true) { return; } q.paused = true; - q.process(); }, resume: function () { if (q.paused === false) { return; } q.paused = false; - q.process(); + // Need to call q.process once per concurrent + // worker to preserve full concurrency after pause + for (var w = 1; w <= q.concurrency; w++) { + async.setImmediate(q.process); + } } }; return q; }; - + async.priorityQueue = function (worker, concurrency) { - + function _compareTasks(a, b){ return a.priority - b.priority; }; - + function _binarySearch(sequence, item, compare) { var beg = -1, end = sequence.length - 1; @@ -851,7 +854,7 @@ } return beg; } - + function _insert(q, data, priority, callback) { if (!q.started){ q.started = true; @@ -873,7 +876,7 @@ priority: priority, callback: typeof callback === 'function' ? callback : null }; - + q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item); if (q.saturated && q.tasks.length === q.concurrency) { @@ -882,15 +885,15 @@ async.setImmediate(q.process); }); } - + // Start with a normal queue var q = async.queue(worker, concurrency); - + // Override push to accept second parameter representing priority q.push = function (data, priority, callback) { _insert(q, data, priority, callback); }; - + // Remove unshift function delete q.unshift; diff --git a/test/test-async.js b/test/test-async.js index 79dd680..6a4e8a3 100755 --- a/test/test-async.js +++ b/test/test-async.js @@ -2421,6 +2421,51 @@ exports['queue pause'] = function(test) { }, 800); } +exports['queue pause with concurrency'] = function(test) { + var call_order = [], + task_timeout = 100, + pause_timeout = 50, + resume_timeout = 300, + tasks = [ 1, 2, 3, 4, 5, 6 ], + + elapsed = (function () { + var start = +Date.now(); + return function () { return Math.floor((+Date.now() - start) / 100) * 100; }; + })(); + + var q = async.queue(function (task, callback) { + setTimeout(function () { + call_order.push('process ' + task); + call_order.push('timeout ' + elapsed()); + callback(); + }, task_timeout); + }, 2); + + q.push(tasks); + + setTimeout(function () { + q.pause(); + test.equal(q.paused, true); + }, pause_timeout); + + setTimeout(function () { + q.resume(); + test.equal(q.paused, false); + }, resume_timeout); + + setTimeout(function () { + test.same(call_order, [ + 'process 1', 'timeout 100', + 'process 2', 'timeout 100', + 'process 3', 'timeout 400', + 'process 4', 'timeout 400', + 'process 5', 'timeout 500', + 'process 6', 'timeout 500' + ]); + test.done(); + }, 800); +} + exports['queue kill'] = function (test) { var q = async.queue(function (task, callback) { setTimeout(function () { |