From bb4f898a42a364521f6df0d15183d70504a99ef5 Mon Sep 17 00:00:00 2001 From: Justin Date: Mon, 1 Jun 2015 20:00:32 +0000 Subject: failing test for changing queue concurrency; #747 --- test/test-async.js | 82 +++++++++++++++++++++--------------------------------- 1 file changed, 31 insertions(+), 51 deletions(-) diff --git a/test/test-async.js b/test/test-async.js index 670d14a..9bfdde0 100755 --- a/test/test-async.js +++ b/test/test-async.js @@ -2849,59 +2849,39 @@ exports['queue'] = { }); }, +// The original queue implementation allowed the concurrency to be changed only +// on the same event loop during which a task was added to the queue. This +// test attempts to be a more rubust test. +// Start with a concurrency of 1. Wait until a leter event loop and change +// the concurrency to 2. Wait again for a later loop then verify the concurrency. +// Repeat that one more time by chaning the concurrency to 5. 'changing concurrency': function (test) { - var call_order = [], - delays = [40,20,60,20]; - - // worker1: --1-2---3-4 - // order of completion: 1,2,3,4 - - var q = async.queue(function (task, callback) { - setTimeout(function () { - call_order.push('process ' + task); - callback('error', 'arg'); - }, delays.splice(0,1)[0]); - }, 2); - - q.push(1, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 3); - call_order.push('callback ' + 1); - }); - q.push(2, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 2); - call_order.push('callback ' + 2); - }); - q.push(3, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 1); - call_order.push('callback ' + 3); - }); - q.push(4, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 0); - call_order.push('callback ' + 4); - }); - test.equal(q.length(), 4); - test.equal(q.concurrency, 2); - q.concurrency = 1; - - setTimeout(function () { - test.same(call_order, [ - 'process 1', 'callback 1', - 'process 2', 'callback 2', - 'process 3', 'callback 3', - 'process 4', 'callback 4' - ]); - test.equal(q.concurrency, 1); - test.equal(q.length(), 0); + + var q = async.queue(function(task, callback){ + setTimeout(function(){ + callback(); + }, 100); + }, 1); + + for(var i = 0; i < 50; i++){ + q.push(''); + } + + q.drain = function(){ test.done(); - }, 250); + }; + + setTimeout(function(){ + test.equal(q.concurrency, 1); + q.concurrency = 2; + setTimeout(function(){ + test.equal(q.running(), 2); + q.concurrency = 5; + setTimeout(function(){ + test.equal(q.running(), 5); + }, 500); + }, 500); + }, 500); }, 'push without callback': function (test) { -- cgit v1.2.1 From 3f007e5355c9f194b368e36d8838e15a7bd78a12 Mon Sep 17 00:00:00 2001 From: Justin Date: Mon, 1 Jun 2015 20:45:14 +0000 Subject: allow concurrency of queue to be changed --- lib/async.js | 55 +++++++++++++++++++++++++++++-------------------------- 1 file changed, 29 insertions(+), 26 deletions(-) diff --git a/lib/async.js b/lib/async.js index abc1f7b..9c4f0bb 100644 --- a/lib/async.js +++ b/lib/async.js @@ -834,8 +834,21 @@ if (q.tasks.length === q.concurrency) { q.saturated(); } - async.setImmediate(q.process); }); + setImmediate(q.process); + } + function _next(q, tasks) { + return function(){ + workers -= 1; + var args = arguments; + _arrayEach(tasks, function (task) { + task.callback.apply(task, args); + }); + if (q.tasks.length + workers === 0) { + q.drain(); + } + q.process(); + }; } var workers = 0; @@ -859,32 +872,22 @@ }, process: function () { if (!q.paused && workers < q.concurrency && q.tasks.length) { - var tasks = payload ? - q.tasks.splice(0, payload) : - q.tasks.splice(0, q.tasks.length); - - var data = _map(tasks, function (task) { - return task.data; - }); - - if (q.tasks.length === 0) { - q.empty(); - } - workers += 1; - var cb = only_once(next); - worker(data, cb); - } - - function next() { - workers -= 1; - var args = arguments; - _arrayEach(tasks, function (task) { - task.callback.apply(task, args); - }); - if (q.tasks.length + workers === 0) { - q.drain(); + while(workers < q.concurrency && q.tasks.length){ + var tasks = payload ? + q.tasks.splice(0, payload) : + q.tasks.splice(0, q.tasks.length); + + var data = _map(tasks, function (task) { + return task.data; + }); + + if (q.tasks.length === 0) { + q.empty(); + } + workers += 1; + var cb = only_once(_next(q, tasks)); + setImmediate(worker(data, cb)); } - q.process(); } }, length: function () { -- cgit v1.2.1 From b25793cd0f3e2f08e4a2eba2eac79a693316d7be Mon Sep 17 00:00:00 2001 From: Justin Date: Mon, 1 Jun 2015 21:43:40 +0000 Subject: async.setImmediate --- lib/async.js | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/async.js b/lib/async.js index 9c4f0bb..75c99f4 100644 --- a/lib/async.js +++ b/lib/async.js @@ -835,7 +835,7 @@ q.saturated(); } }); - setImmediate(q.process); + async.setImmediate(q.process); } function _next(q, tasks) { return function(){ @@ -886,7 +886,7 @@ } workers += 1; var cb = only_once(_next(q, tasks)); - setImmediate(worker(data, cb)); + async.setImmediate(worker(data, cb)); } } }, -- cgit v1.2.1 From 02b715cc237c4080d0f0c607e294d0ad76bc3a11 Mon Sep 17 00:00:00 2001 From: Justin Date: Tue, 2 Jun 2015 00:42:19 +0000 Subject: fix tests in node 0.12 --- lib/async.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/async.js b/lib/async.js index 75c99f4..aca7156 100644 --- a/lib/async.js +++ b/lib/async.js @@ -886,7 +886,7 @@ } workers += 1; var cb = only_once(_next(q, tasks)); - async.setImmediate(worker(data, cb)); + worker(data, cb); } } }, -- cgit v1.2.1