diff options
author | Alex Early <alexander.early@gmail.com> | 2016-02-24 15:27:01 -0800 |
---|---|---|
committer | Alex Early <alexander.early@gmail.com> | 2016-02-24 15:27:01 -0800 |
commit | ab96f520b50613c604937ba913d3217f4d3cb259 (patch) | |
tree | 1112087e2974eea98d4f5e3cf1c0a79c420ec05a | |
parent | 29c302e53a3e61f6ca39742082d63a2679be2eb6 (diff) | |
parent | 2fc627e89ee6b6639615725539f8945ae297245e (diff) | |
download | async-ab96f520b50613c604937ba913d3217f4d3cb259.tar.gz |
Merge pull request #1034 from suguru03/feat-priority-queue
add unsaturation event to `priorityQueue`
-rw-r--r-- | lib/priorityQueue.js | 3 | ||||
-rw-r--r-- | mocha_test/priorityQueue.js | 70 |
2 files changed, 73 insertions, 0 deletions
diff --git a/lib/priorityQueue.js b/lib/priorityQueue.js index 6cc10ed..90ca03e 100644 --- a/lib/priorityQueue.js +++ b/lib/priorityQueue.js @@ -53,6 +53,9 @@ export default function(worker, concurrency) { if (q.tasks.length === q.concurrency) { q.saturated(); } + if (q.tasks.length <= (q.concurrency - q.buffer) ) { + q.unsaturated(); + } setImmediate(q.process); }); } diff --git a/mocha_test/priorityQueue.js b/mocha_test/priorityQueue.js new file mode 100644 index 0000000..9832b43 --- /dev/null +++ b/mocha_test/priorityQueue.js @@ -0,0 +1,70 @@ +var async = require('../lib'); +var expect = require('chai').expect; + +describe('priorityQueue', function() { + context('q.unsaturated(): ',function() { + it('should have a default buffer property that equals 25% of the concurrenct rate', function(done) { + var calls = []; + var q = async.priorityQueue(function(task, cb) { + // nop + calls.push('process ' + task); + async.setImmediate(cb); + }, 10); + expect(q.buffer).to.equal(2.5); + done(); + }); + + it('should allow a user to change the buffer property', function(done) { + var calls = []; + var q = async.priorityQueue(function(task, cb) { + // nop + calls.push('process ' + task); + async.setImmediate(cb); + }, 10); + q.buffer = 4; + expect(q.buffer).to.not.equal(2.5); + expect(q.buffer).to.equal(4); + done(); + }); + + it('should call the unsaturated callback if tasks length is less than concurrency minus buffer', function(done) { + var calls = []; + var q = async.priorityQueue(function(task, cb) { + calls.push('process ' + task); + async.setImmediate(cb); + }, 10); + q.unsaturated = function() { + calls.push('unsaturated'); + }; + q.empty = function() { + expect(calls.indexOf('unsaturated')).to.be.above(-1); + setTimeout(function() { + expect(calls).eql([ + 'unsaturated', + 'unsaturated', + 'unsaturated', + 'unsaturated', + 'unsaturated', + 'process foo4', + 'process foo3', + 'process foo2', + 'process foo1', + 'process foo0', + 'foo4 cb', + 'foo3 cb', + 'foo2 cb', + 'foo1 cb', + 'foo0 cb' + ]); + done(); + }, 50); + }; + q.push('foo0', 5, function () {calls.push('foo0 cb');}); + q.push('foo1', 4, function () {calls.push('foo1 cb');}); + q.push('foo2', 3, function () {calls.push('foo2 cb');}); + q.push('foo3', 2, function () {calls.push('foo3 cb');}); + q.push('foo4', 1, function () {calls.push('foo4 cb');}); + }); + }); +}); + |