summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexander Early <alexander.early@gmail.com>2015-06-01 19:24:52 -0700
committerAlexander Early <alexander.early@gmail.com>2015-06-01 19:24:52 -0700
commitba7c7ba719d48c910879998be0900f5d210ce746 (patch)
tree8c2324a7c874c2f13c118bfbb6a5038644ca783d
parent540e579c848bf248a811950d66997eb0f8f2f6c6 (diff)
parent02b715cc237c4080d0f0c607e294d0ad76bc3a11 (diff)
downloadasync-ba7c7ba719d48c910879998be0900f5d210ce746.tar.gz
Merge pull request #772 from justincy/queue-concurrency
Queue concurrency
-rw-r--r--lib/async.js55
-rwxr-xr-xtest/test-async.js82
2 files changed, 60 insertions, 77 deletions
diff --git a/lib/async.js b/lib/async.js
index add4219..629b2a5 100644
--- a/lib/async.js
+++ b/lib/async.js
@@ -838,8 +838,21 @@
if (q.tasks.length === q.concurrency) {
q.saturated();
}
- async.setImmediate(q.process);
});
+ async.setImmediate(q.process);
+ }
+ function _next(q, tasks) {
+ return function(){
+ workers -= 1;
+ var args = arguments;
+ _arrayEach(tasks, function (task) {
+ task.callback.apply(task, args);
+ });
+ if (q.tasks.length + workers === 0) {
+ q.drain();
+ }
+ q.process();
+ };
}
var workers = 0;
@@ -863,32 +876,22 @@
},
process: function () {
if (!q.paused && workers < q.concurrency && q.tasks.length) {
- var tasks = payload ?
- q.tasks.splice(0, payload) :
- q.tasks.splice(0, q.tasks.length);
-
- var data = _map(tasks, function (task) {
- return task.data;
- });
-
- if (q.tasks.length === 0) {
- q.empty();
- }
- workers += 1;
- var cb = only_once(next);
- worker(data, cb);
- }
-
- function next() {
- workers -= 1;
- var args = arguments;
- _arrayEach(tasks, function (task) {
- task.callback.apply(task, args);
- });
- if (q.tasks.length + workers === 0) {
- q.drain();
+ while(workers < q.concurrency && q.tasks.length){
+ var tasks = payload ?
+ q.tasks.splice(0, payload) :
+ q.tasks.splice(0, q.tasks.length);
+
+ var data = _map(tasks, function (task) {
+ return task.data;
+ });
+
+ if (q.tasks.length === 0) {
+ q.empty();
+ }
+ workers += 1;
+ var cb = only_once(_next(q, tasks));
+ worker(data, cb);
}
- q.process();
}
},
length: function () {
diff --git a/test/test-async.js b/test/test-async.js
index 14ea484..fa4f6c4 100755
--- a/test/test-async.js
+++ b/test/test-async.js
@@ -2878,59 +2878,39 @@ exports['queue'] = {
});
},
+// The original queue implementation allowed the concurrency to be changed only
+// on the same event loop during which a task was added to the queue. This
+// test attempts to be a more rubust test.
+// Start with a concurrency of 1. Wait until a leter event loop and change
+// the concurrency to 2. Wait again for a later loop then verify the concurrency.
+// Repeat that one more time by chaning the concurrency to 5.
'changing concurrency': function (test) {
- var call_order = [],
- delays = [40,20,60,20];
-
- // worker1: --1-2---3-4
- // order of completion: 1,2,3,4
-
- var q = async.queue(function (task, callback) {
- setTimeout(function () {
- call_order.push('process ' + task);
- callback('error', 'arg');
- }, delays.splice(0,1)[0]);
- }, 2);
-
- q.push(1, function (err, arg) {
- test.equal(err, 'error');
- test.equal(arg, 'arg');
- test.equal(q.length(), 3);
- call_order.push('callback ' + 1);
- });
- q.push(2, function (err, arg) {
- test.equal(err, 'error');
- test.equal(arg, 'arg');
- test.equal(q.length(), 2);
- call_order.push('callback ' + 2);
- });
- q.push(3, function (err, arg) {
- test.equal(err, 'error');
- test.equal(arg, 'arg');
- test.equal(q.length(), 1);
- call_order.push('callback ' + 3);
- });
- q.push(4, function (err, arg) {
- test.equal(err, 'error');
- test.equal(arg, 'arg');
- test.equal(q.length(), 0);
- call_order.push('callback ' + 4);
- });
- test.equal(q.length(), 4);
- test.equal(q.concurrency, 2);
- q.concurrency = 1;
-
- setTimeout(function () {
- test.same(call_order, [
- 'process 1', 'callback 1',
- 'process 2', 'callback 2',
- 'process 3', 'callback 3',
- 'process 4', 'callback 4'
- ]);
- test.equal(q.concurrency, 1);
- test.equal(q.length(), 0);
+
+ var q = async.queue(function(task, callback){
+ setTimeout(function(){
+ callback();
+ }, 100);
+ }, 1);
+
+ for(var i = 0; i < 50; i++){
+ q.push('');
+ }
+
+ q.drain = function(){
test.done();
- }, 250);
+ };
+
+ setTimeout(function(){
+ test.equal(q.concurrency, 1);
+ q.concurrency = 2;
+ setTimeout(function(){
+ test.equal(q.running(), 2);
+ q.concurrency = 5;
+ setTimeout(function(){
+ test.equal(q.running(), 5);
+ }, 500);
+ }, 500);
+ }, 500);
},
'push without callback': function (test) {