diff options
author | Alexander Early <alexander.early@gmail.com> | 2019-05-19 19:27:01 -0700 |
---|---|---|
committer | Alexander Early <alexander.early@gmail.com> | 2019-05-19 19:27:01 -0700 |
commit | bd881300c0f69b32a0113c3c9b955db87cdc8c8f (patch) | |
tree | ec01d0f02567d03293fe005c4bb712ac641fe73a /dist/async.js | |
parent | 3fa33dd70c786d98dc203ae8a0d4d434d418dbd6 (diff) | |
download | async-bd881300c0f69b32a0113c3c9b955db87cdc8c8f.tar.gz |
Update built files
Diffstat (limited to 'dist/async.js')
-rw-r--r-- | dist/async.js | 465 |
1 files changed, 323 insertions, 142 deletions
diff --git a/dist/async.js b/dist/async.js index 1e9b8bf..0f015bb 100644 --- a/dist/async.js +++ b/dist/async.js @@ -197,17 +197,39 @@ return isAsync(asyncFn) ? asyncify(asyncFn) : asyncFn; } - function applyEach(eachfn) { - return function(fns, ...callArgs) { - var go = initialParams(function(args, callback) { + // conditionally promisify a function. + // only return a promise if a callback is omitted + function awaitify (asyncFn, arity = asyncFn.length) { + if (!arity) throw new Error('arity is undefined') + function awaitable (...args) { + if (typeof args[arity - 1] === 'function') { + return asyncFn.apply(this, args) + } + + return new Promise((resolve, reject) => { + args[arity - 1] = (err, ...cbArgs) => { + if (err) return reject(err) + resolve(cbArgs.length > 1 ? cbArgs : cbArgs[0]); + }; + asyncFn.apply(this, args); + }) + } + + Object.defineProperty(awaitable, 'name', { + value: `awaitable(${asyncFn.name})` + }); + + return awaitable + } + + function applyEach (eachfn) { + return function applyEach(fns, ...callArgs) { + const go = awaitify(function (callback) { var that = this; return eachfn(fns, (fn, cb) => { - wrapAsync(fn).apply(that, args.concat(cb)); + wrapAsync(fn).apply(that, callArgs.concat(cb)); }, callback); }); - if (callArgs.length) { - return go.apply(this, callArgs); - } return go; }; } @@ -426,31 +448,6 @@ }; }; - // conditionally promisify a function. - // only return a promise if a callback is omitted - function awaitify (asyncFn, arity = asyncFn.length) { - if (!arity) throw new Error('arity is undefined') - function awaitable (...args) { - if (typeof args[arity - 1] === 'function') { - return asyncFn.apply(this, args) - } - - return new Promise((resolve, reject) => { - args[arity - 1] = (err, ...cbArgs) => { - if (err) return reject(err) - resolve(cbArgs.length > 1 ? cbArgs : cbArgs[0]); - }; - asyncFn.apply(this, args); - }) - } - - Object.defineProperty(awaitable, 'name', { - value: `awaitable(${asyncFn.name})` - }); - - return awaitable - } - /** * The same as [`eachOf`]{@link module:Collections.eachOf} but runs a maximum of `limit` async operations at a * time. @@ -977,7 +974,7 @@ } var FN_ARGS = /^(?:async\s+)?(?:function)?\s*[^(]*\(\s*([^)]+)\s*\)(?:\s*{)/m; - var ARROW_FN_ARGS = /^(?:async\s+)?\(?\s*([^)^=]+)\s*\)?(?:\s*=>)/m; + var ARROW_FN_ARGS = /^(?:async\s+)?(?:function\s+)?\(?\s*([^)^=]+)\s*\)?(?:\s*=>)/m; var FN_ARG_SPLIT = /,/; var FN_ARG = /(=.+)?(\s*)$/; var STRIP_COMMENTS = /((\/\/.*$)|(\/\*[\s\S]*?\*\/))/mg; @@ -1212,8 +1209,6 @@ dll.head = dll.tail = node; } - const noop = () => {}; - function queue(worker, concurrency, payload) { if (concurrency == null) { concurrency = 1; @@ -1225,6 +1220,35 @@ var _worker = wrapAsync(worker); var numRunning = 0; var workersList = []; + const events = { + error: [], + drain: [], + saturated: [], + unsaturated: [], + empty: [] + }; + + function on (event, handler) { + events[event].push(handler); + } + + function once (event, handler) { + const handleAndRemove = (...args) => { + off(event, handleAndRemove); + handler(...args); + }; + events[event].push(handleAndRemove); + } + + function off (event, handler) { + if (!event) return Object.keys(events).forEach(ev => events[ev] = []) + if (!handler) return events[event] = [] + events[event] = events[event].filter(ev => ev !== handler); + } + + function trigger (event, ...args) { + events[event].forEach(handler => handler(...args)); + } var processingScheduled = false; function _insert(data, insertAtFront, callback) { @@ -1232,25 +1256,32 @@ throw new Error('task callback must be a function'); } q.started = true; - if (!Array.isArray(data)) { - data = [data]; - } - if (data.length === 0 && q.idle()) { - // call drain immediately if there are no tasks - return setImmediate$1(() => q.drain()); + if (Array.isArray(data)) { + if (data.length === 0 && q.idle()) { + // call drain immediately if there are no tasks + return setImmediate$1(() => trigger('drain')); + } + + return data.map(datum => _insert(datum, insertAtFront, callback)); } - for (var i = 0, l = data.length; i < l; i++) { - var item = { - data: data[i], - callback: callback || noop - }; + var res; - if (insertAtFront) { - q._tasks.unshift(item); - } else { - q._tasks.push(item); + var item = { + data, + callback: callback || function (err, ...args) { + // we don't care about the error, let the global error handler + // deal with it + if (err) return + if (args.length <= 1) return res(args[0]) + res(args); } + }; + + if (insertAtFront) { + q._tasks.unshift(item); + } else { + q._tasks.push(item); } if (!processingScheduled) { @@ -1260,9 +1291,15 @@ q.process(); }); } + + if (!callback) { + return new Promise((resolve) => { + res = resolve; + }) + } } - function _next(tasks) { + function _createCB(tasks) { return function (err, ...args) { numRunning -= 1; @@ -1279,21 +1316,35 @@ task.callback(err, ...args); if (err != null) { - q.error(err, task.data); + trigger('error', err, task.data); } } if (numRunning <= (q.concurrency - q.buffer) ) { - q.unsaturated(); + trigger('unsaturated'); } if (q.idle()) { - q.drain(); + trigger('drain'); } q.process(); }; } + const eventMethod = (name) => (handler) => { + if (!handler) { + return new Promise((resolve, reject) => { + once(name, (err, data) => { + if (err) return reject(err) + resolve(data); + }); + }) + } + off(name); + on(name, handler); + + }; + var isProcessing = false; var q = { _tasks: new DLL(), @@ -1302,23 +1353,18 @@ }, concurrency, payload, - saturated: noop, - unsaturated:noop, buffer: concurrency / 4, - empty: noop, - drain: noop, - error: noop, started: false, paused: false, push (data, callback) { - _insert(data, false, callback); + return _insert(data, false, callback); }, kill () { - q.drain = noop; + off(); q._tasks.empty(); }, unshift (data, callback) { - _insert(data, true, callback); + return _insert(data, true, callback); }, remove (testFn) { q._tasks.remove(testFn); @@ -1344,14 +1390,14 @@ numRunning += 1; if (q._tasks.length === 0) { - q.empty(); + trigger('empty'); } if (numRunning === q.concurrency) { - q.saturated(); + trigger('saturated'); } - var cb = onlyOnce(_next(tasks)); + var cb = onlyOnce(_createCB(tasks)); _worker(data, cb); } isProcessing = false; @@ -1377,40 +1423,33 @@ setImmediate$1(q.process); } }; + // define these as fixed properties, so people get useful errors when updating + Object.defineProperties(q, { + saturated: { + writable: false, + value: eventMethod('saturated') + }, + unsaturated: { + writable: false, + value: eventMethod('unsaturated') + }, + empty: { + writable: false, + value: eventMethod('empty') + }, + drain: { + writable: false, + value: eventMethod('drain') + }, + error: { + writable: false, + value: eventMethod('error') + }, + }); return q; } /** - * A cargo of tasks for the worker function to complete. Cargo inherits all of - * the same methods and event callbacks as [`queue`]{@link module:ControlFlow.queue}. - * @typedef {Object} CargoObject - * @memberOf module:ControlFlow - * @property {Function} length - A function returning the number of items - * waiting to be processed. Invoke like `cargo.length()`. - * @property {number} payload - An `integer` for determining how many tasks - * should be process per round. This property can be changed after a `cargo` is - * created to alter the payload on-the-fly. - * @property {Function} push - Adds `task` to the `queue`. The callback is - * called once the `worker` has finished processing the task. Instead of a - * single task, an array of `tasks` can be submitted. The respective callback is - * used for every task in the list. Invoke like `cargo.push(task, [callback])`. - * @property {Function} saturated - A callback that is called when the - * `queue.length()` hits the concurrency and further tasks will be queued. - * @property {Function} empty - A callback that is called when the last item - * from the `queue` is given to a `worker`. - * @property {Function} drain - A callback that is called when the last item - * from the `queue` has returned from the `worker`. - * @property {Function} idle - a function returning false if there are items - * waiting or being processed, or true if not. Invoke like `cargo.idle()`. - * @property {Function} pause - a function that pauses the processing of tasks - * until `resume()` is called. Invoke like `cargo.pause()`. - * @property {Function} resume - a function that resumes the processing of - * queued tasks when the queue is paused. Invoke like `cargo.resume()`. - * @property {Function} kill - a function that removes the `drain` callback and - * empties remaining tasks from the queue forcing it to go idle. Invoke like `cargo.kill()`. - */ - - /** * Creates a `cargo` object with the specified payload. Tasks added to the * cargo will be processed altogether (up to the `payload` limit). If the * `worker` is in progress, the task is queued until it becomes available. Once @@ -1433,7 +1472,7 @@ * @param {number} [payload=Infinity] - An optional `integer` for determining * how many tasks should be processed per round; if omitted, the default is * unlimited. - * @returns {module:ControlFlow.CargoObject} A cargo object to manage the tasks. Callbacks can + * @returns {module:ControlFlow.QueueObject} A cargo object to manage the tasks. Callbacks can * attached as certain properties to listen for specific events during the * lifecycle of the cargo and inner queue. * @example @@ -1453,9 +1492,8 @@ * cargo.push({name: 'bar'}, function(err) { * console.log('finished processing bar'); * }); - * cargo.push({name: 'baz'}, function(err) { - * console.log('finished processing baz'); - * }); + * await cargo.push({name: 'baz'}); + * console.log('finished processing baz'); */ function cargo(worker, payload) { return queue(worker, 1, payload); @@ -1709,6 +1747,7 @@ * @method * @see [async.concat]{@link module:Collections.concat} * @category Collection + * @alias flatMapLimit * @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over. * @param {number} limit - The maximum number of async operations at a time. * @param {AsyncFunction} iteratee - A function to apply to each item in `coll`, @@ -1724,7 +1763,7 @@ return mapLimit$1(coll, limit, (val, iterCb) => { _iteratee(val, (err, ...args) => { if (err) return iterCb(err); - return iterCb(null, args); + return iterCb(err, args); }); }, (err, mapResults) => { var result = []; @@ -1750,6 +1789,7 @@ * @memberOf module:Collections * @method * @category Collection + * @alias flatMap * @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over. * @param {AsyncFunction} iteratee - A function to apply to each item in `coll`, * which should use an array as its result. Invoked with (item, callback). @@ -1778,6 +1818,7 @@ * @method * @see [async.concat]{@link module:Collections.concat} * @category Collection + * @alias flatMapSeries * @param {Array|Iterable|AsyncIterable|Object} coll - A collection to iterate over. * @param {AsyncFunction} iteratee - A function to apply to each item in `coll`. * The iteratee should complete with an array an array of results. @@ -1849,7 +1890,7 @@ const iteratee = wrapAsync(_iteratee); eachfn(arr, (value, _, callback) => { iteratee(value, (err, result) => { - if (err) return callback(err) + if (err || err === false) return callback(err); if (check(result) && !testResult) { testPassed = true; @@ -2180,6 +2221,9 @@ /** * The same as [`each`]{@link module:Collections.each} but runs only a single async operation at a time. * + * Note, that unlike [`each`]{@link module:Collections.each}, this function applies iteratee to each item + * in series and therefore the iteratee functions will complete in order. + * @name eachSeries * @static * @memberOf module:Collections @@ -2364,7 +2408,7 @@ if (v) { results.push({index, value: x}); } - iterCb(); + iterCb(err); }); }, err => { if (err) return callback(err); @@ -2529,7 +2573,7 @@ return mapLimit$1(coll, limit, (val, iterCb) => { _iteratee(val, (err, key) => { if (err) return iterCb(err); - return iterCb(null, {key, val}); + return iterCb(err, {key, val}); }); }, (err, mapResults) => { var result = {}; @@ -2678,7 +2722,7 @@ _iteratee(val, key, (err, result) => { if (err) return next(err); newObj[key] = result; - next(); + next(err); }); }, err => callback(err, newObj)); } @@ -3000,6 +3044,9 @@ * @property {number} concurrency - an integer for determining how many `worker` * functions should be run in parallel. This property can be changed after a * `queue` is created to alter the concurrency on-the-fly. + * @property {number} payload - an integer that specifies how many items are + * passed to the worker function at a time. only applies if this is a + * [cargo]{@link module:ControlFlow.cargo} object * @property {Function} push - add a new task to the `queue`. Calls `callback` * once the `worker` has finished processing the task. Instead of a single task, * a `tasks` array can be submitted. The respective callback is used for every @@ -3012,20 +3059,26 @@ * [priorityQueue]{@link module:ControlFlow.priorityQueue} object. * Invoked with `queue.remove(testFn)`, where `testFn` is of the form * `function ({data, priority}) {}` and returns a Boolean. - * @property {Function} saturated - a callback that is called when the number of - * running workers hits the `concurrency` limit, and further tasks will be - * queued. - * @property {Function} unsaturated - a callback that is called when the number - * of running workers is less than the `concurrency` & `buffer` limits, and - * further tasks will not be queued. + * @property {Function} saturated - a function that sets a callback that is + * called when the number of running workers hits the `concurrency` limit, and + * further tasks will be queued. If the callback is omitted, `q.saturated()` + * returns a promise for the next occurrence. + * @property {Function} unsaturated - a function that sets a callback that is + * called when the number of running workers is less than the `concurrency` & + * `buffer` limits, and further tasks will not be queued. If the callback is + * omitted, `q.unsaturated()` returns a promise for the next occurrence. * @property {number} buffer - A minimum threshold buffer in order to say that * the `queue` is `unsaturated`. - * @property {Function} empty - a callback that is called when the last item - * from the `queue` is given to a `worker`. - * @property {Function} drain - a callback that is called when the last item - * from the `queue` has returned from the `worker`. - * @property {Function} error - a callback that is called when a task errors. - * Has the signature `function(error, task)`. + * @property {Function} empty - a function that sets a callback that is called + * when the last item from the `queue` is given to a `worker`. If the callback + * is omitted, `q.empty()` returns a promise for the next occurrence. + * @property {Function} drain - a function that sets a callback that is called + * when the last item from the `queue` has returned from the `worker`. If the + * callback is omitted, `q.drain()` returns a promise for the next occurrence. + * @property {Function} error - a function that sets a callback that is called + * when a task errors. Has the signature `function(error, task)`. If the + * callback is omitted, `error()` returns a promise that rejects on the next + * error. * @property {boolean} paused - a boolean for determining whether the queue is * in a paused state. * @property {Function} pause - a function that pauses the processing of tasks @@ -3047,6 +3100,12 @@ * for (let item of q) { * console.log(item) * } + * + * q.drain(() => { + * console.log('all done') + * }) + * // or + * await q.drain() */ /** @@ -3078,22 +3137,23 @@ * }, 2); * * // assign a callback - * q.drain = function() { + * q.drain(function() { * console.log('all items have been processed'); - * }; + * }); + * // or await the end + * await q.drain() * * // assign an error callback - * q.error = function(err, task) { + * q.error(function(err, task) { * console.error('task experienced an error'); - * }; + * }); * * // add some items to the queue * q.push({name: 'foo'}, function(err) { * console.log('finished processing foo'); * }); - * q.push({name: 'bar'}, function (err) { - * console.log('finished processing bar'); - * }); + * // callback is optional + * q.push({name: 'bar'}); * * // add some items to the queue (batch-wise) * q.push([{name: 'baz'},{name: 'bay'},{name: 'bax'}], function(err) { @@ -3112,6 +3172,121 @@ }, concurrency, 1); } + // Binary min-heap implementation used for priority queue. + // Implementation is stable, i.e. push time is considered for equal priorities + class Heap { + constructor() { + this.heap = []; + this.pushCount = Number.MIN_SAFE_INTEGER; + } + + get length() { + return this.heap.length; + } + + empty () { + this.heap = []; + return this; + } + + percUp(index) { + let p; + + while (index > 0 && smaller(this.heap[index], this.heap[p=parent(index)])) { + let t = this.heap[index]; + this.heap[index] = this.heap[p]; + this.heap[p] = t; + + index = p; + } + } + + percDown(index) { + let l; + + while ((l=leftChi(index)) < this.heap.length) { + if (l+1 < this.heap.length && smaller(this.heap[l+1], this.heap[l])) { + l = l+1; + } + + if (smaller(this.heap[index], this.heap[l])) { + break; + } + + let t = this.heap[index]; + this.heap[index] = this.heap[l]; + this.heap[l] = t; + + index = l; + } + } + + push(node) { + node.pushCount = ++this.pushCount; + this.heap.push(node); + this.percUp(this.heap.length-1); + } + + unshift(node) { + return this.heap.push(node); + } + + shift() { + let [top] = this.heap; + + this.heap[0] = this.heap[this.heap.length-1]; + this.heap.pop(); + this.percDown(0); + + return top; + } + + toArray() { + return [...this]; + } + + *[Symbol.iterator] () { + for (let i = 0; i < this.heap.length; i++) { + yield this.heap[i].data; + } + } + + remove (testFn) { + let j = 0; + for (let i = 0; i < this.heap.length; i++) { + if (!testFn(this.heap[i])) { + this.heap[j] = this.heap[i]; + j++; + } + } + + this.heap.splice(j); + + for (let i = parent(this.heap.length-1); i >= 0; i--) { + this.percDown(i); + } + + return this; + } + } + + function leftChi(i) { + return (i<<1)+1; + } + + function parent(i) { + return ((i+1)>>1)-1; + } + + function smaller(x, y) { + if (x.priority !== y.priority) { + return x.priority < y.priority; + } + else { + return x.pushCount < y.pushCount; + } + } + /** * The same as [async.queue]{@link module:ControlFlow.queue} only tasks are assigned a priority and * completed in ascending priority order. @@ -3139,6 +3314,8 @@ // Start with a normal queue var q = queue$1(worker, concurrency); + q._tasks = new Heap(); + // Override push to accept second parameter representing priority q.push = function(data, priority = 0, callback = () => {}) { if (typeof callback !== 'function') { @@ -3148,16 +3325,11 @@ if (!Array.isArray(data)) { data = [data]; } - if (data.length === 0) { + if (data.length === 0 && q.idle()) { // call drain immediately if there are no tasks return setImmediate$1(() => q.drain()); } - var nextNode = q._tasks.head; - while (nextNode && priority >= nextNode.priority) { - nextNode = nextNode.next; - } - for (var i = 0, l = data.length; i < l; i++) { var item = { data: data[i], @@ -3165,12 +3337,9 @@ callback }; - if (nextNode) { - q._tasks.insertBefore(nextNode, item); - } else { - q._tasks.push(item); - } + q._tasks.push(item); } + setImmediate$1(q.process); }; @@ -3298,14 +3467,18 @@ var _fn = wrapAsync(fn); return initialParams(function reflectOn(args, reflectCallback) { args.push((error, ...cbArgs) => { + let retVal = {}; if (error) { - return reflectCallback(null, { error }); + retVal.error = error; } - var value = cbArgs; - if (cbArgs.length <= 1) { - [value] = cbArgs; + if (cbArgs.length > 0){ + var value = cbArgs; + if (cbArgs.length <= 1) { + [value] = cbArgs; + } + retVal.value = value; } - reflectCallback(null, { value }); + reflectCallback(null, retVal); }); return _fn.apply(this, args); @@ -3603,7 +3776,7 @@ if (err && attempt++ < options.times && (typeof options.errorFilter != 'function' || options.errorFilter(err))) { - setTimeout(retryAttempt, options.intervalFunc(attempt)); + setTimeout(retryAttempt, options.intervalFunc(attempt - 1)); } else { callback(err, ...args); } @@ -3896,7 +4069,7 @@ return map$1(coll, (x, iterCb) => { _iteratee(x, (err, criteria) => { if (err) return iterCb(err); - iterCb(null, {value: x, criteria}); + iterCb(err, {value: x, criteria}); }); }, (err, results) => { if (err) return callback(err); @@ -4169,6 +4342,8 @@ var result; return eachSeries$1(tasks, (task, taskCb) => { wrapAsync(task)((err, ...args) => { + if (err === false) return taskCb(err); + if (args.length < 2) { [result] = args; } else { @@ -4239,7 +4414,7 @@ callback = onlyOnce(callback); var _fn = wrapAsync(iteratee); var _test = wrapAsync(test); - var results; + var results = []; function next(err, ...rest) { if (err) return callback(err); @@ -4509,6 +4684,9 @@ find: detect$1, findLimit: detectLimit$1, findSeries: detectSeries$1, + flatMap: concat$1, + flatMapLimit: concatLimit$1, + flatMapSeries: concatSeries$1, forEach: each, forEachSeries: eachSeries$1, forEachLimit: eachLimit$2, @@ -4612,6 +4790,9 @@ exports.find = detect$1; exports.findLimit = detectLimit$1; exports.findSeries = detectSeries$1; + exports.flatMap = concat$1; + exports.flatMapLimit = concatLimit$1; + exports.flatMapSeries = concatSeries$1; exports.forEach = each; exports.forEachSeries = eachSeries$1; exports.forEachLimit = eachLimit$2; |