From e6421579841cf78da5e19c9a7964e1e4f8dcb153 Mon Sep 17 00:00:00 2001 From: Ernie Casilla Date: Fri, 19 Feb 2016 20:48:46 -0500 Subject: added a base implementation for unsaturation event #868 updating the README fix readme --- README.md | 5 +++-- lib/internal/queue.js | 5 +++++ mocha_test/queue.js | 49 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 57 insertions(+), 2 deletions(-) create mode 100644 mocha_test/queue.js diff --git a/README.md b/README.md index 70f68f0..9763875 100644 --- a/README.md +++ b/README.md @@ -1191,8 +1191,9 @@ methods: the `worker` has finished processing the task. Instead of a single task, a `tasks` array can be submitted. The respective callback is used for every task in the list. * `unshift(task, [callback])` - add a new task to the front of the `queue`. -* `saturated` - a callback that is called when the `queue` length hits the `concurrency` limit, - and further tasks will be queued. +* `saturated` - a callback that is called when the `queue` length hits the `concurrency` limit, and further tasks will be queued. +* `unsaturated` - a callback that is called when the `queue` length is less than the `concurrency` & `buffer` limits, and further tasks will not be queued. +* `buffer` A minimum threshold buffer in order to say that the `queue` is `unsaturated`. * `empty` - a callback that is called when the last item from the `queue` is given to a `worker`. * `drain` - a callback that is called when the last item from the `queue` has returned from the `worker`. * `paused` - a boolean for determining whether the queue is in a paused state diff --git a/lib/internal/queue.js b/lib/internal/queue.js index 87c99f8..3e7eb1e 100644 --- a/lib/internal/queue.js +++ b/lib/internal/queue.js @@ -45,6 +45,9 @@ export default function queue(worker, concurrency, payload) { if (q.tasks.length === q.concurrency) { q.saturated(); } + if (q.tasks.length <= (q.concurrency - q.buffer) ) { + q.unsaturated(); + } }); setImmediate(q.process); } @@ -78,6 +81,8 @@ export default function queue(worker, concurrency, payload) { concurrency: concurrency, payload: payload, saturated: noop, + unsaturated:noop, + buffer: concurrency / 4, empty: noop, drain: noop, started: false, diff --git a/mocha_test/queue.js b/mocha_test/queue.js new file mode 100644 index 0000000..fab423a --- /dev/null +++ b/mocha_test/queue.js @@ -0,0 +1,49 @@ +var async = require('../lib'); +var expect = require('chai').expect; + + +describe('queue', function(){ + context('q.unsaturated(): ',function() { + it('should have a default buffer property that equals 25% of the concurrenct rate', function(done){ + var q = async.queue(function(task, cb) { + // nop + calls.push('process ' + task); + async.setImmediate(cb); + }, 10); + expect(q.buffer).to.equal(2.5); + done(); + }); + it('should allow a user to change the buffer property', function(done){ + var q = async.queue(function(task, cb) { + // nop + calls.push('process ' + task); + async.setImmediate(cb); + }, 10); + q.buffer = 4; + expect(q.buffer).to.not.equal(2.5); + expect(q.buffer).to.equal(4); + done(); + }); + it('should call the unsaturated callback if tasks length is less than concurrency minus buffer', function(done){ + var calls = []; + var q = async.queue(function(task, cb) { + // nop + calls.push('process ' + task); + async.setImmediate(cb); + }, 10); + q.unsaturated = function() { + calls.push('unsaturated'); + }; + q.empty = function() { + expect(calls.indexOf('unsaturated')).to.be.above(-1); + done(); + }; + q.push('foo0', function () {calls.push('foo0 cb');}); + q.push('foo1', function () {calls.push('foo1 cb');}); + q.push('foo2', function () {calls.push('foo2 cb');}); + q.push('foo3', function () {calls.push('foo3 cb');}); + q.push('foo4', function () {calls.push('foo4 cb');}); + }); + }); +}); + -- cgit v1.2.1