summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDaniel Bell <daniel.m.bell@gmail.com>2012-12-05 14:57:32 +1100
committerDaniel Bell <daniel.m.bell@gmail.com>2012-12-05 14:57:32 +1100
commitb0c55c580eaa9d8bed8f1fee827a92d6e0736906 (patch)
treeafe281bec7de6782aa0c717c17212ee1e9a95cef
parentd2903ab3e3530b72149e92d07c71334f981d3983 (diff)
downloadasync-b0c55c580eaa9d8bed8f1fee827a92d6e0736906.tar.gz
Added parallelLimit().
-rw-r--r--README.md18
-rw-r--r--lib/async.js27
-rw-r--r--test/test-async.js82
3 files changed, 118 insertions, 9 deletions
diff --git a/README.md b/README.md
index 8c52fd5..72a6c51 100644
--- a/README.md
+++ b/README.md
@@ -659,6 +659,24 @@ function(err, results) {
---------------------------------------
+<a name="parallel" />
+### parallelLimit(tasks, limit, [callback])
+
+The same as parallel only the tasks are executed in parallel with a maximum of "limit"
+tasks executing at any time.
+
+__Arguments__
+
+* tasks - An array or object containing functions to run, each function is passed a
+ callback it must call on completion.
+* limit - The maximum number of tasks to run at any time.
+* callback(err, results) - An optional callback to run once all the functions
+ have completed. This function gets an array of all the arguments passed to
+ the callbacks used in the array.
+
+
+---------------------------------------
+
<a name="whilst" />
### whilst(test, fn, callback)
diff --git a/lib/async.js b/lib/async.js
index 2de4f24..c4cc394 100644
--- a/lib/async.js
+++ b/lib/async.js
@@ -169,6 +169,12 @@
return fn.apply(null, [async.forEach].concat(args));
};
};
+ var doParallelLimit = function(limit, fn) {
+ return function () {
+ var args = Array.prototype.slice.call(arguments);
+ return fn.apply(null, [_forEachLimit(limit)].concat(args));
+ };
+ };
var doSeries = function (fn) {
return function () {
var args = Array.prototype.slice.call(arguments);
@@ -193,12 +199,13 @@
};
async.map = doParallel(_asyncMap);
async.mapSeries = doSeries(_asyncMap);
-
async.mapLimit = function (arr, limit, iterator, callback) {
- var fn = _forEachLimit(limit);
- return _asyncMap.apply(null, [fn, arr, iterator, callback]);
+ return _mapLimit(limit)(arr, iterator, callback);
};
+ var _mapLimit = function(limit) {
+ return doParallelLimit(limit, _asyncMap);
+ };
// reduce only has a series version, as doing reduce in parallel won't
// work in many situations.
@@ -452,10 +459,10 @@
wrapIterator(async.iterator(tasks))();
};
- async.parallel = function (tasks, callback) {
+ var _parallel = function(eachfn, tasks, callback) {
callback = callback || function () {};
if (tasks.constructor === Array) {
- async.map(tasks, function (fn, callback) {
+ eachfn.map(tasks, function (fn, callback) {
if (fn) {
fn(function (err) {
var args = Array.prototype.slice.call(arguments, 1);
@@ -469,7 +476,7 @@
}
else {
var results = {};
- async.forEach(_keys(tasks), function (k, callback) {
+ eachfn.forEach(_keys(tasks), function (k, callback) {
tasks[k](function (err) {
var args = Array.prototype.slice.call(arguments, 1);
if (args.length <= 1) {
@@ -484,6 +491,14 @@
}
};
+ async.parallel = function (tasks, callback) {
+ _parallel({ map: async.map, forEach: async.forEach }, tasks, callback);
+ };
+
+ async.parallelLimit = function(tasks, limit, callback) {
+ _parallel({ map: _mapLimit(limit), forEach: _forEachLimit(limit) }, tasks, callback);
+ };
+
async.series = function (tasks, callback) {
callback = callback || function () {};
if (tasks.constructor === Array) {
diff --git a/test/test-async.js b/test/test-async.js
index 8706717..e4d5dbe 100644
--- a/test/test-async.js
+++ b/test/test-async.js
@@ -393,6 +393,82 @@ exports['parallel object'] = function(test){
});
};
+exports['parallel limit'] = function(test){
+ var call_order = [];
+ async.parallelLimit([
+ function(callback){
+ setTimeout(function(){
+ call_order.push(1);
+ callback(null, 1);
+ }, 50);
+ },
+ function(callback){
+ setTimeout(function(){
+ call_order.push(2);
+ callback(null, 2);
+ }, 100);
+ },
+ function(callback){
+ setTimeout(function(){
+ call_order.push(3);
+ callback(null, 3,3);
+ }, 25);
+ }
+ ],
+ 2,
+ function(err, results){
+ test.equals(err, null);
+ test.same(call_order, [1,3,2]);
+ test.same(results, [1,2,[3,3]]);
+ test.done();
+ });
+};
+
+exports['parallel limit empty array'] = function(test){
+ async.parallelLimit([], 2, function(err, results){
+ test.equals(err, null);
+ test.same(results, []);
+ test.done();
+ });
+};
+
+exports['parallel limit error'] = function(test){
+ async.parallelLimit([
+ function(callback){
+ callback('error', 1);
+ },
+ function(callback){
+ callback('error2', 2);
+ }
+ ],
+ 1,
+ function(err, results){
+ test.equals(err, 'error');
+ });
+ setTimeout(test.done, 100);
+};
+
+exports['parallel limit no callback'] = function(test){
+ async.parallelLimit([
+ function(callback){callback();},
+ function(callback){callback(); test.done();},
+ ], 1);
+};
+
+exports['parallel limit object'] = function(test){
+ var call_order = [];
+ async.parallelLimit(getFunctionsObject(call_order), 2, function(err, results){
+ test.equals(err, null);
+ test.same(call_order, [1,3,2]);
+ test.same(results, {
+ one: 1,
+ two: 2,
+ three: [3,3]
+ });
+ test.done();
+ });
+};
+
exports['series'] = function(test){
var call_order = [];
async.series([
@@ -733,9 +809,9 @@ exports['mapSeries error'] = function(test){
exports['mapLimit'] = function(test){
var call_order = [];
- async.mapLimit([1,3,2], 2, mapIterator.bind(this, call_order), function(err, results){
- test.same(call_order, [1,3,2]);
- test.same(results, [2,6,4]);
+ async.mapLimit([2,4,3], 2, mapIterator.bind(this, call_order), function(err, results){
+ test.same(call_order, [2,4,3]);
+ test.same(results, [4,8,6]);
test.done();
});
};