diff options
author | Alexander Early <alexander.early@gmail.com> | 2015-06-01 19:24:52 -0700 |
---|---|---|
committer | Alexander Early <alexander.early@gmail.com> | 2015-06-01 19:24:52 -0700 |
commit | ba7c7ba719d48c910879998be0900f5d210ce746 (patch) | |
tree | 8c2324a7c874c2f13c118bfbb6a5038644ca783d | |
parent | 540e579c848bf248a811950d66997eb0f8f2f6c6 (diff) | |
parent | 02b715cc237c4080d0f0c607e294d0ad76bc3a11 (diff) | |
download | async-ba7c7ba719d48c910879998be0900f5d210ce746.tar.gz |
Merge pull request #772 from justincy/queue-concurrency
Queue concurrency
-rw-r--r-- | lib/async.js | 55 | ||||
-rwxr-xr-x | test/test-async.js | 82 |
2 files changed, 60 insertions, 77 deletions
diff --git a/lib/async.js b/lib/async.js index add4219..629b2a5 100644 --- a/lib/async.js +++ b/lib/async.js @@ -838,8 +838,21 @@ if (q.tasks.length === q.concurrency) { q.saturated(); } - async.setImmediate(q.process); }); + async.setImmediate(q.process); + } + function _next(q, tasks) { + return function(){ + workers -= 1; + var args = arguments; + _arrayEach(tasks, function (task) { + task.callback.apply(task, args); + }); + if (q.tasks.length + workers === 0) { + q.drain(); + } + q.process(); + }; } var workers = 0; @@ -863,32 +876,22 @@ }, process: function () { if (!q.paused && workers < q.concurrency && q.tasks.length) { - var tasks = payload ? - q.tasks.splice(0, payload) : - q.tasks.splice(0, q.tasks.length); - - var data = _map(tasks, function (task) { - return task.data; - }); - - if (q.tasks.length === 0) { - q.empty(); - } - workers += 1; - var cb = only_once(next); - worker(data, cb); - } - - function next() { - workers -= 1; - var args = arguments; - _arrayEach(tasks, function (task) { - task.callback.apply(task, args); - }); - if (q.tasks.length + workers === 0) { - q.drain(); + while(workers < q.concurrency && q.tasks.length){ + var tasks = payload ? + q.tasks.splice(0, payload) : + q.tasks.splice(0, q.tasks.length); + + var data = _map(tasks, function (task) { + return task.data; + }); + + if (q.tasks.length === 0) { + q.empty(); + } + workers += 1; + var cb = only_once(_next(q, tasks)); + worker(data, cb); } - q.process(); } }, length: function () { diff --git a/test/test-async.js b/test/test-async.js index 14ea484..fa4f6c4 100755 --- a/test/test-async.js +++ b/test/test-async.js @@ -2878,59 +2878,39 @@ exports['queue'] = { }); }, +// The original queue implementation allowed the concurrency to be changed only +// on the same event loop during which a task was added to the queue. This +// test attempts to be a more rubust test. +// Start with a concurrency of 1. Wait until a leter event loop and change +// the concurrency to 2. Wait again for a later loop then verify the concurrency. +// Repeat that one more time by chaning the concurrency to 5. 'changing concurrency': function (test) { - var call_order = [], - delays = [40,20,60,20]; - - // worker1: --1-2---3-4 - // order of completion: 1,2,3,4 - - var q = async.queue(function (task, callback) { - setTimeout(function () { - call_order.push('process ' + task); - callback('error', 'arg'); - }, delays.splice(0,1)[0]); - }, 2); - - q.push(1, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 3); - call_order.push('callback ' + 1); - }); - q.push(2, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 2); - call_order.push('callback ' + 2); - }); - q.push(3, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 1); - call_order.push('callback ' + 3); - }); - q.push(4, function (err, arg) { - test.equal(err, 'error'); - test.equal(arg, 'arg'); - test.equal(q.length(), 0); - call_order.push('callback ' + 4); - }); - test.equal(q.length(), 4); - test.equal(q.concurrency, 2); - q.concurrency = 1; - - setTimeout(function () { - test.same(call_order, [ - 'process 1', 'callback 1', - 'process 2', 'callback 2', - 'process 3', 'callback 3', - 'process 4', 'callback 4' - ]); - test.equal(q.concurrency, 1); - test.equal(q.length(), 0); + + var q = async.queue(function(task, callback){ + setTimeout(function(){ + callback(); + }, 100); + }, 1); + + for(var i = 0; i < 50; i++){ + q.push(''); + } + + q.drain = function(){ test.done(); - }, 250); + }; + + setTimeout(function(){ + test.equal(q.concurrency, 1); + q.concurrency = 2; + setTimeout(function(){ + test.equal(q.running(), 2); + q.concurrency = 5; + setTimeout(function(){ + test.equal(q.running(), 5); + }, 500); + }, 500); + }, 500); }, 'push without callback': function (test) { |