diff options
author | Hubert Argasinski <argasinski.hubert@gmail.com> | 2022-04-15 00:06:27 -0400 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-15 00:06:27 -0400 |
commit | 6927a814ad505920179e5dd50e3ccb085f591273 (patch) | |
tree | 88c9612da98d4ef764a9f8fc27188d3683332a50 /lib/priorityQueue.js | |
parent | 576ba747a2aca0e5392f2aad75f4a9912603b2d5 (diff) | |
download | async-6927a814ad505920179e5dd50e3ccb085f591273.tar.gz |
fix: update priorityQueue functionality to match queue (#1790)
Diffstat (limited to 'lib/priorityQueue.js')
-rw-r--r-- | lib/priorityQueue.js | 62 |
1 files changed, 29 insertions, 33 deletions
diff --git a/lib/priorityQueue.js b/lib/priorityQueue.js index 53ea017..e418e48 100644 --- a/lib/priorityQueue.js +++ b/lib/priorityQueue.js @@ -1,4 +1,3 @@ -import setImmediate from './setImmediate.js' import queue from './queue.js' import Heap from './internal/Heap.js' @@ -19,54 +18,51 @@ import Heap from './internal/Heap.js' * @param {number} concurrency - An `integer` for determining how many `worker` * functions should be run in parallel. If omitted, the concurrency defaults to * `1`. If the concurrency is `0`, an error is thrown. - * @returns {module:ControlFlow.QueueObject} A priorityQueue object to manage the tasks. There are two + * @returns {module:ControlFlow.QueueObject} A priorityQueue object to manage the tasks. There are three * differences between `queue` and `priorityQueue` objects: * * `push(task, priority, [callback])` - `priority` should be a number. If an * array of `tasks` is given, all tasks will be assigned the same priority. - * * The `unshift` method was removed. + * * `pushAsync(task, priority, [callback])` - the same as `priorityQueue.push`, + * except this returns a promise that rejects if an error occurs. + * * The `unshift` and `unshiftAsync` methods were removed. */ export default function(worker, concurrency) { // Start with a normal queue var q = queue(worker, concurrency); - var processingScheduled = false; + + var { + push, + pushAsync + } = q; q._tasks = new Heap(); + q._createTaskItem = ({data, priority}, callback) => { + return { + data, + priority, + callback + }; + }; - // Override push to accept second parameter representing priority - q.push = function(data, priority = 0, callback = () => {}) { - if (typeof callback !== 'function') { - throw new Error('task callback must be a function'); - } - q.started = true; - if (!Array.isArray(data)) { - data = [data]; - } - if (data.length === 0 && q.idle()) { - // call drain immediately if there are no tasks - return setImmediate(() => q.drain()); + function createDataItems(tasks, priority) { + if (!Array.isArray(tasks)) { + return {data: tasks, priority}; } + return tasks.map(data => { return {data, priority}; }); + } - for (var i = 0, l = data.length; i < l; i++) { - var item = { - data: data[i], - priority, - callback - }; - - q._tasks.push(item); - } + // Override push to accept second parameter representing priority + q.push = function(data, priority = 0, callback) { + return push(createDataItems(data, priority), callback); + }; - if (!processingScheduled) { - processingScheduled = true; - setImmediate(() => { - processingScheduled = false; - q.process(); - }); - } + q.pushAsync = function(data, priority = 0, callback) { + return pushAsync(createDataItems(data, priority), callback); }; - // Remove unshift function + // Remove unshift functions delete q.unshift; + delete q.unshiftAsync; return q; } |