summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexander Early <alexander.early@gmail.com>2016-03-06 16:52:30 -0800
committerAlexander Early <alexander.early@gmail.com>2016-03-06 16:52:30 -0800
commit7688e9846e6a87e4107b21ffb57a4492438c64f9 (patch)
tree02932ea523ddd9ad29903dbb3ffd7cf8e02aa969
parent4d825639606b8db83c8e4c9c46b8e8771499071b (diff)
downloadasync-auto-no-deferral.tar.gz
refactor auto to not need a deferralauto-no-deferral
-rw-r--r--lib/auto.js163
-rw-r--r--mocha_test/auto.js41
2 files changed, 113 insertions, 91 deletions
diff --git a/lib/auto.js b/lib/auto.js
index ae3ecfc..22e2af2 100644
--- a/lib/auto.js
+++ b/lib/auto.js
@@ -1,8 +1,6 @@
'use strict';
import arrayEach from 'lodash/_arrayEach';
-import arrayEvery from 'lodash/_arrayEvery';
-import baseHas from 'lodash/_baseHas';
import forOwn from 'lodash/forOwn';
import indexOf from 'lodash/indexOf';
import isArray from 'lodash/isArray';
@@ -10,57 +8,113 @@ import okeys from 'lodash/keys';
import noop from 'lodash/noop';
import once from 'lodash/once';
import rest from 'lodash/rest';
-import onlyOnce from './internal/onlyOnce';
-import setImmediate from './internal/setImmediate';
+import onlyOnce from './internal/onlyOnce';
export default function (tasks, concurrency, callback) {
- if (typeof arguments[1] === 'function') {
+ if (typeof concurrency === 'function') {
// concurrency is optional, shift the args.
callback = concurrency;
concurrency = null;
}
callback = once(callback || noop);
var keys = okeys(tasks);
- var remainingTasks = keys.length;
- if (!remainingTasks) {
+ var numTasks = keys.length;
+ if (!numTasks) {
return callback(null);
}
if (!concurrency) {
- concurrency = remainingTasks;
+ concurrency = numTasks;
}
var results = {};
var runningTasks = 0;
var hasError = false;
- var listeners = [];
+ var listeners = {};
+
+ var readyTasks = [];
+
- function addListener(fn) {
- listeners.unshift(fn);
+ forOwn(tasks, function (task, key) {
+ if (!isArray(task)) {
+ // no dependencies
+ enqueueTask(key, [task]);
+ return;
+ }
+
+ var dependencies = task.slice(0, task.length - 1);
+ var remainingDependencies = dependencies.length;
+
+ checkForDeadlocks();
+
+ function checkForDeadlocks() {
+ var len = dependencies.length;
+ var dep;
+ while (len--) {
+ if (!(dep = tasks[dependencies[len]])) {
+ throw new Error('async.auto task `' + key +
+ '` has non-existent dependency in ' +
+ dependencies.join(', '));
+ }
+ if (isArray(dep) && indexOf(dep, key) >= 0) {
+ throw new Error('async.auto task `' + key +
+ '`Has cyclic dependencies');
+ }
+ }
+ }
+
+ arrayEach(dependencies, function (dependencyName) {
+ addListener(dependencyName, function () {
+ remainingDependencies--;
+ if (remainingDependencies === 0) {
+ enqueueTask(key, task);
+ }
+ });
+ });
+ });
+
+ processQueue();
+
+
+ function enqueueTask(key, task) {
+ readyTasks.push(function () {
+ runTask(key, task);
+ });
}
- function removeListener(fn) {
- var idx = indexOf(listeners, fn);
- if (idx >= 0) listeners.splice(idx, 1);
+ function processQueue() {
+ if (readyTasks.length === 0 && runningTasks === 0) {
+ return callback(null, results);
+ }
+ while(readyTasks.length && runningTasks < concurrency) {
+ var run = readyTasks.shift();
+ run();
+ }
+
}
- function taskComplete() {
- remainingTasks--;
- arrayEach(listeners.slice(), function (fn) {
+ function addListener(taskName, fn) {
+ var taskListeners = listeners[taskName];
+ if (!taskListeners) {
+ taskListeners = listeners[taskName] = [];
+ }
+
+ taskListeners.push(fn);
+ }
+
+ function taskComplete(taskName) {
+ var taskListeners = listeners[taskName] || [];
+ arrayEach(taskListeners, function (fn) {
fn();
});
+ processQueue();
}
- addListener(function () {
- if (!remainingTasks) {
- callback(null, results);
- }
- });
- arrayEach(keys, function (k) {
+ function runTask(key, task) {
if (hasError) return;
- var task = isArray(tasks[k]) ? tasks[k]: [tasks[k]];
+
var taskCallback = onlyOnce(rest(function(err, args) {
runningTasks--;
if (args.length <= 1) {
@@ -71,66 +125,25 @@ export default function (tasks, concurrency, callback) {
forOwn(results, function(val, rkey) {
safeResults[rkey] = val;
});
- safeResults[k] = args;
+ safeResults[key] = args;
hasError = true;
listeners = [];
callback(err, safeResults);
- }
- else {
- results[k] = args;
- setImmediate(taskComplete);
+ } else {
+ results[key] = args;
+ taskComplete(key);
}
}));
- var requires = task.slice(0, task.length - 1);
-
- checkForDeadlocks();
-
- if (ready()) {
- startNext();
+ runningTasks++;
+ var taskFn = task[task.length - 1];
+ if (task.length > 1) {
+ taskFn(results, taskCallback);
} else {
- addListener(listener);
- }
-
- function checkForDeadlocks() {
- var len = requires.length;
- var dep;
- while (len--) {
- if (!(dep = tasks[requires[len]])) {
- throw new Error('Has non-existent dependency in ' +
- requires.join(', '));
- }
- if (isArray(dep) && indexOf(dep, k) >= 0) {
- throw new Error('Has cyclic dependencies');
- }
- }
- }
-
- function ready() {
- return runningTasks < concurrency &&
- !baseHas(results, k) &&
- !hasError &&
- arrayEvery(requires, function (x) {
- return baseHas(results, x);
- });
+ taskFn(taskCallback);
}
+ }
- function startNext() {
- runningTasks++;
- var taskFn = task[task.length - 1];
- if (requires.length > 0) {
- taskFn(results, taskCallback);
- } else {
- taskFn(taskCallback);
- }
- }
- function listener() {
- if (ready()) {
- removeListener(listener);
- startNext();
- }
- }
- });
}
diff --git a/mocha_test/auto.js b/mocha_test/auto.js
index 6b96be4..1850a0c 100644
--- a/mocha_test/auto.js
+++ b/mocha_test/auto.js
@@ -40,7 +40,7 @@ describe('auto', function () {
},
function(err){
expect(err).to.equal(null);
- expect(callOrder).to.eql(['task2','task6','task3','task5','task1','task4']);
+ expect(callOrder).to.eql(['task2','task3','task6','task5','task1','task4']);
done();
});
});
@@ -214,19 +214,8 @@ describe('auto', function () {
// Issue 410 on github: https://github.com/caolan/async/issues/410
it('auto calls callback multiple times', function(done) {
- if (process.browser) {
- // node only test
- return done();
- }
var finalCallCount = 0;
- var domain = require('domain').create();
- domain.on('error', function (e) {
- // ignore test error
- if (!e._test_error) {
- return done(e);
- }
- });
- domain.run(function () {
+ try {
async.auto({
task1: function(callback) { callback(null); },
task2: ['task1', function(results, callback) { callback(null); }]
@@ -239,7 +228,11 @@ describe('auto', function () {
e._test_error = true;
throw e;
});
- });
+ } catch (e) {
+ if (!e._test_error) {
+ throw e;
+ }
+ }
setTimeout(function () {
expect(finalCallCount).to.equal(1);
done();
@@ -293,7 +286,7 @@ describe('auto', function () {
callback(null, 'task1');
}]
});
- }).to.throw;
+ }).to.throw();
done();
});
@@ -308,7 +301,7 @@ describe('auto', function () {
callback(null, 'task2');
}]
});
- }).to.throw;
+ }).to.throw();
done();
});
@@ -356,4 +349,20 @@ describe('auto', function () {
}).to.throw();
});
+ it("should avoid unncecessary deferrals", function (done) {
+ var isSync = true;
+
+ async.auto({
+ step1: function (cb) { cb(null, 1); },
+ step2: ["step1", function (results, cb) {
+ cb();
+ }]
+ }, function () {
+ expect(isSync).to.equal(true);
+ done();
+ });
+
+ isSync = false;
+ });
+
});