diff options
author | Daniel Bell <daniel.m.bell@gmail.com> | 2012-12-05 14:57:32 +1100 |
---|---|---|
committer | Daniel Bell <daniel.m.bell@gmail.com> | 2012-12-05 14:57:32 +1100 |
commit | b0c55c580eaa9d8bed8f1fee827a92d6e0736906 (patch) | |
tree | afe281bec7de6782aa0c717c17212ee1e9a95cef | |
parent | d2903ab3e3530b72149e92d07c71334f981d3983 (diff) | |
download | async-b0c55c580eaa9d8bed8f1fee827a92d6e0736906.tar.gz |
Added parallelLimit().
-rw-r--r-- | README.md | 18 | ||||
-rw-r--r-- | lib/async.js | 27 | ||||
-rw-r--r-- | test/test-async.js | 82 |
3 files changed, 118 insertions, 9 deletions
@@ -659,6 +659,24 @@ function(err, results) { --------------------------------------- +<a name="parallel" /> +### parallelLimit(tasks, limit, [callback]) + +The same as parallel only the tasks are executed in parallel with a maximum of "limit" +tasks executing at any time. + +__Arguments__ + +* tasks - An array or object containing functions to run, each function is passed a + callback it must call on completion. +* limit - The maximum number of tasks to run at any time. +* callback(err, results) - An optional callback to run once all the functions + have completed. This function gets an array of all the arguments passed to + the callbacks used in the array. + + +--------------------------------------- + <a name="whilst" /> ### whilst(test, fn, callback) diff --git a/lib/async.js b/lib/async.js index 2de4f24..c4cc394 100644 --- a/lib/async.js +++ b/lib/async.js @@ -169,6 +169,12 @@ return fn.apply(null, [async.forEach].concat(args)); }; }; + var doParallelLimit = function(limit, fn) { + return function () { + var args = Array.prototype.slice.call(arguments); + return fn.apply(null, [_forEachLimit(limit)].concat(args)); + }; + }; var doSeries = function (fn) { return function () { var args = Array.prototype.slice.call(arguments); @@ -193,12 +199,13 @@ }; async.map = doParallel(_asyncMap); async.mapSeries = doSeries(_asyncMap); - async.mapLimit = function (arr, limit, iterator, callback) { - var fn = _forEachLimit(limit); - return _asyncMap.apply(null, [fn, arr, iterator, callback]); + return _mapLimit(limit)(arr, iterator, callback); }; + var _mapLimit = function(limit) { + return doParallelLimit(limit, _asyncMap); + }; // reduce only has a series version, as doing reduce in parallel won't // work in many situations. @@ -452,10 +459,10 @@ wrapIterator(async.iterator(tasks))(); }; - async.parallel = function (tasks, callback) { + var _parallel = function(eachfn, tasks, callback) { callback = callback || function () {}; if (tasks.constructor === Array) { - async.map(tasks, function (fn, callback) { + eachfn.map(tasks, function (fn, callback) { if (fn) { fn(function (err) { var args = Array.prototype.slice.call(arguments, 1); @@ -469,7 +476,7 @@ } else { var results = {}; - async.forEach(_keys(tasks), function (k, callback) { + eachfn.forEach(_keys(tasks), function (k, callback) { tasks[k](function (err) { var args = Array.prototype.slice.call(arguments, 1); if (args.length <= 1) { @@ -484,6 +491,14 @@ } }; + async.parallel = function (tasks, callback) { + _parallel({ map: async.map, forEach: async.forEach }, tasks, callback); + }; + + async.parallelLimit = function(tasks, limit, callback) { + _parallel({ map: _mapLimit(limit), forEach: _forEachLimit(limit) }, tasks, callback); + }; + async.series = function (tasks, callback) { callback = callback || function () {}; if (tasks.constructor === Array) { diff --git a/test/test-async.js b/test/test-async.js index 8706717..e4d5dbe 100644 --- a/test/test-async.js +++ b/test/test-async.js @@ -393,6 +393,82 @@ exports['parallel object'] = function(test){ }); }; +exports['parallel limit'] = function(test){ + var call_order = []; + async.parallelLimit([ + function(callback){ + setTimeout(function(){ + call_order.push(1); + callback(null, 1); + }, 50); + }, + function(callback){ + setTimeout(function(){ + call_order.push(2); + callback(null, 2); + }, 100); + }, + function(callback){ + setTimeout(function(){ + call_order.push(3); + callback(null, 3,3); + }, 25); + } + ], + 2, + function(err, results){ + test.equals(err, null); + test.same(call_order, [1,3,2]); + test.same(results, [1,2,[3,3]]); + test.done(); + }); +}; + +exports['parallel limit empty array'] = function(test){ + async.parallelLimit([], 2, function(err, results){ + test.equals(err, null); + test.same(results, []); + test.done(); + }); +}; + +exports['parallel limit error'] = function(test){ + async.parallelLimit([ + function(callback){ + callback('error', 1); + }, + function(callback){ + callback('error2', 2); + } + ], + 1, + function(err, results){ + test.equals(err, 'error'); + }); + setTimeout(test.done, 100); +}; + +exports['parallel limit no callback'] = function(test){ + async.parallelLimit([ + function(callback){callback();}, + function(callback){callback(); test.done();}, + ], 1); +}; + +exports['parallel limit object'] = function(test){ + var call_order = []; + async.parallelLimit(getFunctionsObject(call_order), 2, function(err, results){ + test.equals(err, null); + test.same(call_order, [1,3,2]); + test.same(results, { + one: 1, + two: 2, + three: [3,3] + }); + test.done(); + }); +}; + exports['series'] = function(test){ var call_order = []; async.series([ @@ -733,9 +809,9 @@ exports['mapSeries error'] = function(test){ exports['mapLimit'] = function(test){ var call_order = []; - async.mapLimit([1,3,2], 2, mapIterator.bind(this, call_order), function(err, results){ - test.same(call_order, [1,3,2]); - test.same(results, [2,6,4]); + async.mapLimit([2,4,3], 2, mapIterator.bind(this, call_order), function(err, results){ + test.same(call_order, [2,4,3]); + test.same(results, [4,8,6]); test.done(); }); }; |