From b0c55c580eaa9d8bed8f1fee827a92d6e0736906 Mon Sep 17 00:00:00 2001 From: Daniel Bell Date: Wed, 5 Dec 2012 14:57:32 +1100 Subject: Added parallelLimit(). --- README.md | 18 ++++++++++++ lib/async.js | 27 ++++++++++++++---- test/test-async.js | 82 ++++++++++++++++++++++++++++++++++++++++++++++++++++-- 3 files changed, 118 insertions(+), 9 deletions(-) diff --git a/README.md b/README.md index 8c52fd5..72a6c51 100644 --- a/README.md +++ b/README.md @@ -657,6 +657,24 @@ function(err, results) { }); ``` +--------------------------------------- + + +### parallelLimit(tasks, limit, [callback]) + +The same as parallel only the tasks are executed in parallel with a maximum of "limit" +tasks executing at any time. + +__Arguments__ + +* tasks - An array or object containing functions to run, each function is passed a + callback it must call on completion. +* limit - The maximum number of tasks to run at any time. +* callback(err, results) - An optional callback to run once all the functions + have completed. This function gets an array of all the arguments passed to + the callbacks used in the array. + + --------------------------------------- diff --git a/lib/async.js b/lib/async.js index 2de4f24..c4cc394 100644 --- a/lib/async.js +++ b/lib/async.js @@ -169,6 +169,12 @@ return fn.apply(null, [async.forEach].concat(args)); }; }; + var doParallelLimit = function(limit, fn) { + return function () { + var args = Array.prototype.slice.call(arguments); + return fn.apply(null, [_forEachLimit(limit)].concat(args)); + }; + }; var doSeries = function (fn) { return function () { var args = Array.prototype.slice.call(arguments); @@ -193,12 +199,13 @@ }; async.map = doParallel(_asyncMap); async.mapSeries = doSeries(_asyncMap); - async.mapLimit = function (arr, limit, iterator, callback) { - var fn = _forEachLimit(limit); - return _asyncMap.apply(null, [fn, arr, iterator, callback]); + return _mapLimit(limit)(arr, iterator, callback); }; + var _mapLimit = function(limit) { + return doParallelLimit(limit, _asyncMap); + }; // reduce only has a series version, as doing reduce in parallel won't // work in many situations. @@ -452,10 +459,10 @@ wrapIterator(async.iterator(tasks))(); }; - async.parallel = function (tasks, callback) { + var _parallel = function(eachfn, tasks, callback) { callback = callback || function () {}; if (tasks.constructor === Array) { - async.map(tasks, function (fn, callback) { + eachfn.map(tasks, function (fn, callback) { if (fn) { fn(function (err) { var args = Array.prototype.slice.call(arguments, 1); @@ -469,7 +476,7 @@ } else { var results = {}; - async.forEach(_keys(tasks), function (k, callback) { + eachfn.forEach(_keys(tasks), function (k, callback) { tasks[k](function (err) { var args = Array.prototype.slice.call(arguments, 1); if (args.length <= 1) { @@ -484,6 +491,14 @@ } }; + async.parallel = function (tasks, callback) { + _parallel({ map: async.map, forEach: async.forEach }, tasks, callback); + }; + + async.parallelLimit = function(tasks, limit, callback) { + _parallel({ map: _mapLimit(limit), forEach: _forEachLimit(limit) }, tasks, callback); + }; + async.series = function (tasks, callback) { callback = callback || function () {}; if (tasks.constructor === Array) { diff --git a/test/test-async.js b/test/test-async.js index 8706717..e4d5dbe 100644 --- a/test/test-async.js +++ b/test/test-async.js @@ -393,6 +393,82 @@ exports['parallel object'] = function(test){ }); }; +exports['parallel limit'] = function(test){ + var call_order = []; + async.parallelLimit([ + function(callback){ + setTimeout(function(){ + call_order.push(1); + callback(null, 1); + }, 50); + }, + function(callback){ + setTimeout(function(){ + call_order.push(2); + callback(null, 2); + }, 100); + }, + function(callback){ + setTimeout(function(){ + call_order.push(3); + callback(null, 3,3); + }, 25); + } + ], + 2, + function(err, results){ + test.equals(err, null); + test.same(call_order, [1,3,2]); + test.same(results, [1,2,[3,3]]); + test.done(); + }); +}; + +exports['parallel limit empty array'] = function(test){ + async.parallelLimit([], 2, function(err, results){ + test.equals(err, null); + test.same(results, []); + test.done(); + }); +}; + +exports['parallel limit error'] = function(test){ + async.parallelLimit([ + function(callback){ + callback('error', 1); + }, + function(callback){ + callback('error2', 2); + } + ], + 1, + function(err, results){ + test.equals(err, 'error'); + }); + setTimeout(test.done, 100); +}; + +exports['parallel limit no callback'] = function(test){ + async.parallelLimit([ + function(callback){callback();}, + function(callback){callback(); test.done();}, + ], 1); +}; + +exports['parallel limit object'] = function(test){ + var call_order = []; + async.parallelLimit(getFunctionsObject(call_order), 2, function(err, results){ + test.equals(err, null); + test.same(call_order, [1,3,2]); + test.same(results, { + one: 1, + two: 2, + three: [3,3] + }); + test.done(); + }); +}; + exports['series'] = function(test){ var call_order = []; async.series([ @@ -733,9 +809,9 @@ exports['mapSeries error'] = function(test){ exports['mapLimit'] = function(test){ var call_order = []; - async.mapLimit([1,3,2], 2, mapIterator.bind(this, call_order), function(err, results){ - test.same(call_order, [1,3,2]); - test.same(results, [2,6,4]); + async.mapLimit([2,4,3], 2, mapIterator.bind(this, call_order), function(err, results){ + test.same(call_order, [2,4,3]); + test.same(results, [4,8,6]); test.done(); }); }; -- cgit v1.2.1