summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorCaolan McMahon <caolan.mcmahon@gmail.com>2014-05-23 16:20:40 +0100
committerCaolan McMahon <caolan.mcmahon@gmail.com>2014-05-23 16:20:40 +0100
commit66ca10e52795707ca436154bddc38c560864e91c (patch)
tree7490b5bdef5feb34b82bed15c4870d77a4e242e4
parent6a6615e987d1e42a9223688e84b722ac64f4d816 (diff)
parent2f8680c6bd96a0ba7b38f052d200f6364a02de69 (diff)
downloadasync-66ca10e52795707ca436154bddc38c560864e91c.tar.gz
Merge pull request #536 from vsivsi/queue-pause-concurrency-issue
async.queue() can lose concurrency after .pause() / .resume()
-rwxr-xr-xlib/async.js23
-rwxr-xr-xtest/test-async.js45
2 files changed, 58 insertions, 10 deletions
diff --git a/lib/async.js b/lib/async.js
index 01e8afc..a13f835 100755
--- a/lib/async.js
+++ b/lib/async.js
@@ -821,23 +821,26 @@
pause: function () {
if (q.paused === true) { return; }
q.paused = true;
- q.process();
},
resume: function () {
if (q.paused === false) { return; }
q.paused = false;
- q.process();
+ // Need to call q.process once per concurrent
+ // worker to preserve full concurrency after pause
+ for (var w = 1; w <= q.concurrency; w++) {
+ async.setImmediate(q.process);
+ }
}
};
return q;
};
-
+
async.priorityQueue = function (worker, concurrency) {
-
+
function _compareTasks(a, b){
return a.priority - b.priority;
};
-
+
function _binarySearch(sequence, item, compare) {
var beg = -1,
end = sequence.length - 1;
@@ -851,7 +854,7 @@
}
return beg;
}
-
+
function _insert(q, data, priority, callback) {
if (!q.started){
q.started = true;
@@ -873,7 +876,7 @@
priority: priority,
callback: typeof callback === 'function' ? callback : null
};
-
+
q.tasks.splice(_binarySearch(q.tasks, item, _compareTasks) + 1, 0, item);
if (q.saturated && q.tasks.length === q.concurrency) {
@@ -882,15 +885,15 @@
async.setImmediate(q.process);
});
}
-
+
// Start with a normal queue
var q = async.queue(worker, concurrency);
-
+
// Override push to accept second parameter representing priority
q.push = function (data, priority, callback) {
_insert(q, data, priority, callback);
};
-
+
// Remove unshift function
delete q.unshift;
diff --git a/test/test-async.js b/test/test-async.js
index 79dd680..6a4e8a3 100755
--- a/test/test-async.js
+++ b/test/test-async.js
@@ -2421,6 +2421,51 @@ exports['queue pause'] = function(test) {
}, 800);
}
+exports['queue pause with concurrency'] = function(test) {
+ var call_order = [],
+ task_timeout = 100,
+ pause_timeout = 50,
+ resume_timeout = 300,
+ tasks = [ 1, 2, 3, 4, 5, 6 ],
+
+ elapsed = (function () {
+ var start = +Date.now();
+ return function () { return Math.floor((+Date.now() - start) / 100) * 100; };
+ })();
+
+ var q = async.queue(function (task, callback) {
+ setTimeout(function () {
+ call_order.push('process ' + task);
+ call_order.push('timeout ' + elapsed());
+ callback();
+ }, task_timeout);
+ }, 2);
+
+ q.push(tasks);
+
+ setTimeout(function () {
+ q.pause();
+ test.equal(q.paused, true);
+ }, pause_timeout);
+
+ setTimeout(function () {
+ q.resume();
+ test.equal(q.paused, false);
+ }, resume_timeout);
+
+ setTimeout(function () {
+ test.same(call_order, [
+ 'process 1', 'timeout 100',
+ 'process 2', 'timeout 100',
+ 'process 3', 'timeout 400',
+ 'process 4', 'timeout 400',
+ 'process 5', 'timeout 500',
+ 'process 6', 'timeout 500'
+ ]);
+ test.done();
+ }, 800);
+}
+
exports['queue kill'] = function (test) {
var q = async.queue(function (task, callback) {
setTimeout(function () {