diff options
author | Graeme Yeates <yeatesgraeme@gmail.com> | 2015-12-23 15:50:39 -0500 |
---|---|---|
committer | Graeme Yeates <yeatesgraeme@gmail.com> | 2015-12-29 16:48:48 -0500 |
commit | 18e61d4f07f48604601f2effdbe2a7e188d14d4a (patch) | |
tree | d4761428f57ec80816bea109124fd836cdaf20a6 /lib/auto.js | |
parent | 7127b67f94a22247c36bf40e4f2685912e0f80e9 (diff) | |
download | async-18e61d4f07f48604601f2effdbe2a7e188d14d4a.tar.gz |
[WIP] modularization (#984)
Diffstat (limited to 'lib/auto.js')
-rw-r--r-- | lib/auto.js | 110 |
1 files changed, 110 insertions, 0 deletions
diff --git a/lib/auto.js b/lib/auto.js new file mode 100644 index 0000000..e48735c --- /dev/null +++ b/lib/auto.js @@ -0,0 +1,110 @@ +'use strict'; + +import arrayEach from 'lodash/internal/arrayEach'; +import forOwn from 'lodash/object/forOwn'; +import indexOf from 'lodash/array/indexOf'; +import isArray from 'lodash/lang/isArray'; +import keys from 'lodash/object/keys'; +import noop from 'lodash/utility/noop'; +import once from 'lodash/function/once'; +import restParam from 'lodash/function/restParam'; + +import reduce from './reduce'; +import setImmediate from './internal/setImmediate'; + +export default function auto(tasks, concurrency, cb) { + if (typeof arguments[1] === 'function') { + // concurrency is optional, shift the args. + cb = concurrency; + concurrency = null; + } + cb = once(cb || noop); + var okeys = keys(tasks); + var remainingTasks = okeys.length; + if (!remainingTasks) { + return cb(null); + } + if (!concurrency) { + concurrency = remainingTasks; + } + + var results = {}; + var runningTasks = 0; + + var listeners = []; + + function addListener(fn) { + listeners.unshift(fn); + } + + function removeListener(fn) { + var idx = indexOf(listeners, fn); + if (idx >= 0) listeners.splice(idx, 1); + } + + function taskComplete() { + remainingTasks--; + arrayEach(listeners, function(fn) { + fn(); + }); + } + + addListener(function() { + if (!remainingTasks) { + cb(null, results); + } + }); + + arrayEach(okeys, function(k) { + var task = isArray(tasks[k]) ? tasks[k] : [tasks[k]]; + var taskCallback = restParam(function(err, args) { + runningTasks--; + if (args.length <= 1) { + args = args[0]; + } + if (err) { + var safeResults = {}; + forOwn(results, function(val, rkey) { + safeResults[rkey] = val; + }); + safeResults[k] = args; + cb(err, safeResults); + } else { + results[k] = args; + setImmediate(taskComplete); + } + }); + var requires = task.slice(0, task.length - 1); + // prevent dead-locks + var len = requires.length; + var dep; + while (len--) { + if (!(dep = tasks[requires[len]])) { + throw new Error('Has inexistant dependency'); + } + if (isArray(dep) && indexOf(dep, k) >= 0) { + throw new Error('Has cyclic dependencies'); + } + } + + function ready() { + return runningTasks < concurrency && reduce(requires, function(a, x) { + return (a && results.hasOwnProperty(x)); + }, true) && !results.hasOwnProperty(k); + } + if (ready()) { + runningTasks++; + task[task.length - 1](taskCallback, results); + } else { + addListener(listener); + } + + function listener() { + if (ready()) { + runningTasks++; + removeListener(listener); + task[task.length - 1](taskCallback, results); + } + } + }); +} |