summaryrefslogtreecommitdiff
path: root/lib/priorityQueue.js
diff options
context:
space:
mode:
authorHubert Argasinski <argasinski.hubert@gmail.com>2022-04-15 00:06:27 -0400
committerGitHub <noreply@github.com>2022-04-15 00:06:27 -0400
commit6927a814ad505920179e5dd50e3ccb085f591273 (patch)
tree88c9612da98d4ef764a9f8fc27188d3683332a50 /lib/priorityQueue.js
parent576ba747a2aca0e5392f2aad75f4a9912603b2d5 (diff)
downloadasync-6927a814ad505920179e5dd50e3ccb085f591273.tar.gz
fix: update priorityQueue functionality to match queue (#1790)
Diffstat (limited to 'lib/priorityQueue.js')
-rw-r--r--lib/priorityQueue.js62
1 files changed, 29 insertions, 33 deletions
diff --git a/lib/priorityQueue.js b/lib/priorityQueue.js
index 53ea017..e418e48 100644
--- a/lib/priorityQueue.js
+++ b/lib/priorityQueue.js
@@ -1,4 +1,3 @@
-import setImmediate from './setImmediate.js'
import queue from './queue.js'
import Heap from './internal/Heap.js'
@@ -19,54 +18,51 @@ import Heap from './internal/Heap.js'
* @param {number} concurrency - An `integer` for determining how many `worker`
* functions should be run in parallel. If omitted, the concurrency defaults to
* `1`. If the concurrency is `0`, an error is thrown.
- * @returns {module:ControlFlow.QueueObject} A priorityQueue object to manage the tasks. There are two
+ * @returns {module:ControlFlow.QueueObject} A priorityQueue object to manage the tasks. There are three
* differences between `queue` and `priorityQueue` objects:
* * `push(task, priority, [callback])` - `priority` should be a number. If an
* array of `tasks` is given, all tasks will be assigned the same priority.
- * * The `unshift` method was removed.
+ * * `pushAsync(task, priority, [callback])` - the same as `priorityQueue.push`,
+ * except this returns a promise that rejects if an error occurs.
+ * * The `unshift` and `unshiftAsync` methods were removed.
*/
export default function(worker, concurrency) {
// Start with a normal queue
var q = queue(worker, concurrency);
- var processingScheduled = false;
+
+ var {
+ push,
+ pushAsync
+ } = q;
q._tasks = new Heap();
+ q._createTaskItem = ({data, priority}, callback) => {
+ return {
+ data,
+ priority,
+ callback
+ };
+ };
- // Override push to accept second parameter representing priority
- q.push = function(data, priority = 0, callback = () => {}) {
- if (typeof callback !== 'function') {
- throw new Error('task callback must be a function');
- }
- q.started = true;
- if (!Array.isArray(data)) {
- data = [data];
- }
- if (data.length === 0 && q.idle()) {
- // call drain immediately if there are no tasks
- return setImmediate(() => q.drain());
+ function createDataItems(tasks, priority) {
+ if (!Array.isArray(tasks)) {
+ return {data: tasks, priority};
}
+ return tasks.map(data => { return {data, priority}; });
+ }
- for (var i = 0, l = data.length; i < l; i++) {
- var item = {
- data: data[i],
- priority,
- callback
- };
-
- q._tasks.push(item);
- }
+ // Override push to accept second parameter representing priority
+ q.push = function(data, priority = 0, callback) {
+ return push(createDataItems(data, priority), callback);
+ };
- if (!processingScheduled) {
- processingScheduled = true;
- setImmediate(() => {
- processingScheduled = false;
- q.process();
- });
- }
+ q.pushAsync = function(data, priority = 0, callback) {
+ return pushAsync(createDataItems(data, priority), callback);
};
- // Remove unshift function
+ // Remove unshift functions
delete q.unshift;
+ delete q.unshiftAsync;
return q;
}