summaryrefslogtreecommitdiff
path: root/lib
diff options
context:
space:
mode:
authorAlex Early <alexander.early@gmail.com>2016-03-09 13:11:57 -0800
committerAlex Early <alexander.early@gmail.com>2016-03-09 13:11:57 -0800
commit51202cad563a3a386c091eedd6184132756c1117 (patch)
tree5b3504ac0ae6cdfc000114889dff5489ab15eea9 /lib
parentb5ea2b1cd0d01a45aa155663808f0cdf46ff1fe9 (diff)
parent7688e9846e6a87e4107b21ffb57a4492438c64f9 (diff)
downloadasync-51202cad563a3a386c091eedd6184132756c1117.tar.gz
Merge pull request #1049 from caolan/auto-no-deferral
Refactor auto to not need a deferral
Diffstat (limited to 'lib')
-rw-r--r--lib/auto.js163
1 files changed, 88 insertions, 75 deletions
diff --git a/lib/auto.js b/lib/auto.js
index ae3ecfc..22e2af2 100644
--- a/lib/auto.js
+++ b/lib/auto.js
@@ -1,8 +1,6 @@
'use strict';
import arrayEach from 'lodash/_arrayEach';
-import arrayEvery from 'lodash/_arrayEvery';
-import baseHas from 'lodash/_baseHas';
import forOwn from 'lodash/forOwn';
import indexOf from 'lodash/indexOf';
import isArray from 'lodash/isArray';
@@ -10,57 +8,113 @@ import okeys from 'lodash/keys';
import noop from 'lodash/noop';
import once from 'lodash/once';
import rest from 'lodash/rest';
-import onlyOnce from './internal/onlyOnce';
-import setImmediate from './internal/setImmediate';
+import onlyOnce from './internal/onlyOnce';
export default function (tasks, concurrency, callback) {
- if (typeof arguments[1] === 'function') {
+ if (typeof concurrency === 'function') {
// concurrency is optional, shift the args.
callback = concurrency;
concurrency = null;
}
callback = once(callback || noop);
var keys = okeys(tasks);
- var remainingTasks = keys.length;
- if (!remainingTasks) {
+ var numTasks = keys.length;
+ if (!numTasks) {
return callback(null);
}
if (!concurrency) {
- concurrency = remainingTasks;
+ concurrency = numTasks;
}
var results = {};
var runningTasks = 0;
var hasError = false;
- var listeners = [];
+ var listeners = {};
+
+ var readyTasks = [];
+
- function addListener(fn) {
- listeners.unshift(fn);
+ forOwn(tasks, function (task, key) {
+ if (!isArray(task)) {
+ // no dependencies
+ enqueueTask(key, [task]);
+ return;
+ }
+
+ var dependencies = task.slice(0, task.length - 1);
+ var remainingDependencies = dependencies.length;
+
+ checkForDeadlocks();
+
+ function checkForDeadlocks() {
+ var len = dependencies.length;
+ var dep;
+ while (len--) {
+ if (!(dep = tasks[dependencies[len]])) {
+ throw new Error('async.auto task `' + key +
+ '` has non-existent dependency in ' +
+ dependencies.join(', '));
+ }
+ if (isArray(dep) && indexOf(dep, key) >= 0) {
+ throw new Error('async.auto task `' + key +
+ '`Has cyclic dependencies');
+ }
+ }
+ }
+
+ arrayEach(dependencies, function (dependencyName) {
+ addListener(dependencyName, function () {
+ remainingDependencies--;
+ if (remainingDependencies === 0) {
+ enqueueTask(key, task);
+ }
+ });
+ });
+ });
+
+ processQueue();
+
+
+ function enqueueTask(key, task) {
+ readyTasks.push(function () {
+ runTask(key, task);
+ });
}
- function removeListener(fn) {
- var idx = indexOf(listeners, fn);
- if (idx >= 0) listeners.splice(idx, 1);
+ function processQueue() {
+ if (readyTasks.length === 0 && runningTasks === 0) {
+ return callback(null, results);
+ }
+ while(readyTasks.length && runningTasks < concurrency) {
+ var run = readyTasks.shift();
+ run();
+ }
+
}
- function taskComplete() {
- remainingTasks--;
- arrayEach(listeners.slice(), function (fn) {
+ function addListener(taskName, fn) {
+ var taskListeners = listeners[taskName];
+ if (!taskListeners) {
+ taskListeners = listeners[taskName] = [];
+ }
+
+ taskListeners.push(fn);
+ }
+
+ function taskComplete(taskName) {
+ var taskListeners = listeners[taskName] || [];
+ arrayEach(taskListeners, function (fn) {
fn();
});
+ processQueue();
}
- addListener(function () {
- if (!remainingTasks) {
- callback(null, results);
- }
- });
- arrayEach(keys, function (k) {
+ function runTask(key, task) {
if (hasError) return;
- var task = isArray(tasks[k]) ? tasks[k]: [tasks[k]];
+
var taskCallback = onlyOnce(rest(function(err, args) {
runningTasks--;
if (args.length <= 1) {
@@ -71,66 +125,25 @@ export default function (tasks, concurrency, callback) {
forOwn(results, function(val, rkey) {
safeResults[rkey] = val;
});
- safeResults[k] = args;
+ safeResults[key] = args;
hasError = true;
listeners = [];
callback(err, safeResults);
- }
- else {
- results[k] = args;
- setImmediate(taskComplete);
+ } else {
+ results[key] = args;
+ taskComplete(key);
}
}));
- var requires = task.slice(0, task.length - 1);
-
- checkForDeadlocks();
-
- if (ready()) {
- startNext();
+ runningTasks++;
+ var taskFn = task[task.length - 1];
+ if (task.length > 1) {
+ taskFn(results, taskCallback);
} else {
- addListener(listener);
- }
-
- function checkForDeadlocks() {
- var len = requires.length;
- var dep;
- while (len--) {
- if (!(dep = tasks[requires[len]])) {
- throw new Error('Has non-existent dependency in ' +
- requires.join(', '));
- }
- if (isArray(dep) && indexOf(dep, k) >= 0) {
- throw new Error('Has cyclic dependencies');
- }
- }
- }
-
- function ready() {
- return runningTasks < concurrency &&
- !baseHas(results, k) &&
- !hasError &&
- arrayEvery(requires, function (x) {
- return baseHas(results, x);
- });
+ taskFn(taskCallback);
}
+ }
- function startNext() {
- runningTasks++;
- var taskFn = task[task.length - 1];
- if (requires.length > 0) {
- taskFn(results, taskCallback);
- } else {
- taskFn(taskCallback);
- }
- }
- function listener() {
- if (ready()) {
- removeListener(listener);
- startNext();
- }
- }
- });
}