summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHubert Argasinski <argasinski.hubert@gmail.com>2022-04-15 00:06:27 -0400
committerGitHub <noreply@github.com>2022-04-15 00:06:27 -0400
commit6927a814ad505920179e5dd50e3ccb085f591273 (patch)
tree88c9612da98d4ef764a9f8fc27188d3683332a50
parent576ba747a2aca0e5392f2aad75f4a9912603b2d5 (diff)
downloadasync-6927a814ad505920179e5dd50e3ccb085f591273.tar.gz
fix: update priorityQueue functionality to match queue (#1790)
-rw-r--r--lib/internal/queue.js13
-rw-r--r--lib/priorityQueue.js62
-rw-r--r--test/es2017/awaitableFunctions.js60
-rw-r--r--test/priorityQueue.js206
-rw-r--r--test/queue.js2
5 files changed, 293 insertions, 50 deletions
diff --git a/lib/internal/queue.js b/lib/internal/queue.js
index 886fe7a..75712c2 100644
--- a/lib/internal/queue.js
+++ b/lib/internal/queue.js
@@ -60,12 +60,11 @@ export default function queue(worker, concurrency, payload) {
res(args)
}
- var item = {
+ var item = q._createTaskItem(
data,
- callback: rejectOnError ?
- promiseCallback :
+ rejectOnError ? promiseCallback :
(callback || promiseCallback)
- };
+ );
if (insertAtFront) {
q._tasks.unshift(item);
@@ -147,6 +146,12 @@ export default function queue(worker, concurrency, payload) {
var isProcessing = false;
var q = {
_tasks: new DLL(),
+ _createTaskItem (data, callback) {
+ return {
+ data,
+ callback
+ };
+ },
*[Symbol.iterator] () {
yield* q._tasks[Symbol.iterator]()
},
diff --git a/lib/priorityQueue.js b/lib/priorityQueue.js
index 53ea017..e418e48 100644
--- a/lib/priorityQueue.js
+++ b/lib/priorityQueue.js
@@ -1,4 +1,3 @@
-import setImmediate from './setImmediate.js'
import queue from './queue.js'
import Heap from './internal/Heap.js'
@@ -19,54 +18,51 @@ import Heap from './internal/Heap.js'
* @param {number} concurrency - An `integer` for determining how many `worker`
* functions should be run in parallel. If omitted, the concurrency defaults to
* `1`. If the concurrency is `0`, an error is thrown.
- * @returns {module:ControlFlow.QueueObject} A priorityQueue object to manage the tasks. There are two
+ * @returns {module:ControlFlow.QueueObject} A priorityQueue object to manage the tasks. There are three
* differences between `queue` and `priorityQueue` objects:
* * `push(task, priority, [callback])` - `priority` should be a number. If an
* array of `tasks` is given, all tasks will be assigned the same priority.
- * * The `unshift` method was removed.
+ * * `pushAsync(task, priority, [callback])` - the same as `priorityQueue.push`,
+ * except this returns a promise that rejects if an error occurs.
+ * * The `unshift` and `unshiftAsync` methods were removed.
*/
export default function(worker, concurrency) {
// Start with a normal queue
var q = queue(worker, concurrency);
- var processingScheduled = false;
+
+ var {
+ push,
+ pushAsync
+ } = q;
q._tasks = new Heap();
+ q._createTaskItem = ({data, priority}, callback) => {
+ return {
+ data,
+ priority,
+ callback
+ };
+ };
- // Override push to accept second parameter representing priority
- q.push = function(data, priority = 0, callback = () => {}) {
- if (typeof callback !== 'function') {
- throw new Error('task callback must be a function');
- }
- q.started = true;
- if (!Array.isArray(data)) {
- data = [data];
- }
- if (data.length === 0 && q.idle()) {
- // call drain immediately if there are no tasks
- return setImmediate(() => q.drain());
+ function createDataItems(tasks, priority) {
+ if (!Array.isArray(tasks)) {
+ return {data: tasks, priority};
}
+ return tasks.map(data => { return {data, priority}; });
+ }
- for (var i = 0, l = data.length; i < l; i++) {
- var item = {
- data: data[i],
- priority,
- callback
- };
-
- q._tasks.push(item);
- }
+ // Override push to accept second parameter representing priority
+ q.push = function(data, priority = 0, callback) {
+ return push(createDataItems(data, priority), callback);
+ };
- if (!processingScheduled) {
- processingScheduled = true;
- setImmediate(() => {
- processingScheduled = false;
- q.process();
- });
- }
+ q.pushAsync = function(data, priority = 0, callback) {
+ return pushAsync(createDataItems(data, priority), callback);
};
- // Remove unshift function
+ // Remove unshift functions
delete q.unshift;
+ delete q.unshiftAsync;
return q;
}
diff --git a/test/es2017/awaitableFunctions.js b/test/es2017/awaitableFunctions.js
index f961857..d68a18f 100644
--- a/test/es2017/awaitableFunctions.js
+++ b/test/es2017/awaitableFunctions.js
@@ -540,7 +540,7 @@ module.exports = function () {
it('should work with queues', async () => {
const q = async.queue(async (data) => {
if (data === 2) throw new Error('oh noes')
- await new Promise(resolve => setTimeout(resolve, 10))
+ await new Promise(resolve => setTimeout(() => resolve(data), 10))
return data
}, 5)
@@ -561,7 +561,7 @@ module.exports = function () {
const multiP = Promise.all(q.push([9, 10]))
await q.drain()
- await multiP
+ const res = await multiP
expect(calls.join()).to.eql([
'saturated',
'push cb 1',
@@ -581,6 +581,62 @@ module.exports = function () {
expect(emptyCalls).to.eql([
'empty',
])
+
+ expect(res).to.eql([
+ 9,
+ 10
+ ])
+ })
+
+ it('should work with priorityQueues', async () => {
+ const q = async.priorityQueue(async (data) => {
+ if (data === 2) throw new Error('oh noes')
+ await new Promise(resolve => setTimeout(() => resolve(data), 10))
+ return data
+ }, 5)
+
+ const calls = []
+ const errorCalls = []
+ const emptyCalls = []
+ q.error().catch(d => errorCalls.push('error ' + d))
+ q.saturated().then(() => calls.push('saturated'))
+ q.unsaturated().then(() => calls.push('unsaturated'))
+ q.empty().then(() => emptyCalls.push('empty'))
+
+ q.push(1, 1).then(d => calls.push('push cb ' + d))
+ q.push(2, 1).then(d => errorCalls.push('push cb ' + d))
+ q.push([3, 4, 5, 6], 0).map(p => p.then(d => calls.push('push cb ' + d)))
+ q.push(7, 3).then(d => calls.push('push cb ' + d))
+ q.push(8, 3).then(d => calls.push('push cb ' + d))
+
+ const multiP = Promise.all(q.push([9, 10], 1))
+
+ await q.drain()
+ const res = await multiP
+ expect(calls.join()).to.eql([
+ 'saturated',
+ 'push cb 3',
+ 'push cb 4',
+ 'push cb 5',
+ 'push cb 6',
+ 'push cb 1',
+ 'unsaturated',
+ 'push cb 7',
+ 'push cb 8'
+ ].join())
+
+ expect(errorCalls).to.eql([
+ 'push cb undefined',
+ 'error Error: oh noes',
+ ])
+ expect(emptyCalls).to.eql([
+ 'empty',
+ ])
+
+ expect(res).to.eql([
+ 9,
+ 10
+ ])
})
/*
diff --git a/test/priorityQueue.js b/test/priorityQueue.js
index 564a0d6..40f7fe6 100644
--- a/test/priorityQueue.js
+++ b/test/priorityQueue.js
@@ -16,13 +16,13 @@ describe('priorityQueue', () => {
q.push(1, 1.4, (err, arg) => {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
- expect(q.length()).to.equal(2);
+ expect(q.length()).to.equal(3);
call_order.push('callback ' + 1);
});
q.push(2, 0.2, (err, arg) => {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
- expect(q.length()).to.equal(3);
+ expect(q.length()).to.equal(4);
call_order.push('callback ' + 2);
});
q.push(3, 3.8, (err, arg) => {
@@ -31,26 +31,25 @@ describe('priorityQueue', () => {
expect(q.length()).to.equal(0);
call_order.push('callback ' + 3);
});
- q.push(4, 2.9, (err, arg) => {
+ q.push(['arr', 'arr'], 2.9, (err, arg) => {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
- expect(q.length()).to.equal(1);
- call_order.push('callback ' + 4);
+ call_order.push('callback arr');
});
- expect(q.length()).to.equal(4);
+ expect(q.length()).to.equal(5);
expect(q.concurrency).to.equal(1);
q.drain(() => {
expect(call_order).to.eql([
'process 2', 'callback 2',
'process 1', 'callback 1',
- 'process 4', 'callback 4',
+ 'process arr', 'callback arr',
+ 'process arr', 'callback arr',
'process 3', 'callback 3'
]);
expect(q.concurrency).to.equal(1);
expect(q.length()).to.equal(0);
- q.push([])
- expect(q.length()).to.equal(0)
+ expect(q.idle()).to.be.equal(true);
done()
});
try {
@@ -79,27 +78,32 @@ describe('priorityQueue', () => {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(2);
+ expect(q.running()).to.equal(1);
call_order.push('callback ' + 1);
});
q.push(2, 0.2, (err, arg) => {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(1);
+ expect(q.running()).to.equal(1);
call_order.push('callback ' + 2);
});
q.push(3, 3.8, (err, arg) => {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(0);
+ expect(q.running()).to.equal(1);
call_order.push('callback ' + 3);
});
q.push(4, 2.9, (err, arg) => {
expect(err).to.equal('error');
expect(arg).to.equal('arg');
expect(q.length()).to.equal(0);
+ expect(q.running()).to.equal(0);
call_order.push('callback ' + 4);
});
expect(q.length()).to.equal(4);
+ expect(q.running()).to.equal(0);
expect(q.concurrency).to.equal(2);
q.drain(() => {
@@ -111,10 +115,30 @@ describe('priorityQueue', () => {
]);
expect(q.concurrency).to.equal(2);
expect(q.length()).to.equal(0);
+ expect(q.running()).to.equal(0);
done();
});
});
+ it('pushAsync', done => {
+ const calls = [];
+ const q = async.priorityQueue((task, cb) => {
+ if (task === 2) return cb(new Error('fail'));
+ cb();
+ })
+
+ q.pushAsync(1, 1, () => { throw new Error('should not be called') }).then(() => calls.push(1));
+ q.pushAsync(2, 0).catch(err => {
+ expect(err.message).to.equal('fail');
+ calls.push(2);
+ });
+ q.pushAsync([3, 4], 0).map(p => p.then(() => calls.push('arr')));
+ q.drain(() => setTimeout(() => {
+ expect(calls).to.eql([2, 'arr', 'arr', 1]);
+ done();
+ }));
+ });
+
it('pause in worker with concurrency', (done) => {
var call_order = [];
var q = async.priorityQueue((task, callback) => {
@@ -144,6 +168,104 @@ describe('priorityQueue', () => {
});
});
+ it('kill', (done) => {
+ var q = async.priorityQueue((/*task, callback*/) => {
+ setTimeout(() => {
+ throw new Error("Function should never be called");
+ }, 20);
+ }, 1);
+ q.drain(() => {
+ throw new Error("Function should never be called");
+ });
+
+ q.push(0);
+
+ q.kill();
+
+ setTimeout(() => {
+ expect(q.length()).to.equal(0);
+ done();
+ }, 40);
+ });
+
+ context('q.workersList():', () => {
+ it('should be the same length as running()', (done) => {
+ var q = async.priorityQueue((task, cb) => {
+ async.setImmediate(() => {
+ expect(q.workersList().length).to.equal(q.running());
+ cb();
+ });
+ }, 2);
+
+ q.drain(() => {
+ expect(q.workersList().length).to.equal(0);
+ expect(q.running()).to.equal(0);
+ done();
+ });
+
+ q.push('foo', 2);
+ q.push('bar', 1);
+ q.push('baz', 0);
+ });
+
+ it('should contain the items being processed', (done) => {
+ var itemsBeingProcessed = {
+ 'foo': [
+ {data: 'bar', priority: 1},
+ {data: 'foo', priority: 2}
+ ],
+ 'foo_cb': [
+ {data: 'foo', priority: 2}
+ ],
+ 'bar': [
+ {data: 'baz', priority: 0},
+ {data: 'bar', priority: 1}
+ ],
+ 'bar_cb': [
+ {data: 'bar', priority: 1},
+ {data: 'foo', priority: 2}
+ ],
+ 'baz': [
+ {data: 'baz', priority: 0}
+ ],
+ 'baz_cb': [
+ {data: 'baz', priority: 0},
+ {data: 'bar', priority: 1}
+ ]
+ };
+
+ function getWorkersListData(q) {
+ return q.workersList().map(({data, priority}) => {
+ return {data, priority};
+ });
+ }
+
+ var q = async.priorityQueue((task, cb) => {
+ expect(
+ getWorkersListData(q)
+ ).to.eql(itemsBeingProcessed[task]);
+ expect(q.workersList().length).to.equal(q.running());
+ async.setImmediate(() => {
+ expect(
+ getWorkersListData(q)
+ ).to.eql(itemsBeingProcessed[task+'_cb']);
+ expect(q.workersList().length).to.equal(q.running());
+ cb();
+ });
+ }, 2);
+
+ q.drain(() => {
+ expect(q.workersList()).to.eql([]);
+ expect(q.workersList().length).to.equal(q.running());
+ done();
+ });
+
+ q.push('foo', 2);
+ q.push('bar', 1);
+ q.push('baz', 0);
+ });
+ });
+
context('q.saturated(): ', () => {
it('should call the saturated callback if tasks length is concurrency', (done) => {
var calls = [];
@@ -247,6 +369,27 @@ describe('priorityQueue', () => {
});
});
+ it('should call the drain callback if receives an empty push', (done) => {
+ var call_order = [];
+
+ var q = async.priorityQueue((task, callback) => {
+ call_order.push(task);
+ callback('error', 'arg');
+ }, 1);
+
+ q.drain(() => {
+ call_order.push('drain')
+ expect(call_order).to.eql([
+ 'drain'
+ ]);
+ expect(q.length()).to.equal(0);
+ expect(q.running()).to.equal(0);
+ done();
+ });
+
+ q.push([], 1, () => { throw new Error('should not be called') });
+ });
+
it('should not call the drain callback if receives empty push and tasks are still pending', (done) => {
var call_order = [];
@@ -282,4 +425,49 @@ describe('priorityQueue', () => {
q.push([], 1, () => {});
});
+
+ it('should be iterable', (done) => {
+ var q = async.priorityQueue((data, cb) => {
+ if (data === 3) {
+ q.push(6)
+ expect([...q]).to.eql([4, 5, 6]);
+ }
+ async.setImmediate(cb);
+ });
+
+ q.push([1, 2, 3, 4, 5]);
+
+ expect([...q]).to.eql([1, 2, 3, 4, 5]);
+
+ q.drain(() => {
+ expect([...q]).to.eql([]);
+ done();
+ });
+ });
+
+ it('should error when calling unshift', () => {
+ var q = async.priorityQueue(() => {});
+ expect(() => {
+ q.unshift(1);
+ }).to.throw();
+ });
+
+ it('should error when calling unshiftAsync', () => {
+ var q = async.priorityQueue(() => {});
+ expect(() => {
+ q.unshiftAsync(1);
+ }).to.throw();
+ });
+
+ it('should error when the callback is called more than once', (done) => {
+ var q = async.priorityQueue((task, callback) => {
+ callback();
+ expect(() => {
+ callback();
+ }).to.throw();
+ done();
+ }, 2);
+
+ q.push(1);
+ });
});
diff --git a/test/queue.js b/test/queue.js
index f99e91a..615dd6b 100644
--- a/test/queue.js
+++ b/test/queue.js
@@ -170,7 +170,6 @@ describe('queue', function(){
})
q.pushAsync([3, 4]).map(p => p.then(() => calls.push('arr')))
q.drain(() => setTimeout(() => {
- console.log('drain')
expect(calls).to.eql([1, 2, 'arr', 'arr'])
done()
}))
@@ -190,7 +189,6 @@ describe('queue', function(){
})
q.unshiftAsync([3, 4]).map(p => p.then(() => calls.push('arr')))
q.drain(() => setTimeout(() => {
- console.log('drain')
expect(calls).to.eql(['arr', 'arr', 2, 1])
done()
}))