summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlex Early <alexander.early@gmail.com>2018-07-01 17:12:06 -0700
committerGitHub <noreply@github.com>2018-07-01 17:12:06 -0700
commit53f613039af68353371c2953446fa8084b3fc86b (patch)
treebd6d4e25cc30c6321c7b0810d86e1049b43764eb
parent2a135a422f5da6ab6c8d242b2428828bb51eb1f8 (diff)
downloadasync-53f613039af68353371c2953446fa8084b3fc86b.tar.gz
feat: Canceling flows (#1542)
* cancelable foreach * cancelable waterfall * cancellable auto * fix lint * fix tests * cancelable whilst/until/during/forever * fix waterfall test. It WILL get there * docs * auto should not start other tasks once canceled * simplify waterfall, add test for arrays * simplify eachOf * cancelable retry * cancelable eachOf for arrays * revert test tweak
-rw-r--r--intro.md34
-rw-r--r--lib/auto.js8
-rw-r--r--lib/doDuring.js2
-rw-r--r--lib/doWhilst.js1
-rw-r--r--lib/during.js2
-rw-r--r--lib/eachOf.js7
-rw-r--r--lib/forever.js1
-rw-r--r--lib/internal/eachOfLimit.js6
-rw-r--r--lib/retry.js1
-rw-r--r--lib/tryEach.js2
-rw-r--r--lib/waterfall.js1
-rw-r--r--lib/whilst.js1
-rw-r--r--test/auto.js59
-rw-r--r--test/during.js48
-rw-r--r--test/eachOf.js107
-rw-r--r--test/forever.js13
-rw-r--r--test/parallel.js26
-rw-r--r--test/retry.js12
-rw-r--r--test/until.js32
-rw-r--r--test/waterfall.js26
-rw-r--r--test/whilst.js32
21 files changed, 414 insertions, 7 deletions
diff --git a/intro.md b/intro.md
index 43d3c81..2470954 100644
--- a/intro.md
+++ b/intro.md
@@ -173,6 +173,40 @@ async.map([1, 2, 3], AsyncSquaringLibrary.square.bind(AsyncSquaringLibrary), fun
});
```
+### Subtle Memory Leaks
+
+There are cases where you might want to exit early from async flow, when calling an Async method inside another async function:
+
+```javascript
+function myFunction (args, outerCallback) {
+ async.waterfall([
+ //...
+ function (arg, next) {
+ if (someImportantCondition()) {
+ return outerCallback(null)
+ }
+ },
+ function (arg, next) {/*...*/}
+ ], function done (err) {
+ //...
+ })
+}
+```
+
+Something happened in a waterfall where you want to skip the rest of the execution, so you call an outer callack. However, Async will still wait for that inner `next` callback to be called, leaving some closure scope allocated.
+
+As of version 3.0, you can call any Async callback with `false` as the `error` argument, and the rest of the execution of the Async method will be stopped or ignored.
+
+```javascript
+ function (arg, next) {
+ if (someImportantCondition()) {
+ outerCallback(null)
+ return next(false) // ← signal that you called an outer callback
+ }
+ },
+```
+
+
## Download
The source is available for download from
diff --git a/lib/auto.js b/lib/auto.js
index a6150aa..500620a 100644
--- a/lib/auto.js
+++ b/lib/auto.js
@@ -102,6 +102,7 @@ export default function (tasks, concurrency, callback) {
var results = {};
var runningTasks = 0;
+ var canceled = false;
var hasError = false;
var listeners = Object.create(null);
@@ -156,6 +157,7 @@ export default function (tasks, concurrency, callback) {
}
function processQueue() {
+ if (canceled) return
if (readyTasks.length === 0 && runningTasks === 0) {
return callback(null, results);
}
@@ -189,6 +191,10 @@ export default function (tasks, concurrency, callback) {
var taskCallback = onlyOnce(function(err, result) {
runningTasks--;
+ if (err === false) {
+ canceled = true
+ return
+ }
if (arguments.length > 2) {
result = slice(arguments, 1);
}
@@ -200,7 +206,7 @@ export default function (tasks, concurrency, callback) {
safeResults[key] = result;
hasError = true;
listeners = Object.create(null);
-
+ if (canceled) return
callback(err, safeResults);
} else {
results[key] = result;
diff --git a/lib/doDuring.js b/lib/doDuring.js
index ac2a04c..5eb2b92 100644
--- a/lib/doDuring.js
+++ b/lib/doDuring.js
@@ -30,6 +30,7 @@ export default function doDuring(fn, test, callback) {
function next(err/*, ...args*/) {
if (err) return callback(err);
+ if (err === false) return;
var args = slice(arguments, 1);
args.push(check);
_test.apply(this, args);
@@ -37,6 +38,7 @@ export default function doDuring(fn, test, callback) {
function check(err, truth) {
if (err) return callback(err);
+ if (err === false) return;
if (!truth) return callback(null);
_fn(next);
}
diff --git a/lib/doWhilst.js b/lib/doWhilst.js
index 3c2b865..5245820 100644
--- a/lib/doWhilst.js
+++ b/lib/doWhilst.js
@@ -31,6 +31,7 @@ export default function doWhilst(iteratee, test, callback) {
var _iteratee = wrapAsync(iteratee);
var next = function(err/*, ...args*/) {
if (err) return callback(err);
+ if (err === false) return;
var args = slice(arguments, 1);
if (test.apply(this, args)) return _iteratee(next);
callback.apply(null, [null].concat(args));
diff --git a/lib/during.js b/lib/during.js
index cda9979..02da045 100644
--- a/lib/during.js
+++ b/lib/during.js
@@ -45,11 +45,13 @@ export default function during(test, fn, callback) {
function next(err) {
if (err) return callback(err);
+ if (err === false) return;
_test(check);
}
function check(err, truth) {
if (err) return callback(err);
+ if (err === false) return;
if (!truth) return callback(null);
_fn(next);
}
diff --git a/lib/eachOf.js b/lib/eachOf.js
index bb9edfa..5dbeb81 100644
--- a/lib/eachOf.js
+++ b/lib/eachOf.js
@@ -12,12 +12,17 @@ function eachOfArrayLike(coll, iteratee, callback) {
callback = once(callback || noop);
var index = 0,
completed = 0,
- length = coll.length;
+ length = coll.length,
+ canceled = false;
if (length === 0) {
callback(null);
}
function iteratorCallback(err, value) {
+ if (err === false) {
+ canceled = true
+ }
+ if (canceled === true) return
if (err) {
callback(err);
} else if ((++completed === length) || value === breakLoop) {
diff --git a/lib/forever.js b/lib/forever.js
index 3753251..fb69adf 100644
--- a/lib/forever.js
+++ b/lib/forever.js
@@ -38,6 +38,7 @@ export default function forever(fn, errback) {
function next(err) {
if (err) return done(err);
+ if (err === false) return;
task(next);
}
next();
diff --git a/lib/internal/eachOfLimit.js b/lib/internal/eachOfLimit.js
index 6ce0762..ae153b1 100644
--- a/lib/internal/eachOfLimit.js
+++ b/lib/internal/eachOfLimit.js
@@ -14,15 +14,21 @@ export default function _eachOfLimit(limit) {
}
var nextElem = iterator(obj);
var done = false;
+ var canceled = false;
var running = 0;
var looping = false;
function iterateeCallback(err, value) {
+ if (canceled) return
running -= 1;
if (err) {
done = true;
callback(err);
}
+ else if (err === false) {
+ done = true;
+ canceled = true;
+ }
else if (value === breakLoop || (done && running <= 0)) {
done = true;
return callback(null);
diff --git a/lib/retry.js b/lib/retry.js
index 390fd55..ef04406 100644
--- a/lib/retry.js
+++ b/lib/retry.js
@@ -133,6 +133,7 @@ export default function retry(opts, task, callback) {
var attempt = 1;
function retryAttempt() {
_task(function(err) {
+ if (err === false) return
if (err && attempt++ < options.times &&
(typeof options.errorFilter != 'function' ||
options.errorFilter(err))) {
diff --git a/lib/tryEach.js b/lib/tryEach.js
index 87fba12..82649b4 100644
--- a/lib/tryEach.js
+++ b/lib/tryEach.js
@@ -52,7 +52,7 @@ export default function tryEach(tasks, callback) {
result = res;
}
error = err;
- callback(!err);
+ callback(err ? null : {});
});
}, function () {
callback(error, result);
diff --git a/lib/waterfall.js b/lib/waterfall.js
index 3e71d2c..ceaf59d 100644
--- a/lib/waterfall.js
+++ b/lib/waterfall.js
@@ -75,6 +75,7 @@ export default function(tasks, callback) {
}
function next(err/*, ...args*/) {
+ if (err === false) return
if (err || taskIndex === tasks.length) {
return callback.apply(null, arguments);
}
diff --git a/lib/whilst.js b/lib/whilst.js
index da748fa..32c3b52 100644
--- a/lib/whilst.js
+++ b/lib/whilst.js
@@ -44,6 +44,7 @@ export default function whilst(test, iteratee, callback) {
if (!test()) return callback(null);
var next = function(err/*, ...args*/) {
if (err) return callback(err);
+ if (err === false) return;
if (test()) return _iteratee(next);
var args = slice(arguments, 1);
callback.apply(null, [null].concat(args));
diff --git a/test/auto.js b/test/auto.js
index 489dbea..3b561cb 100644
--- a/test/auto.js
+++ b/test/auto.js
@@ -168,6 +168,63 @@ describe('auto', function () {
setTimeout(done, 100);
});
+ it('auto canceled', function(done){
+ const call_order = []
+ async.auto({
+ task1: function(callback){
+ call_order.push(1)
+ callback(false);
+ },
+ task2: ['task1', function(/*results, callback*/){
+ call_order.push(2)
+ throw new Error('task2 should not be called');
+ }],
+ task3: function(callback){
+ call_order.push(3)
+ callback('testerror2');
+ }
+ },
+ function(){
+ throw new Error('should not get here')
+ });
+ setTimeout(() => {
+ expect(call_order).to.eql([1, 3])
+ done()
+ }, 10);
+ });
+
+ it('does not start other tasks when it has been canceled', function(done) {
+ const call_order = []
+ debugger
+ async.auto({
+ task1: function(callback) {
+ call_order.push(1);
+ // defer calling task2, so task3 has time to stop execution
+ async.setImmediate(callback);
+ },
+ task2: ['task1', function( /*results, callback*/ ) {
+ call_order.push(2);
+ throw new Error('task2 should not be called');
+ }],
+ task3: function(callback) {
+ call_order.push(3);
+ callback(false);
+ },
+ task4: ['task3', function( /*results, callback*/ ) {
+ call_order.push(4);
+ throw new Error('task4 should not be called');
+ }]
+ },
+ function() {
+ throw new Error('should not get here')
+ });
+
+ setTimeout(() => {
+ expect(call_order).to.eql([1, 3])
+ done()
+ }, 25)
+ });
+
it('auto no callback', function(done){
async.auto({
task1: function(callback){callback();},
@@ -185,7 +242,7 @@ describe('auto', function () {
it('auto error should pass partial results', function(done) {
async.auto({
task1: function(callback){
- callback(false, 'result1');
+ callback(null, 'result1');
},
task2: ['task1', function(results, callback){
callback('testerror', 'result2');
diff --git a/test/during.js b/test/during.js
index 13db8b3..e0636e5 100644
--- a/test/during.js
+++ b/test/during.js
@@ -34,6 +34,22 @@ describe('during', function() {
);
});
+ it('during canceling', (done) => {
+ let counter = 0;
+ async.during(
+ cb => cb(null, true),
+ cb => {
+ counter++
+ cb(counter === 2 ? false : null);
+ },
+ () => { throw new Error('should not get here')}
+ );
+ setTimeout(() => {
+ expect(counter).to.equal(2);
+ done();
+ }, 10)
+ })
+
it('doDuring', function(done) {
var call_order = [];
@@ -95,4 +111,36 @@ describe('during', function() {
}
);
});
+
+ it('doDuring canceling', (done) => {
+ let counter = 0;
+ async.doDuring(
+ cb => {
+ counter++
+ cb(counter === 2 ? false : null);
+ },
+ cb => cb(null, true),
+ () => { throw new Error('should not get here')}
+ );
+ setTimeout(() => {
+ expect(counter).to.equal(2);
+ done();
+ }, 10)
+ })
+
+ it('doDuring canceling in test', (done) => {
+ let counter = 0;
+ async.doDuring(
+ cb => {
+ counter++
+ cb(null, counter);
+ },
+ (n, cb) => cb(n === 2 ? false : null, true),
+ () => { throw new Error('should not get here')}
+ );
+ setTimeout(() => {
+ expect(counter).to.equal(2);
+ done();
+ }, 10)
+ })
});
diff --git a/test/eachOf.js b/test/eachOf.js
index dd73c22..7d80990 100644
--- a/test/eachOf.js
+++ b/test/eachOf.js
@@ -403,4 +403,111 @@ describe("eachOf", function() {
done();
});
});
+
+ it('forEachOfLimit canceled', function(done) {
+ var obj = { a: 1, b: 2, c: 3, d: 4, e: 5 };
+ var call_order = [];
+
+ async.forEachOfLimit(obj, 3, function(value, key, callback){
+ call_order.push(value, key);
+ if (value === 2) {
+ return callback(false);
+ }
+ callback()
+ }, function(){
+ throw new Error('should not get here')
+ });
+ setTimeout(() => {
+ expect(call_order).to.eql([ 1, "a", 2, "b" ]);
+ done()
+ }, 10);
+ });
+
+ it('forEachOfLimit canceled (async)', function(done) {
+ var obj = { a: 1, b: 2, c: 3, d: 4, e: 5 };
+ var call_order = [];
+
+ async.forEachOfLimit(obj, 3, function(value, key, callback){
+ call_order.push(value, key);
+ setTimeout(() => {
+ if (value === 2) {
+ return callback(false);
+ }
+ callback()
+ })
+ }, function(){
+ throw new Error('should not get here')
+ });
+ setTimeout(() => {
+ expect(call_order).to.eql([ 1, "a", 2, "b", 3, "c", 4, "d" ]);
+ done()
+ }, 20);
+ });
+
+ it('eachOfLimit canceled (async, array)', function(done) {
+ var obj = ['a', 'b', 'c', 'd', 'e'];
+ var call_order = [];
+
+ async.eachOfLimit(obj, 3, function(value, key, callback){
+ call_order.push(key, value);
+ setTimeout(() => {
+ if (value === 'b') {
+ return callback(false);
+ }
+ callback()
+ })
+ }, function(){
+ throw new Error('should not get here')
+ });
+ setTimeout(() => {
+ expect(call_order).to.eql([ 0, "a", 1, "b", 2, "c", 3, "d" ]);
+ done()
+ }, 20);
+ });
+
+ it('eachOf canceled (async, array)', function(done) {
+ var arr = ['a', 'b', 'c', 'd', 'e'];
+ var call_order = [];
+
+ async.eachOf(arr, function(value, key, callback){
+ call_order.push(key, value);
+ setTimeout(() => {
+ if (value === 'b') {
+ return callback(false);
+ }
+ callback()
+ })
+ }, function(){
+ throw new Error('should not get here')
+ });
+ setTimeout(() => {
+ expect(call_order).to.eql([ 0, "a", 1, "b", 2, "c", 3, "d", 4, "e" ]);
+ done()
+ }, 20);
+ });
+
+ it('forEachOfLimit canceled (async, w/ error)', function(done) {
+ var obj = { a: 1, b: 2, c: 3, d: 4, e: 5 };
+ var call_order = [];
+
+ async.forEachOfLimit(obj, 3, function(value, key, callback){
+ call_order.push(value, key);
+ setTimeout(() => {
+ if (value === 2) {
+ return callback(false);
+ }
+ if (value === 3) {
+ return callback('fail');
+ }
+ callback()
+ })
+ }, function(){
+ throw new Error('should not get here')
+ });
+ setTimeout(() => {
+ expect(call_order).to.eql([ 1, "a", 2, "b", 3, "c", 4, "d" ]);
+ done()
+ }, 20);
+ });
+
});
diff --git a/test/forever.js b/test/forever.js
index e00a22d..b698399 100644
--- a/test/forever.js
+++ b/test/forever.js
@@ -38,5 +38,18 @@ describe('forever', function(){
done();
});
});
+
+ it('should cancel', (done) => {
+ var counter = 0;
+ async.forever(cb => {
+ counter++
+ cb(counter === 2 ? false : null)
+ }, () => { throw new Error('should not get here') })
+
+ setTimeout(() => {
+ expect(counter).to.eql(2)
+ done()
+ }, 10)
+ })
});
});
diff --git a/test/parallel.js b/test/parallel.js
index 2b07822..c7de888 100644
--- a/test/parallel.js
+++ b/test/parallel.js
@@ -191,6 +191,32 @@ describe('parallel', function() {
});
});
+ it('parallel limit canceled', function(done) {
+ const call_order = []
+ async.parallelLimit([
+ function(callback){
+ call_order.push(1)
+ callback();
+ },
+ function(callback){
+ call_order.push(2)
+ callback(false);
+ },
+ function(callback){
+ call_order.push(3)
+ callback('error', 2);
+ }
+ ],
+ 1,
+ function(){
+ throw new Error('should not get here')
+ });
+ setTimeout(() => {
+ expect(call_order).to.eql([1, 2]);
+ done()
+ }, 25);
+ });
+
it('parallel call in another context @nycinvalid @nodeonly', function(done) {
var vm = require('vm');
var sandbox = {
diff --git a/test/retry.js b/test/retry.js
index 8a9f4da..d5e0d40 100644
--- a/test/retry.js
+++ b/test/retry.js
@@ -122,6 +122,18 @@ describe("retry", function () {
}, 50);
});
+ it("should be cancelable", function (done) {
+ var calls = 0;
+ async.retry(2, function(cb) {
+ calls++;
+ cb(calls > 1 ? false : 'fail');
+ }, () => { throw new Error('should not get here') });
+ setTimeout(function () {
+ expect(calls).to.equal(2);
+ done();
+ }, 10);
+ });
+
it('retry does not precompute the intervals (#1226)', function(done) {
var callTimes = [];
function intervalFunc() {
diff --git a/test/until.js b/test/until.js
index e98b184..2b82fe1 100644
--- a/test/until.js
+++ b/test/until.js
@@ -34,6 +34,22 @@ describe('until', function(){
);
});
+ it('until canceling', (done) => {
+ let counter = 0;
+ async.until(
+ () => false,
+ cb => {
+ counter++
+ cb(counter === 2 ? false: null);
+ },
+ () => { throw new Error('should not get here')}
+ );
+ setTimeout(() => {
+ expect(counter).to.equal(2);
+ done();
+ }, 10)
+ })
+
it('doUntil', function(done) {
var call_order = [];
var count = 0;
@@ -92,4 +108,20 @@ describe('until', function(){
}
);
});
+
+ it('doUntil canceling', (done) => {
+ let counter = 0;
+ async.doUntil(
+ cb => {
+ counter++
+ cb(counter === 2 ? false: null);
+ },
+ () => false,
+ () => { throw new Error('should not get here')}
+ );
+ setTimeout(() => {
+ expect(counter).to.equal(2);
+ done();
+ }, 10)
+ })
});
diff --git a/test/waterfall.js b/test/waterfall.js
index 78b11bb..1501f03 100644
--- a/test/waterfall.js
+++ b/test/waterfall.js
@@ -90,6 +90,28 @@ describe("waterfall", function () {
});
});
+
+ it('canceled', function(done){
+ const call_order = []
+ async.waterfall([
+ function(callback){
+ call_order.push(1)
+ callback(false);
+ },
+ function(callback){
+ call_order.push(2)
+ assert(false, 'next function should not be called');
+ callback();
+ }
+ ], function(){
+ throw new Error('should not get here')
+ });
+ setTimeout(() => {
+ expect(call_order).to.eql([1])
+ done()
+ }, 10)
+ });
+
it('multiple callback calls', function(){
var arr = [
function(callback){
@@ -131,9 +153,7 @@ describe("waterfall", function () {
function(arg1, arg2, callback){
setTimeout(callback, 15, null, arg1, arg2, 'three');
}
- ], function () {
- throw new Error('should not get here')
- });
+ ]);
});
it('call in another context @nycinvalid @nodeonly', function(done) {
diff --git a/test/whilst.js b/test/whilst.js
index e04c2b7..2ce2f23 100644
--- a/test/whilst.js
+++ b/test/whilst.js
@@ -48,6 +48,22 @@ describe('whilst', function(){
done();
});
+ it('whilst canceling', function(done) {
+ var counter = 0;
+ async.whilst(
+ function () { return counter < 3; },
+ function (cb) {
+ counter++;
+ cb(counter === 2 ? false : null);
+ },
+ () => { throw new Error('should not get here')}
+ );
+ setTimeout(() => {
+ expect(counter).to.equal(2);
+ done();
+ }, 10)
+ });
+
it('doWhilst', function(done) {
var call_order = [];
@@ -122,4 +138,20 @@ describe('whilst', function(){
}
);
});
+
+ it('doWhilst canceling', (done) => {
+ let counter = 0;
+ async.doWhilst(
+ cb => {
+ counter++
+ cb(counter === 2 ? false : null);
+ },
+ () => true,
+ () => { throw new Error('should not get here')}
+ );
+ setTimeout(() => {
+ expect(counter).to.equal(2);
+ done();
+ }, 10)
+ })
});