diff options
Diffstat (limited to 'lib')
-rw-r--r-- | lib/async.js | 336 |
1 files changed, 248 insertions, 88 deletions
diff --git a/lib/async.js b/lib/async.js index b93e178..4257f0d 100644 --- a/lib/async.js +++ b/lib/async.js @@ -5,16 +5,24 @@ * Copyright 2010-2014 Caolan McMahon * Released under the MIT license */ -/*jshint onevar: false, indent:4 */ -/*global setImmediate: false, setTimeout: false, console: false */ (function () { var async = {}; + var noop = function () {}; // global on the server, window in the browser var root, previous_async; - root = this; + if (typeof window == 'object' && this === window) { + root = window; + } + else if (typeof global == 'object' && this === global) { + root = global; + } + else { + root = this; + } + if (root != null) { previous_async = root.async; } @@ -30,7 +38,7 @@ if (called) throw new Error("Callback was already called."); called = true; fn.apply(root, arguments); - } + }; } //// cross-browser compatiblity functions //// @@ -42,36 +50,39 @@ }; var _each = function (arr, iterator) { - for (var i = 0; i < arr.length; i += 1) { - iterator(arr[i], i, arr); - } + var index = -1, + length = arr.length; + + while (++index < length) { + iterator(arr[index], index, arr); + } }; var _map = function (arr, iterator) { - if (arr.map) { - return arr.map(iterator); - } - var results = []; - _each(arr, function (x, i, a) { - results.push(iterator(x, i, a)); - }); - return results; + var index = -1, + length = arr.length, + result = Array(length); + + while (++index < length) { + result[index] = iterator(arr[index], index, arr); + } + return result; }; var _reduce = function (arr, iterator, memo) { - if (arr.reduce) { - return arr.reduce(iterator, memo); - } _each(arr, function (x, i, a) { memo = iterator(memo, x, i, a); }); return memo; }; - var _keys = function (obj) { - if (Object.keys) { - return Object.keys(obj); - } + var _forEachOf = function (object, iterator) { + _each(_keys(object), function (key) { + iterator(object[key], key); + }); + }; + + var _keys = Object.keys || function (obj) { var keys = []; for (var k in obj) { if (obj.hasOwnProperty(k)) { @@ -81,14 +92,38 @@ return keys; }; + var _baseSlice = function (arr, start) { + start = start || 0; + var index = -1; + var length = arr.length; + + if (start) { + length -= start; + length = length < 0 ? 0 : length; + } + var result = Array(length); + + while (++index < length) { + result[index] = arr[index + start]; + } + return result; + }; + //// exported async module functions //// //// nextTick implementation with browser-compatible fallback //// + + // capture the global reference to guard against fakeTimer mocks + var _setImmediate; + if (typeof setImmediate === 'function') { + _setImmediate = setImmediate; + } + if (typeof process === 'undefined' || !(process.nextTick)) { - if (typeof setImmediate === 'function') { + if (_setImmediate) { async.nextTick = function (fn) { // not a direct alias for IE10 compatibility - setImmediate(fn); + _setImmediate(fn); }; async.setImmediate = async.nextTick; } @@ -101,10 +136,10 @@ } else { async.nextTick = process.nextTick; - if (typeof setImmediate !== 'undefined') { + if (_setImmediate) { async.setImmediate = function (fn) { // not a direct alias for IE10 compatibility - setImmediate(fn); + _setImmediate(fn); }; } else { @@ -113,9 +148,9 @@ } async.each = function (arr, iterator, callback) { - callback = callback || function () {}; + callback = callback || noop; if (!arr.length) { - return callback(); + return callback(null); } var completed = 0; _each(arr, function (x) { @@ -124,12 +159,12 @@ function done(err) { if (err) { callback(err); - callback = function () {}; + callback = noop; } else { completed += 1; if (completed >= arr.length) { - callback(); + callback(null); } } } @@ -137,21 +172,21 @@ async.forEach = async.each; async.eachSeries = function (arr, iterator, callback) { - callback = callback || function () {}; + callback = callback || noop; if (!arr.length) { - return callback(); + return callback(null); } var completed = 0; var iterate = function () { iterator(arr[completed], function (err) { if (err) { callback(err); - callback = function () {}; + callback = noop; } else { completed += 1; if (completed >= arr.length) { - callback(); + callback(null); } else { iterate(); @@ -163,6 +198,7 @@ }; async.forEachSeries = async.eachSeries; + async.eachLimit = function (arr, limit, iterator, callback) { var fn = _eachLimit(limit); fn.apply(null, [arr, iterator, callback]); @@ -172,9 +208,9 @@ var _eachLimit = function (limit) { return function (arr, iterator, callback) { - callback = callback || function () {}; + callback = callback || noop; if (!arr.length || limit <= 0) { - return callback(); + return callback(null); } var completed = 0; var started = 0; @@ -183,7 +219,7 @@ (function replenish () { if (completed >= arr.length) { - return callback(); + return callback(null); } while (running < limit && started < arr.length && !errored) { @@ -192,14 +228,123 @@ iterator(arr[started - 1], function (err) { if (err) { callback(err); - callback = function () {}; errored = true; + callback = noop; } else { completed += 1; running -= 1; if (completed >= arr.length) { - callback(); + callback(null); + } + else { + replenish(); + } + } + }); + } + })(); + }; + }; + + + + async.forEachOf = async.eachOf = function (object, iterator, callback) { + callback = callback || function () {}; + var size = object.length || _keys(object).length; + var completed = 0; + if (!size) { + return callback(null); + } + _forEachOf(object, function (value, key) { + iterator(object[key], key, function (err) { + if (err) { + callback(err); + callback = function () {}; + } else { + completed += 1; + if (completed === size) { + callback(null); + } + } + }); + }); + }; + + async.forEachOfSeries = async.eachOfSeries = function (obj, iterator, callback) { + callback = callback || function () {}; + var keys = _keys(obj); + var size = keys.length; + if (!size) { + return callback(); + } + var completed = 0; + var iterate = function () { + var sync = true; + var key = keys[completed]; + iterator(obj[key], key, function (err) { + if (err) { + callback(err); + callback = function () {}; + } + else { + completed += 1; + if (completed >= size) { + callback(null); + } + else { + if (sync) { + async.nextTick(iterate); + } + else { + iterate(); + } + } + } + }); + sync = false; + }; + iterate(); + }; + + + + async.forEachOfLimit = async.eachOfLimit = function (obj, limit, iterator, callback) { + _forEachOfLimit(limit)(obj, iterator, callback); + }; + + var _forEachOfLimit = function (limit) { + + return function (obj, iterator, callback) { + callback = callback || function () {}; + var keys = _keys(obj); + var size = keys.length; + if (!size || limit <= 0) { + return callback(null); + } + var completed = 0; + var started = 0; + var running = 0; + + (function replenish () { + if (completed >= size) { + return callback(); + } + + while (running < limit && started < size) { + started += 1; + running += 1; + var key = keys[started - 1]; + iterator(obj[key], key, function (err) { + if (err) { + callback(err); + callback = function () {}; + } + else { + completed += 1; + running -= 1; + if (completed >= size) { + callback(null); } else { replenish(); @@ -214,19 +359,19 @@ var doParallel = function (fn) { return function () { - var args = Array.prototype.slice.call(arguments); + var args = _baseSlice(arguments); return fn.apply(null, [async.each].concat(args)); }; }; var doParallelLimit = function(limit, fn) { return function () { - var args = Array.prototype.slice.call(arguments); + var args = _baseSlice(arguments); return fn.apply(null, [_eachLimit(limit)].concat(args)); }; }; var doSeries = function (fn) { return function () { - var args = Array.prototype.slice.call(arguments); + var args = _baseSlice(arguments); return fn.apply(null, [async.eachSeries].concat(args)); }; }; @@ -273,7 +418,7 @@ callback(err); }); }, function (err) { - callback(err, memo); + callback(err || null, memo); }); }; // inject alias @@ -344,7 +489,7 @@ iterator(x, function (result) { if (result) { main_callback(x); - main_callback = function () {}; + main_callback = noop; } else { callback(); @@ -362,7 +507,7 @@ iterator(x, function (v) { if (v) { main_callback(true); - main_callback = function () {}; + main_callback = noop; } callback(); }); @@ -378,7 +523,7 @@ iterator(x, function (v) { if (!v) { main_callback(false); - main_callback = function () {}; + main_callback = noop; } callback(); }); @@ -416,11 +561,11 @@ }; async.auto = function (tasks, callback) { - callback = callback || function () {}; + callback = callback || noop; var keys = _keys(tasks); - var remainingTasks = keys.length + var remainingTasks = keys.length; if (!remainingTasks) { - return callback(); + return callback(null); } var results = {}; @@ -438,7 +583,7 @@ } }; var taskComplete = function () { - remainingTasks-- + remainingTasks--; _each(listeners.slice(0), function (fn) { fn(); }); @@ -448,7 +593,7 @@ if (!remainingTasks) { var theCallback = callback; // prevent final callback from calling itself if it errors - callback = function () {}; + callback = noop; theCallback(null, results); } @@ -457,7 +602,7 @@ _each(keys, function (k) { var task = _isArray(tasks[k]) ? tasks[k]: [tasks[k]]; var taskCallback = function (err) { - var args = Array.prototype.slice.call(arguments, 1); + var args = _baseSlice(arguments, 1); if (args.length <= 1) { args = args[0]; } @@ -469,7 +614,7 @@ safeResults[k] = args; callback(err, safeResults); // stop subsequent errors hitting callback multiple times - callback = function () {}; + callback = noop; } else { results[k] = args; @@ -477,6 +622,17 @@ } }; var requires = task.slice(0, Math.abs(task.length - 1)) || []; + // prevent dead-locks + var len = requires.length; + var dep; + while (len--) { + if (!(dep = tasks[requires[len]])) { + throw new Error('Has inexistant dependency'); + } + if (_isArray(dep) && !!~dep.indexOf(k)) { + throw new Error('Has cyclic dependencies'); + } + } var ready = function () { return _reduce(requires, function (a, x) { return (a && results.hasOwnProperty(x)); @@ -523,13 +679,13 @@ data = data[data.length - 1]; (wrappedCallback || callback)(data.err, data.result); }); - } + }; // If a callback is passed, run this as a controll flow - return callback ? wrappedTask() : wrappedTask + return callback ? wrappedTask() : wrappedTask; }; async.waterfall = function (tasks, callback) { - callback = callback || function () {}; + callback = callback || noop; if (!_isArray(tasks)) { var err = new Error('First argument to waterfall must be an array of functions'); return callback(err); @@ -541,10 +697,10 @@ return function (err) { if (err) { callback.apply(null, arguments); - callback = function () {}; + callback = noop; } else { - var args = Array.prototype.slice.call(arguments, 1); + var args = _baseSlice(arguments, 1); var next = iterator.next(); if (next) { args.push(wrapIterator(next)); @@ -562,12 +718,12 @@ }; var _parallel = function(eachfn, tasks, callback) { - callback = callback || function () {}; + callback = callback || noop; if (_isArray(tasks)) { eachfn.map(tasks, function (fn, callback) { if (fn) { fn(function (err) { - var args = Array.prototype.slice.call(arguments, 1); + var args = _baseSlice(arguments, 1); if (args.length <= 1) { args = args[0]; } @@ -580,7 +736,7 @@ var results = {}; eachfn.each(_keys(tasks), function (k, callback) { tasks[k](function (err) { - var args = Array.prototype.slice.call(arguments, 1); + var args = _baseSlice(arguments, 1); if (args.length <= 1) { args = args[0]; } @@ -602,12 +758,12 @@ }; async.series = function (tasks, callback) { - callback = callback || function () {}; + callback = callback || noop; if (_isArray(tasks)) { async.mapSeries(tasks, function (fn, callback) { if (fn) { fn(function (err) { - var args = Array.prototype.slice.call(arguments, 1); + var args = _baseSlice(arguments, 1); if (args.length <= 1) { args = args[0]; } @@ -620,7 +776,7 @@ var results = {}; async.eachSeries(_keys(tasks), function (k, callback) { tasks[k](function (err) { - var args = Array.prototype.slice.call(arguments, 1); + var args = _baseSlice(arguments, 1); if (args.length <= 1) { args = args[0]; } @@ -650,10 +806,10 @@ }; async.apply = function (fn) { - var args = Array.prototype.slice.call(arguments, 1); + var args = _baseSlice(arguments, 1); return function () { return fn.apply( - null, args.concat(Array.prototype.slice.call(arguments)) + null, args.concat(_baseSlice(arguments)) ); }; }; @@ -682,7 +838,7 @@ }); } else { - callback(); + callback(null); } }; @@ -691,12 +847,12 @@ if (err) { return callback(err); } - var args = Array.prototype.slice.call(arguments, 1); + var args = _baseSlice(arguments, 1); if (test.apply(null, args)) { async.doWhilst(iterator, test, callback); } else { - callback(); + callback(null); } }); }; @@ -711,7 +867,7 @@ }); } else { - callback(); + callback(null); } }; @@ -720,12 +876,12 @@ if (err) { return callback(err); } - var args = Array.prototype.slice.call(arguments, 1); + var args = _baseSlice(arguments, 1); if (!test.apply(null, args)) { async.doUntil(iterator, test, callback); } else { - callback(); + callback(null); } }); }; @@ -734,6 +890,9 @@ if (concurrency === undefined) { concurrency = 1; } + else if(concurrency === 0) { + throw new Error('Concurrency must not be zero'); + } function _insert(q, data, pos, callback) { if (!q.started){ q.started = true; @@ -741,7 +900,7 @@ if (!_isArray(data)) { data = [data]; } - if(data.length == 0) { + if(data.length === 0) { // call drain immediately if there are no tasks return async.setImmediate(function() { if (q.drain) { @@ -824,9 +983,10 @@ resume: function () { if (q.paused === false) { return; } q.paused = false; + var resumeCount = Math.min(q.concurrency, q.tasks.length); // Need to call q.process once per concurrent // worker to preserve full concurrency after pause - for (var w = 1; w <= q.concurrency; w++) { + for (var w = 1; w <= resumeCount; w++) { async.setImmediate(q.process); } } @@ -838,7 +998,7 @@ function _compareTasks(a, b){ return a.priority - b.priority; - }; + } function _binarySearch(sequence, item, compare) { var beg = -1, @@ -861,7 +1021,7 @@ if (!_isArray(data)) { data = [data]; } - if(data.length == 0) { + if(data.length === 0) { // call drain immediately if there are no tasks return async.setImmediate(function() { if (q.drain) { @@ -934,9 +1094,9 @@ return; } - var ts = typeof payload === 'number' - ? tasks.splice(0, payload) - : tasks.splice(0, tasks.length); + var ts = typeof payload === 'number' ? + tasks.splice(0, payload) : + tasks.splice(0, tasks.length); var ds = _map(ts, function (task) { return task.data; @@ -969,9 +1129,9 @@ var _console_fn = function (name) { return function (fn) { - var args = Array.prototype.slice.call(arguments, 1); + var args = _baseSlice(arguments, 1); fn.apply(null, args.concat([function (err) { - var args = Array.prototype.slice.call(arguments, 1); + var args = _baseSlice(arguments, 1); if (typeof console !== 'undefined') { if (err) { if (console.error) { @@ -1000,7 +1160,7 @@ return x; }; var memoized = function () { - var args = Array.prototype.slice.call(arguments); + var args = _baseSlice(arguments); var callback = args.pop(); var key = hasher.apply(null, args); if (key in memo) { @@ -1014,7 +1174,7 @@ else { queues[key] = [callback]; fn.apply(null, args.concat([function () { - memo[key] = arguments; + memo[key] = _baseSlice(arguments); var q = queues[key]; delete queues[key]; for (var i = 0, l = q.length; i < l; i++) { @@ -1054,14 +1214,14 @@ var fns = arguments; return function () { var that = this; - var args = Array.prototype.slice.call(arguments); + var args = _baseSlice(arguments); var callback = args.pop(); async.reduce(fns, args, function (newargs, fn, cb) { fn.apply(that, newargs.concat([function () { var err = arguments[0]; - var nextargs = Array.prototype.slice.call(arguments, 1); + var nextargs = _baseSlice(arguments, 1); cb(err, nextargs); - }])) + }])); }, function (err, results) { callback.apply(that, [err].concat(results)); @@ -1076,7 +1236,7 @@ var _applyEach = function (eachfn, fns /*args...*/) { var go = function () { var that = this; - var args = Array.prototype.slice.call(arguments); + var args = _baseSlice(arguments); var callback = args.pop(); return eachfn(fns, function (fn, cb) { fn.apply(that, args.concat([cb])); @@ -1084,7 +1244,7 @@ callback); }; if (arguments.length > 2) { - var args = Array.prototype.slice.call(arguments, 2); + var args = _baseSlice(arguments, 2); return go.apply(this, args); } else { |