summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHubert Argasinski <argasinski.hubert@gmail.com>2017-06-10 23:14:13 -0400
committerGitHub <noreply@github.com>2017-06-10 23:14:13 -0400
commit0364ff3ac9727b69aefaab25999a491c8fa34fcd (patch)
treee6fe787c2d90e940ab64057ba0671ad1e05edb8a
parentec9dab73153028e4abdd0216f55496b8bb6f4054 (diff)
downloadasync-0364ff3ac9727b69aefaab25999a491c8fa34fcd.tar.gz
ensure q.workersList() contains items being processed [fixes #1428] (#1429)
* ensure q.workersList() contains items being processed [fixes #1428] * remove newline * improve q.workersList() test
-rw-r--r--lib/internal/queue.js5
-rw-r--r--mocha_test/cargo.js55
-rw-r--r--mocha_test/queue.js65
3 files changed, 120 insertions, 5 deletions
diff --git a/lib/internal/queue.js b/lib/internal/queue.js
index e357447..5e8d4d7 100644
--- a/lib/internal/queue.js
+++ b/lib/internal/queue.js
@@ -55,9 +55,10 @@ export default function queue(worker, concurrency, payload) {
for (var i = 0, l = tasks.length; i < l; i++) {
var task = tasks[i];
+
var index = indexOf(workersList, task, 0);
if (index >= 0) {
- workersList.splice(index)
+ workersList.splice(index, 1);
}
task.callback.apply(task, arguments);
@@ -118,11 +119,11 @@ export default function queue(worker, concurrency, payload) {
for (var i = 0; i < l; i++) {
var node = q._tasks.shift();
tasks.push(node);
+ workersList.push(node);
data.push(node.data);
}
numRunning += 1;
- workersList.push(tasks[0]);
if (q._tasks.length === 0) {
q.empty();
diff --git a/mocha_test/cargo.js b/mocha_test/cargo.js
index 235b9a2..55e6e96 100644
--- a/mocha_test/cargo.js
+++ b/mocha_test/cargo.js
@@ -236,7 +236,7 @@ describe('cargo', function () {
it('expose payload', function (done) {
var called_once = false;
- var cargo= async.cargo(function(tasks, cb) {
+ var cargo = async.cargo(function(tasks, cb) {
if (!called_once) {
expect(cargo.payload).to.equal(1);
assert(tasks.length === 1, 'should start with payload = 1');
@@ -261,4 +261,57 @@ describe('cargo', function () {
}, 15);
});
+ it('workersList', function(done) {
+ var called_once = false;
+
+ function getWorkersListData(cargo) {
+ return cargo.workersList().map(function(v) {
+ return v.data;
+ });
+ }
+
+ var cargo = async.cargo(function(tasks, cb) {
+ if (!called_once) {
+ expect(tasks).to.eql(['foo', 'bar']);
+ } else {
+ expect(tasks).to.eql(['baz']);
+ }
+ expect(getWorkersListData(cargo)).to.eql(tasks);
+ async.setImmediate(function() {
+ // ensure nothing has changed
+ expect(getWorkersListData(cargo)).to.eql(tasks);
+ called_once = true;
+ cb();
+ });
+ }, 2);
+
+ cargo.drain = function() {
+ expect(cargo.workersList()).to.eql([]);
+ expect(cargo.running()).to.equal(0);
+ done();
+ };
+
+ cargo.push('foo');
+ cargo.push('bar');
+ cargo.push('baz');
+ });
+
+ it('running', function(done) {
+ var cargo = async.cargo(function(tasks, cb) {
+ expect(cargo.running()).to.equal(1);
+ async.setImmediate(function() {
+ expect(cargo.running()).to.equal(1);
+ cb();
+ });
+ }, 2);
+
+ cargo.drain = function() {
+ expect(cargo.running()).to.equal(0);
+ done();
+ };
+
+ cargo.push('foo');
+ cargo.push('bar');
+ cargo.push('baz');
+ })
});
diff --git a/mocha_test/queue.js b/mocha_test/queue.js
index 7525c30..4539c14 100644
--- a/mocha_test/queue.js
+++ b/mocha_test/queue.js
@@ -656,7 +656,7 @@ describe('queue', function(){
});
});
- context('q.unsaturated(): ',function() {
+ context('q.unsaturated(): ', function() {
it('should have a default buffer property that equals 25% of the concurrenct rate', function(done){
var calls = [];
var q = async.queue(function(task, cb) {
@@ -719,6 +719,68 @@ describe('queue', function(){
});
});
+ context('workersList', function() {
+ it('should be the same length as running()', function(done) {
+ var q = async.queue(function(task, cb) {
+ async.setImmediate(function() {
+ expect(q.workersList().length).to.equal(q.running());
+ cb();
+ });
+ }, 2);
+
+ q.drain = function() {
+ expect(q.workersList().length).to.equal(0);
+ expect(q.running()).to.equal(0);
+ done();
+ };
+
+ q.push('foo');
+ q.push('bar');
+ q.push('baz');
+ });
+
+ it('should contain the items being processed', function(done) {
+ var itemsBeingProcessed = {
+ 'foo': ['foo'],
+ 'foo_cb': ['foo', 'bar'],
+ 'bar': ['foo', 'bar'],
+ 'bar_cb': ['bar', 'baz'],
+ 'baz': ['bar', 'baz'],
+ 'baz_cb': ['baz']
+ };
+
+ function getWorkersListData(q) {
+ return q.workersList().map(function(v) {
+ return v.data;
+ });
+ }
+
+ var q = async.queue(function(task, cb) {
+ expect(
+ getWorkersListData(q)
+ ).to.eql(itemsBeingProcessed[task]);
+ expect(q.workersList().length).to.equal(q.running());
+ async.setImmediate(function() {
+ expect(
+ getWorkersListData(q)
+ ).to.eql(itemsBeingProcessed[task+'_cb']);
+ expect(q.workersList().length).to.equal(q.running());
+ cb();
+ });
+ }, 2);
+
+ q.drain = function() {
+ expect(q.workersList()).to.eql([]);
+ expect(q.workersList().length).to.equal(q.running());
+ done();
+ };
+
+ q.push('foo');
+ q.push('bar');
+ q.push('baz');
+ });
+ })
+
it('remove', function(done) {
var result = [];
var q = async.queue(function(data, cb) {
@@ -738,4 +800,3 @@ describe('queue', function(){
}
});
});
-