From 7688e9846e6a87e4107b21ffb57a4492438c64f9 Mon Sep 17 00:00:00 2001 From: Alexander Early Date: Sun, 6 Mar 2016 16:52:30 -0800 Subject: refactor auto to not need a deferral --- lib/auto.js | 163 +++++++++++++++++++++++++++++------------------------ mocha_test/auto.js | 41 ++++++++------ 2 files changed, 113 insertions(+), 91 deletions(-) diff --git a/lib/auto.js b/lib/auto.js index ae3ecfc..22e2af2 100644 --- a/lib/auto.js +++ b/lib/auto.js @@ -1,8 +1,6 @@ 'use strict'; import arrayEach from 'lodash/_arrayEach'; -import arrayEvery from 'lodash/_arrayEvery'; -import baseHas from 'lodash/_baseHas'; import forOwn from 'lodash/forOwn'; import indexOf from 'lodash/indexOf'; import isArray from 'lodash/isArray'; @@ -10,57 +8,113 @@ import okeys from 'lodash/keys'; import noop from 'lodash/noop'; import once from 'lodash/once'; import rest from 'lodash/rest'; -import onlyOnce from './internal/onlyOnce'; -import setImmediate from './internal/setImmediate'; +import onlyOnce from './internal/onlyOnce'; export default function (tasks, concurrency, callback) { - if (typeof arguments[1] === 'function') { + if (typeof concurrency === 'function') { // concurrency is optional, shift the args. callback = concurrency; concurrency = null; } callback = once(callback || noop); var keys = okeys(tasks); - var remainingTasks = keys.length; - if (!remainingTasks) { + var numTasks = keys.length; + if (!numTasks) { return callback(null); } if (!concurrency) { - concurrency = remainingTasks; + concurrency = numTasks; } var results = {}; var runningTasks = 0; var hasError = false; - var listeners = []; + var listeners = {}; + + var readyTasks = []; + - function addListener(fn) { - listeners.unshift(fn); + forOwn(tasks, function (task, key) { + if (!isArray(task)) { + // no dependencies + enqueueTask(key, [task]); + return; + } + + var dependencies = task.slice(0, task.length - 1); + var remainingDependencies = dependencies.length; + + checkForDeadlocks(); + + function checkForDeadlocks() { + var len = dependencies.length; + var dep; + while (len--) { + if (!(dep = tasks[dependencies[len]])) { + throw new Error('async.auto task `' + key + + '` has non-existent dependency in ' + + dependencies.join(', ')); + } + if (isArray(dep) && indexOf(dep, key) >= 0) { + throw new Error('async.auto task `' + key + + '`Has cyclic dependencies'); + } + } + } + + arrayEach(dependencies, function (dependencyName) { + addListener(dependencyName, function () { + remainingDependencies--; + if (remainingDependencies === 0) { + enqueueTask(key, task); + } + }); + }); + }); + + processQueue(); + + + function enqueueTask(key, task) { + readyTasks.push(function () { + runTask(key, task); + }); } - function removeListener(fn) { - var idx = indexOf(listeners, fn); - if (idx >= 0) listeners.splice(idx, 1); + function processQueue() { + if (readyTasks.length === 0 && runningTasks === 0) { + return callback(null, results); + } + while(readyTasks.length && runningTasks < concurrency) { + var run = readyTasks.shift(); + run(); + } + } - function taskComplete() { - remainingTasks--; - arrayEach(listeners.slice(), function (fn) { + function addListener(taskName, fn) { + var taskListeners = listeners[taskName]; + if (!taskListeners) { + taskListeners = listeners[taskName] = []; + } + + taskListeners.push(fn); + } + + function taskComplete(taskName) { + var taskListeners = listeners[taskName] || []; + arrayEach(taskListeners, function (fn) { fn(); }); + processQueue(); } - addListener(function () { - if (!remainingTasks) { - callback(null, results); - } - }); - arrayEach(keys, function (k) { + function runTask(key, task) { if (hasError) return; - var task = isArray(tasks[k]) ? tasks[k]: [tasks[k]]; + var taskCallback = onlyOnce(rest(function(err, args) { runningTasks--; if (args.length <= 1) { @@ -71,66 +125,25 @@ export default function (tasks, concurrency, callback) { forOwn(results, function(val, rkey) { safeResults[rkey] = val; }); - safeResults[k] = args; + safeResults[key] = args; hasError = true; listeners = []; callback(err, safeResults); - } - else { - results[k] = args; - setImmediate(taskComplete); + } else { + results[key] = args; + taskComplete(key); } })); - var requires = task.slice(0, task.length - 1); - - checkForDeadlocks(); - - if (ready()) { - startNext(); + runningTasks++; + var taskFn = task[task.length - 1]; + if (task.length > 1) { + taskFn(results, taskCallback); } else { - addListener(listener); - } - - function checkForDeadlocks() { - var len = requires.length; - var dep; - while (len--) { - if (!(dep = tasks[requires[len]])) { - throw new Error('Has non-existent dependency in ' + - requires.join(', ')); - } - if (isArray(dep) && indexOf(dep, k) >= 0) { - throw new Error('Has cyclic dependencies'); - } - } - } - - function ready() { - return runningTasks < concurrency && - !baseHas(results, k) && - !hasError && - arrayEvery(requires, function (x) { - return baseHas(results, x); - }); + taskFn(taskCallback); } + } - function startNext() { - runningTasks++; - var taskFn = task[task.length - 1]; - if (requires.length > 0) { - taskFn(results, taskCallback); - } else { - taskFn(taskCallback); - } - } - function listener() { - if (ready()) { - removeListener(listener); - startNext(); - } - } - }); } diff --git a/mocha_test/auto.js b/mocha_test/auto.js index 6b96be4..1850a0c 100644 --- a/mocha_test/auto.js +++ b/mocha_test/auto.js @@ -40,7 +40,7 @@ describe('auto', function () { }, function(err){ expect(err).to.equal(null); - expect(callOrder).to.eql(['task2','task6','task3','task5','task1','task4']); + expect(callOrder).to.eql(['task2','task3','task6','task5','task1','task4']); done(); }); }); @@ -214,19 +214,8 @@ describe('auto', function () { // Issue 410 on github: https://github.com/caolan/async/issues/410 it('auto calls callback multiple times', function(done) { - if (process.browser) { - // node only test - return done(); - } var finalCallCount = 0; - var domain = require('domain').create(); - domain.on('error', function (e) { - // ignore test error - if (!e._test_error) { - return done(e); - } - }); - domain.run(function () { + try { async.auto({ task1: function(callback) { callback(null); }, task2: ['task1', function(results, callback) { callback(null); }] @@ -239,7 +228,11 @@ describe('auto', function () { e._test_error = true; throw e; }); - }); + } catch (e) { + if (!e._test_error) { + throw e; + } + } setTimeout(function () { expect(finalCallCount).to.equal(1); done(); @@ -293,7 +286,7 @@ describe('auto', function () { callback(null, 'task1'); }] }); - }).to.throw; + }).to.throw(); done(); }); @@ -308,7 +301,7 @@ describe('auto', function () { callback(null, 'task2'); }] }); - }).to.throw; + }).to.throw(); done(); }); @@ -356,4 +349,20 @@ describe('auto', function () { }).to.throw(); }); + it("should avoid unncecessary deferrals", function (done) { + var isSync = true; + + async.auto({ + step1: function (cb) { cb(null, 1); }, + step2: ["step1", function (results, cb) { + cb(); + }] + }, function () { + expect(isSync).to.equal(true); + done(); + }); + + isSync = false; + }); + }); -- cgit v1.2.1