diff options
author | Alex Early <alexander.early@gmail.com> | 2016-03-09 13:11:57 -0800 |
---|---|---|
committer | Alex Early <alexander.early@gmail.com> | 2016-03-09 13:11:57 -0800 |
commit | 51202cad563a3a386c091eedd6184132756c1117 (patch) | |
tree | 5b3504ac0ae6cdfc000114889dff5489ab15eea9 /lib | |
parent | b5ea2b1cd0d01a45aa155663808f0cdf46ff1fe9 (diff) | |
parent | 7688e9846e6a87e4107b21ffb57a4492438c64f9 (diff) | |
download | async-51202cad563a3a386c091eedd6184132756c1117.tar.gz |
Merge pull request #1049 from caolan/auto-no-deferral
Refactor auto to not need a deferral
Diffstat (limited to 'lib')
-rw-r--r-- | lib/auto.js | 163 |
1 files changed, 88 insertions, 75 deletions
diff --git a/lib/auto.js b/lib/auto.js index ae3ecfc..22e2af2 100644 --- a/lib/auto.js +++ b/lib/auto.js @@ -1,8 +1,6 @@ 'use strict'; import arrayEach from 'lodash/_arrayEach'; -import arrayEvery from 'lodash/_arrayEvery'; -import baseHas from 'lodash/_baseHas'; import forOwn from 'lodash/forOwn'; import indexOf from 'lodash/indexOf'; import isArray from 'lodash/isArray'; @@ -10,57 +8,113 @@ import okeys from 'lodash/keys'; import noop from 'lodash/noop'; import once from 'lodash/once'; import rest from 'lodash/rest'; -import onlyOnce from './internal/onlyOnce'; -import setImmediate from './internal/setImmediate'; +import onlyOnce from './internal/onlyOnce'; export default function (tasks, concurrency, callback) { - if (typeof arguments[1] === 'function') { + if (typeof concurrency === 'function') { // concurrency is optional, shift the args. callback = concurrency; concurrency = null; } callback = once(callback || noop); var keys = okeys(tasks); - var remainingTasks = keys.length; - if (!remainingTasks) { + var numTasks = keys.length; + if (!numTasks) { return callback(null); } if (!concurrency) { - concurrency = remainingTasks; + concurrency = numTasks; } var results = {}; var runningTasks = 0; var hasError = false; - var listeners = []; + var listeners = {}; + + var readyTasks = []; + - function addListener(fn) { - listeners.unshift(fn); + forOwn(tasks, function (task, key) { + if (!isArray(task)) { + // no dependencies + enqueueTask(key, [task]); + return; + } + + var dependencies = task.slice(0, task.length - 1); + var remainingDependencies = dependencies.length; + + checkForDeadlocks(); + + function checkForDeadlocks() { + var len = dependencies.length; + var dep; + while (len--) { + if (!(dep = tasks[dependencies[len]])) { + throw new Error('async.auto task `' + key + + '` has non-existent dependency in ' + + dependencies.join(', ')); + } + if (isArray(dep) && indexOf(dep, key) >= 0) { + throw new Error('async.auto task `' + key + + '`Has cyclic dependencies'); + } + } + } + + arrayEach(dependencies, function (dependencyName) { + addListener(dependencyName, function () { + remainingDependencies--; + if (remainingDependencies === 0) { + enqueueTask(key, task); + } + }); + }); + }); + + processQueue(); + + + function enqueueTask(key, task) { + readyTasks.push(function () { + runTask(key, task); + }); } - function removeListener(fn) { - var idx = indexOf(listeners, fn); - if (idx >= 0) listeners.splice(idx, 1); + function processQueue() { + if (readyTasks.length === 0 && runningTasks === 0) { + return callback(null, results); + } + while(readyTasks.length && runningTasks < concurrency) { + var run = readyTasks.shift(); + run(); + } + } - function taskComplete() { - remainingTasks--; - arrayEach(listeners.slice(), function (fn) { + function addListener(taskName, fn) { + var taskListeners = listeners[taskName]; + if (!taskListeners) { + taskListeners = listeners[taskName] = []; + } + + taskListeners.push(fn); + } + + function taskComplete(taskName) { + var taskListeners = listeners[taskName] || []; + arrayEach(taskListeners, function (fn) { fn(); }); + processQueue(); } - addListener(function () { - if (!remainingTasks) { - callback(null, results); - } - }); - arrayEach(keys, function (k) { + function runTask(key, task) { if (hasError) return; - var task = isArray(tasks[k]) ? tasks[k]: [tasks[k]]; + var taskCallback = onlyOnce(rest(function(err, args) { runningTasks--; if (args.length <= 1) { @@ -71,66 +125,25 @@ export default function (tasks, concurrency, callback) { forOwn(results, function(val, rkey) { safeResults[rkey] = val; }); - safeResults[k] = args; + safeResults[key] = args; hasError = true; listeners = []; callback(err, safeResults); - } - else { - results[k] = args; - setImmediate(taskComplete); + } else { + results[key] = args; + taskComplete(key); } })); - var requires = task.slice(0, task.length - 1); - - checkForDeadlocks(); - - if (ready()) { - startNext(); + runningTasks++; + var taskFn = task[task.length - 1]; + if (task.length > 1) { + taskFn(results, taskCallback); } else { - addListener(listener); - } - - function checkForDeadlocks() { - var len = requires.length; - var dep; - while (len--) { - if (!(dep = tasks[requires[len]])) { - throw new Error('Has non-existent dependency in ' + - requires.join(', ')); - } - if (isArray(dep) && indexOf(dep, k) >= 0) { - throw new Error('Has cyclic dependencies'); - } - } - } - - function ready() { - return runningTasks < concurrency && - !baseHas(results, k) && - !hasError && - arrayEvery(requires, function (x) { - return baseHas(results, x); - }); + taskFn(taskCallback); } + } - function startNext() { - runningTasks++; - var taskFn = task[task.length - 1]; - if (requires.length > 0) { - taskFn(results, taskCallback); - } else { - taskFn(taskCallback); - } - } - function listener() { - if (ready()) { - removeListener(listener); - startNext(); - } - } - }); } |