summaryrefslogtreecommitdiff
path: root/dist/async.js
diff options
context:
space:
mode:
authorAlexander Early <alexander.early@gmail.com>2019-05-19 19:27:01 -0700
committerAlexander Early <alexander.early@gmail.com>2019-05-19 19:27:01 -0700
commitbd881300c0f69b32a0113c3c9b955db87cdc8c8f (patch)
treeec01d0f02567d03293fe005c4bb712ac641fe73a /dist/async.js
parent3fa33dd70c786d98dc203ae8a0d4d434d418dbd6 (diff)
downloadasync-bd881300c0f69b32a0113c3c9b955db87cdc8c8f.tar.gz
Update built files
Diffstat (limited to 'dist/async.js')
-rw-r--r--dist/async.js465
1 files changed, 323 insertions, 142 deletions
diff --git a/dist/async.js b/dist/async.js
index 1e9b8bf..0f015bb 100644
--- a/dist/async.js
+++ b/dist/async.js
@@ -197,17 +197,39 @@
return isAsync(asyncFn) ? asyncify(asyncFn) : asyncFn;
}
- function applyEach(eachfn) {
- return function(fns, ...callArgs) {
- var go = initialParams(function(args, callback) {
+ // conditionally promisify a function.
+ // only return a promise if a callback is omitted
+ function awaitify (asyncFn, arity = asyncFn.length) {
+ if (!arity) throw new Error('arity is undefined')
+ function awaitable (...args) {
+ if (typeof args[arity - 1] === 'function') {
+ return asyncFn.apply(this, args)
+ }
+
+ return new Promise((resolve, reject) => {
+ args[arity - 1] = (err, ...cbArgs) => {
+ if (err) return reject(err)
+ resolve(cbArgs.length > 1 ? cbArgs : cbArgs[0]);
+ };
+ asyncFn.apply(this, args);
+ })
+ }
+
+ Object.defineProperty(awaitable, 'name', {
+ value: `awaitable(${asyncFn.name})`
+ });
+
+ return awaitable
+ }
+
+ function applyEach (eachfn) {
+ return function applyEach(fns, ...callArgs) {
+ const go = awaitify(function (callback) {
var that = this;
return eachfn(fns, (fn, cb) => {
- wrapAsync(fn).apply(that, args.concat(cb));
+ wrapAsync(fn).apply(that, callArgs.concat(cb));
}, callback);
});
- if (callArgs.length) {
- return go.apply(this, callArgs);
- }
return go;
};
}
@@ -426,31 +448,6 @@
};
};
- // conditionally promisify a function.
- // only return a promise if a callback is omitted
- function awaitify (asyncFn, arity = asyncFn.length) {
- if (!arity) throw new Error('arity is undefined')
- function awaitable (...args) {
- if (typeof args[arity - 1] === 'function') {
- return asyncFn.apply(this, args)
- }
-
- return new Promise((resolve, reject) => {
- args[arity - 1] = (err, ...cbArgs) => {
- if (err) return reject(err)
- resolve(cbArgs.length > 1 ? cbArgs : cbArgs[0]);
- };
- asyncFn.apply(this, args);
- })
- }
-
- Object.defineProperty(awaitable, 'name', {
- value: `awaitable(${asyncFn.name})`
- });
-
- return awaitable
- }
-
/**
* The same as [`eachOf`]{@link module:Collections.eachOf} but runs a maximum of `limit` async operations at a
* time.
@@ -977,7 +974,7 @@
}
var FN_ARGS = /^(?:async\s+)?(?:function)?\s*[^(]*\(\s*([^)]+)\s*\)(?:\s*{)/m;
- var ARROW_FN_ARGS = /^(?:async\s+)?\(?\s*([^)^=]+)\s*\)?(?:\s*=>)/m;
+ var ARROW_FN_ARGS = /^(?:async\s+)?(?:function\s+)?\(?\s*([^)^=]+)\s*\)?(?:\s*=>)/m;
var FN_ARG_SPLIT = /,/;
var FN_ARG = /(=.+)?(\s*)$/;
var STRIP_COMMENTS = /((\/\/.*$)|(\/\*[\s\S]*?\*\/))/mg;
@@ -1212,8 +1209,6 @@
dll.head = dll.tail = node;
}
- const noop = () => {};
-
function queue(worker, concurrency, payload) {
if (concurrency == null) {
concurrency = 1;
@@ -1225,6 +1220,35 @@
var _worker = wrapAsync(worker);
var numRunning = 0;
var workersList = [];
+ const events = {
+ error: [],
+ drain: [],
+ saturated: [],
+ unsaturated: [],
+ empty: []
+ };
+
+ function on (event, handler) {
+ events[event].push(handler);
+ }
+
+ function once (event, handler) {
+ const handleAndRemove = (...args) => {
+ off(event, handleAndRemove);
+ handler(...args);
+ };
+ events[event].push(handleAndRemove);
+ }
+
+ function off (event, handler) {
+ if (!event) return Object.keys(events).forEach(ev => events[ev] = [])
+ if (!handler) return events[event] = []
+ events[event] = events[event].filter(ev => ev !== handler);
+ }
+
+ function trigger (event, ...args) {
+ events[event].forEach(handler => handler(...args));
+ }
var processingScheduled = false;
function _insert(data, insertAtFront, callback) {
@@ -1232,25 +1256,32 @@
throw new Error('task callback must be a function');
}
q.started = true;
- if (!Array.isArray(data)) {
- data = [data];
- }
- if (data.length === 0 && q.idle()) {
- // call drain immediately if there are no tasks
- return setImmediate$1(() => q.drain());
+ if (Array.isArray(data)) {
+ if (data.length === 0 && q.idle()) {
+ // call drain immediately if there are no tasks
+ return setImmediate$1(() => trigger('drain'));
+ }
+
+ return data.map(datum => _insert(datum, insertAtFront, callback));
}
- for (var i = 0, l = data.length; i < l; i++) {
- var item = {
- data: data[i],
- callback: callback || noop
- };
+ var res;
- if (insertAtFront) {
- q._tasks.unshift(item);
- } else {
- q._tasks.push(item);
+ var item = {
+ data,
+ callback: callback || function (err, ...args) {
+ // we don't care about the error, let the global error handler
+ // deal with it
+ if (err) return
+ if (args.length <= 1) return res(args[0])
+ res(args);
}
+ };
+
+ if (insertAtFront) {
+ q._tasks.unshift(item);
+ } else {
+ q._tasks.push(item);
}
if (!processingScheduled) {
@@ -1260,9 +1291,15 @@
q.process();
});
}
+
+ if (!callback) {
+ return new Promise((resolve) => {
+ res = resolve;
+ })
+ }
}
- function _next(tasks) {
+ function _createCB(tasks) {
return function (err, ...args) {
numRunning -= 1;
@@ -1279,21 +1316,35 @@
task.callback(err, ...args);
if (err != null) {
- q.error(err, task.data);
+ trigger('error', err, task.data);
}
}
if (numRunning <= (q.concurrency - q.buffer) ) {
- q.unsaturated();
+ trigger('unsaturated');
}
if (q.idle()) {
- q.drain();
+ trigger('drain');
}
q.process();
};
}
+ const eventMethod = (name) => (handler) => {
+ if (!handler) {
+ return new Promise((resolve, reject) => {
+ once(name, (err, data) => {
+ if (err) return reject(err)
+ resolve(data);
+ });
+ })
+ }
+ off(name);
+ on(name, handler);
+
+ };
+
var isProcessing = false;
var q = {
_tasks: new DLL(),
@@ -1302,23 +1353,18 @@
},
concurrency,
payload,
- saturated: noop,
- unsaturated:noop,
buffer: concurrency / 4,
- empty: noop,
- drain: noop,
- error: noop,
started: false,
paused: false,
push (data, callback) {
- _insert(data, false, callback);
+ return _insert(data, false, callback);
},
kill () {
- q.drain = noop;
+ off();
q._tasks.empty();
},
unshift (data, callback) {
- _insert(data, true, callback);
+ return _insert(data, true, callback);
},
remove (testFn) {
q._tasks.remove(testFn);
@@ -1344,14 +1390,14 @@
numRunning += 1;
if (q._tasks.length === 0) {
- q.empty();
+ trigger('empty');
}
if (numRunning === q.concurrency) {
- q.saturated();
+ trigger('saturated');
}
- var cb = onlyOnce(_next(tasks));
+ var cb = onlyOnce(_createCB(tasks));
_worker(data, cb);
}
isProcessing = false;
@@ -1377,40 +1423,33 @@
setImmediate$1(q.process);
}
};
+ // define these as fixed properties, so people get useful errors when updating
+ Object.defineProperties(q, {
+ saturated: {
+ writable: false,
+ value: eventMethod('saturated')
+ },
+ unsaturated: {
+ writable: false,
+ value: eventMethod('unsaturated')
+ },
+ empty: {
+ writable: false,
+ value: eventMethod('empty')
+ },
+ drain: {
+ writable: false,
+ value: eventMethod('drain')
+ },
+ error: {
+ writable: false,
+ value: eventMethod('error')
+ },
+ });
return q;
}
/**
- * A cargo of tasks for the worker function to complete. Cargo inherits all of
- * the same methods and event callbacks as [`queue`]{@link module:ControlFlow.queue}.
- * @typedef {Object} CargoObject
- * @memberOf module:ControlFlow
- * @property {Function} length - A function returning the number of items
- * waiting to be processed. Invoke like `cargo.length()`.
- * @property {number} payload - An `integer` for determining how many tasks
- * should be process per round. This property can be changed after a `cargo` is
- * created to alter the payload on-the-fly.
- * @property {Function} push - Adds `task` to the `queue`. The callback is
- * called once the `worker` has finished processing the task. Instead of a
- * single task, an array of `tasks` can be submitted. The respective callback is
- * used for every task in the list. Invoke like `cargo.push(task, [callback])`.
- * @property {Function} saturated - A callback that is called when the
- * `queue.length()` hits the concurrency and further tasks will be queued.
- * @property {Function} empty - A callback that is called when the last item
- * from the `queue` is given to a `worker`.
- * @property {Function} drain - A callback that is called when the last item
- * from the `queue` has returned from the `worker`.
- * @property {Function} idle - a function returning false if there are items
- * waiting or being processed, or true if not. Invoke like `cargo.idle()`.
- * @property {Function} pause - a function that pauses the processing of tasks
- * until `resume()` is called. Invoke like `cargo.pause()`.
- * @property {Function} resume - a function that resumes the processing of
- * queued tasks when the queue is paused. Invoke like `cargo.resume()`.
- * @property {Function} kill - a function that removes the `drain` callback and
- * empties remaining tasks from the queue forcing it to go idle. Invoke like `cargo.kill()`.
- */
-
- /**
* Creates a `cargo` object with the specified payload. Tasks added to the
* cargo will be processed altogether (up to the `payload` limit). If the
* `worker` is in progress, the task is queued until it becomes available. Once
@@ -1433,7 +1472,7 @@
* @param {number} [payload=Infinity] - An optional `integer` for determining
* how many tasks should be processed per round; if omitted, the default is
* unlimited.
- * @returns {module:ControlFlow.CargoObject} A cargo object to manage the tasks. Callbacks can
+ * @returns {module:ControlFlow.QueueObject} A cargo object to manage the tasks. Callbacks can
* attached as certain properties to listen for specific events during the
* lifecycle of the cargo and inner queue.
* @example
@@ -1453,9 +1492,8 @@
* cargo.push({name: 'bar'}, function(err) {
* console.log('finished processing bar');
* });
- * cargo.push({name: 'baz'}, function(err) {
- * console.log('finished processing baz');
- * });
+ * await cargo.push({name: 'baz'});
+ * console.log('finished processing baz');
*/
function cargo(worker, payload) {
return queue(worker, 1, payload);
@@ -1709,6 +1747,7 @@
* @method
* @see [async.concat]{@link module:Collections.concat}
* @category Collection
+ * @alias flatMapLimit
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {number} limit - The maximum number of async operations at a time.
* @param {AsyncFunction} iteratee - A function to apply to each item in `coll`,
@@ -1724,7 +1763,7 @@
return mapLimit$1(coll, limit, (val, iterCb) => {
_iteratee(val, (err, ...args) => {
if (err) return iterCb(err);
- return iterCb(null, args);
+ return iterCb(err, args);
});
}, (err, mapResults) => {
var result = [];
@@ -1750,6 +1789,7 @@
* @memberOf module:Collections
* @method
* @category Collection
+ * @alias flatMap
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A function to apply to each item in `coll`,
* which should use an array as its result. Invoked with (item, callback).
@@ -1778,6 +1818,7 @@
* @method
* @see [async.concat]{@link module:Collections.concat}
* @category Collection
+ * @alias flatMapSeries
* @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over.
* @param {AsyncFunction} iteratee - A function to apply to each item in `coll`.
* The iteratee should complete with an array an array of results.
@@ -1849,7 +1890,7 @@
const iteratee = wrapAsync(_iteratee);
eachfn(arr, (value, _, callback) => {
iteratee(value, (err, result) => {
- if (err) return callback(err)
+ if (err || err === false) return callback(err);
if (check(result) && !testResult) {
testPassed = true;
@@ -2180,6 +2221,9 @@
/**
* The same as [`each`]{@link module:Collections.each} but runs only a single async operation at a time.
*
+ * Note, that unlike [`each`]{@link module:Collections.each}, this function applies iteratee to each item
+ * in series and therefore the iteratee functions will complete in order.
+
* @name eachSeries
* @static
* @memberOf module:Collections
@@ -2364,7 +2408,7 @@
if (v) {
results.push({index, value: x});
}
- iterCb();
+ iterCb(err);
});
}, err => {
if (err) return callback(err);
@@ -2529,7 +2573,7 @@
return mapLimit$1(coll, limit, (val, iterCb) => {
_iteratee(val, (err, key) => {
if (err) return iterCb(err);
- return iterCb(null, {key, val});
+ return iterCb(err, {key, val});
});
}, (err, mapResults) => {
var result = {};
@@ -2678,7 +2722,7 @@
_iteratee(val, key, (err, result) => {
if (err) return next(err);
newObj[key] = result;
- next();
+ next(err);
});
}, err => callback(err, newObj));
}
@@ -3000,6 +3044,9 @@
* @property {number} concurrency - an integer for determining how many `worker`
* functions should be run in parallel. This property can be changed after a
* `queue` is created to alter the concurrency on-the-fly.
+ * @property {number} payload - an integer that specifies how many items are
+ * passed to the worker function at a time. only applies if this is a
+ * [cargo]{@link module:ControlFlow.cargo} object
* @property {Function} push - add a new task to the `queue`. Calls `callback`
* once the `worker` has finished processing the task. Instead of a single task,
* a `tasks` array can be submitted. The respective callback is used for every
@@ -3012,20 +3059,26 @@
* [priorityQueue]{@link module:ControlFlow.priorityQueue} object.
* Invoked with `queue.remove(testFn)`, where `testFn` is of the form
* `function ({data, priority}) {}` and returns a Boolean.
- * @property {Function} saturated - a callback that is called when the number of
- * running workers hits the `concurrency` limit, and further tasks will be
- * queued.
- * @property {Function} unsaturated - a callback that is called when the number
- * of running workers is less than the `concurrency` & `buffer` limits, and
- * further tasks will not be queued.
+ * @property {Function} saturated - a function that sets a callback that is
+ * called when the number of running workers hits the `concurrency` limit, and
+ * further tasks will be queued. If the callback is omitted, `q.saturated()`
+ * returns a promise for the next occurrence.
+ * @property {Function} unsaturated - a function that sets a callback that is
+ * called when the number of running workers is less than the `concurrency` &
+ * `buffer` limits, and further tasks will not be queued. If the callback is
+ * omitted, `q.unsaturated()` returns a promise for the next occurrence.
* @property {number} buffer - A minimum threshold buffer in order to say that
* the `queue` is `unsaturated`.
- * @property {Function} empty - a callback that is called when the last item
- * from the `queue` is given to a `worker`.
- * @property {Function} drain - a callback that is called when the last item
- * from the `queue` has returned from the `worker`.
- * @property {Function} error - a callback that is called when a task errors.
- * Has the signature `function(error, task)`.
+ * @property {Function} empty - a function that sets a callback that is called
+ * when the last item from the `queue` is given to a `worker`. If the callback
+ * is omitted, `q.empty()` returns a promise for the next occurrence.
+ * @property {Function} drain - a function that sets a callback that is called
+ * when the last item from the `queue` has returned from the `worker`. If the
+ * callback is omitted, `q.drain()` returns a promise for the next occurrence.
+ * @property {Function} error - a function that sets a callback that is called
+ * when a task errors. Has the signature `function(error, task)`. If the
+ * callback is omitted, `error()` returns a promise that rejects on the next
+ * error.
* @property {boolean} paused - a boolean for determining whether the queue is
* in a paused state.
* @property {Function} pause - a function that pauses the processing of tasks
@@ -3047,6 +3100,12 @@
* for (let item of q) {
* console.log(item)
* }
+ *
+ * q.drain(() => {
+ * console.log('all done')
+ * })
+ * // or
+ * await q.drain()
*/
/**
@@ -3078,22 +3137,23 @@
* }, 2);
*
* // assign a callback
- * q.drain = function() {
+ * q.drain(function() {
* console.log('all items have been processed');
- * };
+ * });
+ * // or await the end
+ * await q.drain()
*
* // assign an error callback
- * q.error = function(err, task) {
+ * q.error(function(err, task) {
* console.error('task experienced an error');
- * };
+ * });
*
* // add some items to the queue
* q.push({name: 'foo'}, function(err) {
* console.log('finished processing foo');
* });
- * q.push({name: 'bar'}, function (err) {
- * console.log('finished processing bar');
- * });
+ * // callback is optional
+ * q.push({name: 'bar'});
*
* // add some items to the queue (batch-wise)
* q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function(err) {
@@ -3112,6 +3172,121 @@
}, concurrency, 1);
}
+ // Binary min-heap implementation used for priority queue.
+ // Implementation is stable, i.e. push time is considered for equal priorities
+ class Heap {
+ constructor() {
+ this.heap = [];
+ this.pushCount = Number.MIN_SAFE_INTEGER;
+ }
+
+ get length() {
+ return this.heap.length;
+ }
+
+ empty () {
+ this.heap = [];
+ return this;
+ }
+
+ percUp(index) {
+ let p;
+
+ while (index > 0 && smaller(this.heap[index], this.heap[p=parent(index)])) {
+ let t = this.heap[index];
+ this.heap[index] = this.heap[p];
+ this.heap[p] = t;
+
+ index = p;
+ }
+ }
+
+ percDown(index) {
+ let l;
+
+ while ((l=leftChi(index)) < this.heap.length) {
+ if (l+1 < this.heap.length && smaller(this.heap[l+1], this.heap[l])) {
+ l = l+1;
+ }
+
+ if (smaller(this.heap[index], this.heap[l])) {
+ break;
+ }
+
+ let t = this.heap[index];
+ this.heap[index] = this.heap[l];
+ this.heap[l] = t;
+
+ index = l;
+ }
+ }
+
+ push(node) {
+ node.pushCount = ++this.pushCount;
+ this.heap.push(node);
+ this.percUp(this.heap.length-1);
+ }
+
+ unshift(node) {
+ return this.heap.push(node);
+ }
+
+ shift() {
+ let [top] = this.heap;
+
+ this.heap[0] = this.heap[this.heap.length-1];
+ this.heap.pop();
+ this.percDown(0);
+
+ return top;
+ }
+
+ toArray() {
+ return [...this];
+ }
+
+ *[Symbol.iterator] () {
+ for (let i = 0; i < this.heap.length; i++) {
+ yield this.heap[i].data;
+ }
+ }
+
+ remove (testFn) {
+ let j = 0;
+ for (let i = 0; i < this.heap.length; i++) {
+ if (!testFn(this.heap[i])) {
+ this.heap[j] = this.heap[i];
+ j++;
+ }
+ }
+
+ this.heap.splice(j);
+
+ for (let i = parent(this.heap.length-1); i >= 0; i--) {
+ this.percDown(i);
+ }
+
+ return this;
+ }
+ }
+
+ function leftChi(i) {
+ return (i<<1)+1;
+ }
+
+ function parent(i) {
+ return ((i+1)>>1)-1;
+ }
+
+ function smaller(x, y) {
+ if (x.priority !== y.priority) {
+ return x.priority < y.priority;
+ }
+ else {
+ return x.pushCount < y.pushCount;
+ }
+ }
+
/**
* The same as [async.queue]{@link module:ControlFlow.queue} only tasks are assigned a priority and
* completed in ascending priority order.
@@ -3139,6 +3314,8 @@
// Start with a normal queue
var q = queue$1(worker, concurrency);
+ q._tasks = new Heap();
+
// Override push to accept second parameter representing priority
q.push = function(data, priority = 0, callback = () => {}) {
if (typeof callback !== 'function') {
@@ -3148,16 +3325,11 @@
if (!Array.isArray(data)) {
data = [data];
}
- if (data.length === 0) {
+ if (data.length === 0 && q.idle()) {
// call drain immediately if there are no tasks
return setImmediate$1(() => q.drain());
}
- var nextNode = q._tasks.head;
- while (nextNode && priority >= nextNode.priority) {
- nextNode = nextNode.next;
- }
-
for (var i = 0, l = data.length; i < l; i++) {
var item = {
data: data[i],
@@ -3165,12 +3337,9 @@
callback
};
- if (nextNode) {
- q._tasks.insertBefore(nextNode, item);
- } else {
- q._tasks.push(item);
- }
+ q._tasks.push(item);
}
+
setImmediate$1(q.process);
};
@@ -3298,14 +3467,18 @@
var _fn = wrapAsync(fn);
return initialParams(function reflectOn(args, reflectCallback) {
args.push((error, ...cbArgs) => {
+ let retVal = {};
if (error) {
- return reflectCallback(null, { error });
+ retVal.error = error;
}
- var value = cbArgs;
- if (cbArgs.length <= 1) {
- [value] = cbArgs;
+ if (cbArgs.length > 0){
+ var value = cbArgs;
+ if (cbArgs.length <= 1) {
+ [value] = cbArgs;
+ }
+ retVal.value = value;
}
- reflectCallback(null, { value });
+ reflectCallback(null, retVal);
});
return _fn.apply(this, args);
@@ -3603,7 +3776,7 @@
if (err && attempt++ < options.times &&
(typeof options.errorFilter != 'function' ||
options.errorFilter(err))) {
- setTimeout(retryAttempt, options.intervalFunc(attempt));
+ setTimeout(retryAttempt, options.intervalFunc(attempt - 1));
} else {
callback(err, ...args);
}
@@ -3896,7 +4069,7 @@
return map$1(coll, (x, iterCb) => {
_iteratee(x, (err, criteria) => {
if (err) return iterCb(err);
- iterCb(null, {value: x, criteria});
+ iterCb(err, {value: x, criteria});
});
}, (err, results) => {
if (err) return callback(err);
@@ -4169,6 +4342,8 @@
var result;
return eachSeries$1(tasks, (task, taskCb) => {
wrapAsync(task)((err, ...args) => {
+ if (err === false) return taskCb(err);
+
if (args.length < 2) {
[result] = args;
} else {
@@ -4239,7 +4414,7 @@
callback = onlyOnce(callback);
var _fn = wrapAsync(iteratee);
var _test = wrapAsync(test);
- var results;
+ var results = [];
function next(err, ...rest) {
if (err) return callback(err);
@@ -4509,6 +4684,9 @@
find: detect$1,
findLimit: detectLimit$1,
findSeries: detectSeries$1,
+ flatMap: concat$1,
+ flatMapLimit: concatLimit$1,
+ flatMapSeries: concatSeries$1,
forEach: each,
forEachSeries: eachSeries$1,
forEachLimit: eachLimit$2,
@@ -4612,6 +4790,9 @@
exports.find = detect$1;
exports.findLimit = detectLimit$1;
exports.findSeries = detectSeries$1;
+ exports.flatMap = concat$1;
+ exports.flatMapLimit = concatLimit$1;
+ exports.flatMapSeries = concatSeries$1;
exports.forEach = each;
exports.forEachSeries = eachSeries$1;
exports.forEachLimit = eachLimit$2;