summaryrefslogtreecommitdiff
path: root/test/cargoQueue.js
diff options
context:
space:
mode:
authorJustin Chase <justin.m.chase@gmail.com>2018-08-07 16:27:09 -0500
committerAlex Early <alexander.early@gmail.com>2018-08-07 14:27:09 -0700
commitdb49b8922b541d011d1e457b658ef8435539fdb4 (patch)
treedf9b468fdbee158bac8189331daf8bf049df39eb /test/cargoQueue.js
parent50939d7fa88523e9ee86f6bcc84c7bec0cc97bc4 (diff)
downloadasync-db49b8922b541d011d1e457b658ef8435539fdb4.tar.gz
feat: Add cargoQueue type and tests (#1567)
* Add cargo queue type and tests * remove only on tests * make failing test more deterministic * Dont define a new type
Diffstat (limited to 'test/cargoQueue.js')
-rw-r--r--test/cargoQueue.js333
1 files changed, 333 insertions, 0 deletions
diff --git a/test/cargoQueue.js b/test/cargoQueue.js
new file mode 100644
index 0000000..3ff7f18
--- /dev/null
+++ b/test/cargoQueue.js
@@ -0,0 +1,333 @@
+var async = require('../lib');
+var {expect} = require('chai');
+var assert = require('assert');
+
+describe('cargoQueue', () => {
+
+ function worker (tasks, callback) {
+ this.call_order.push('process ' + tasks.join(' '));
+ callback('error', 'arg');
+ }
+
+ it('cargoQueue', (done) => {
+ var call_order = [],
+ delays = [40, 40, 20];
+
+ // worker: --12--34--5-
+ // order of completion: 1,2,3,4,5
+
+ var c = async.cargoQueue((tasks, callback) => {
+ setTimeout(() => {
+ call_order.push('process ' + tasks.join(' '));
+ callback('error', 'arg');
+ }, delays.shift());
+ }, 2, 2);
+
+ c.push(1, (err, arg) => {
+ expect(err).to.equal('error');
+ expect(arg).to.equal('arg');
+ expect(c.length()).to.equal(2);
+ call_order.push('callback ' + 1);
+ });
+ c.push(2, (err, arg) => {
+ expect(err).to.equal('error');
+ expect(arg).to.equal('arg');
+ expect(c.length()).to.equal(2);
+ call_order.push('callback ' + 2);
+ });
+
+ expect(c.length()).to.equal(2);
+
+ // async push
+ setTimeout(() => {
+ c.push(3, (err, arg) => {
+ expect(err).to.equal('error');
+ expect(arg).to.equal('arg');
+ expect(c.length()).to.equal(0);
+ call_order.push('callback ' + 3);
+ });
+ }, 15);
+ setTimeout(() => {
+ c.push(4, (err, arg) => {
+ expect(err).to.equal('error');
+ expect(arg).to.equal('arg');
+ expect(c.length()).to.equal(0);
+ call_order.push('callback ' + 4);
+ });
+ expect(c.length()).to.equal(1);
+ c.push(5, (err, arg) => {
+ expect(err).to.equal('error');
+ expect(arg).to.equal('arg');
+ expect(c.length()).to.equal(0);
+ call_order.push('callback ' + 5);
+ });
+ }, 30);
+
+
+ c.drain = function () {
+ expect(call_order).to.eql([
+ 'process 1 2', 'callback 1', 'callback 2',
+ 'process 3', 'callback 3',
+ 'process 4 5', 'callback 4', 'callback 5'
+ ]);
+ expect(c.length()).to.equal(0);
+ done();
+ };
+ });
+
+ it('without callback', (done) => {
+ var call_order = [];
+ var c = async.cargoQueue(worker.bind({ call_order }), 2, 2);
+ c.push(1);
+ setImmediate(() => {
+ c.push(2);
+ setImmediate(() => {
+ c.push(3);
+ c.push(4);
+ setImmediate(() => {
+ c.push(5);
+ c.drain = function complete () {
+ expect(call_order).to.eql([
+ 'process 1',
+ 'process 2',
+ 'process 3 4',
+ 'process 5'
+ ]);
+ done();
+ }
+ })
+ })
+ })
+ });
+
+ it('bulk task', (done) => {
+ var call_order = [],
+ delays = [20,30];
+
+ // worker: -123-4-
+ // order of completion: 1,2,3,4
+
+ var c = async.cargoQueue((tasks, callback) => {
+ setTimeout(() => {
+ call_order.push('process ' + tasks.join(' '));
+ callback('error', tasks.join(' '));
+ }, delays.shift());
+ }, 3, 2);
+
+ c.push( [1,2,3,4], (err, arg) => {
+ expect(err).to.equal('error');
+ call_order.push('callback ' + arg);
+ });
+
+ expect(c.length()).to.equal(4);
+
+ setTimeout(() => {
+ expect(call_order).to.eql([
+ 'process 1 2', 'callback 1 2', 'callback 1 2',
+ 'process 3 4', 'callback 3 4', 'callback 3 4',
+ ]);
+ expect(c.length()).to.equal(0);
+ done();
+ }, 200);
+ });
+
+ it('drain once', (done) => {
+
+ var c = async.cargoQueue((tasks, callback) => {
+ callback();
+ }, 3, 2);
+
+ var drainCounter = 0;
+ c.drain = function () {
+ drainCounter++;
+ };
+
+ for(var i = 0; i < 10; i++){
+ c.push(i);
+ }
+
+ setTimeout(() => {
+ expect(drainCounter).to.equal(1);
+ done();
+ }, 50);
+ });
+
+ it('drain twice', (done) => {
+
+ var c = async.cargoQueue((tasks, callback) => {
+ callback();
+ }, 3, 2);
+
+ function loadCargo(){
+ for(var i = 0; i < 10; i++){
+ c.push(i);
+ }
+ }
+
+ var drainCounter = 0;
+ c.drain = function () {
+ drainCounter++;
+ };
+
+ loadCargo();
+ setTimeout(loadCargo, 50);
+
+ setTimeout(() => {
+ expect(drainCounter).to.equal(2);
+ done();
+ }, 100);
+ });
+
+ it('events', (done) => {
+ var calls = [];
+ var q = async.cargoQueue((task, cb) => {
+ // nop
+ calls.push('process ' + task);
+ async.setImmediate(cb);
+ }, 3, 1);
+
+ q.saturated = function() {
+ assert(q.running() == 3, 'cargoQueue should be saturated now');
+ calls.push('saturated');
+ };
+ q.empty = function() {
+ assert(q.length() === 0, 'cargoQueue should be empty now');
+ calls.push('empty');
+ };
+ q.drain = function() {
+ assert(
+ q.length() === 0 && q.running() === 0,
+ 'cargoQueue should be empty now and no more workers should be running'
+ );
+ calls.push('drain');
+ expect(calls).to.eql([
+ 'process foo',
+ 'process bar',
+ 'saturated',
+ 'process zoo',
+ 'foo cb',
+ 'saturated',
+ 'process poo',
+ 'bar cb',
+ 'empty',
+ 'saturated',
+ 'process moo',
+ 'zoo cb',
+ 'poo cb',
+ 'moo cb',
+ 'drain'
+ ]);
+ done();
+ };
+ q.push('foo', () => {calls.push('foo cb');});
+ q.push('bar', () => {calls.push('bar cb');});
+ q.push('zoo', () => {calls.push('zoo cb');});
+ q.push('poo', () => {calls.push('poo cb');});
+ q.push('moo', () => {calls.push('moo cb');});
+ });
+
+ it('expose payload', (done) => {
+ var called_once = false;
+ var cargo = async.cargoQueue((tasks, cb) => {
+ if (!called_once) {
+ expect(cargo.payload).to.equal(1);
+ assert(tasks.length === 1, 'should start with payload = 1');
+ } else {
+ expect(cargo.payload).to.equal(2);
+ assert(tasks.length === 2, 'next call shold have payload = 2');
+ }
+ called_once = true;
+ setTimeout(cb, 25);
+ }, 1, 1);
+
+ cargo.drain = function () {
+ done();
+ };
+
+ expect(cargo.payload).to.equal(1);
+
+ cargo.push([1, 2, 3]);
+
+ setTimeout(() => {
+ cargo.payload = 2;
+ }, 15);
+ });
+
+
+ it('expose concurrency', (done) => {
+ var called_once = false;
+ var cargo = async.cargoQueue((tasks, cb) => {
+ if (!called_once) {
+ expect(cargo.concurrency).to.equal(1);
+ } else {
+ expect(cargo.concurrency).to.equal(2);
+ }
+ called_once = true;
+ setTimeout(cb, 25);
+ }, 1, 1);
+
+ cargo.drain = function () {
+ done();
+ };
+
+ expect(cargo.concurrency).to.equal(1);
+
+ cargo.push([1, 2, 3]);
+
+ setTimeout(() => {
+ cargo.concurrency = 2;
+ }, 15);
+ });
+
+ it('workersList', (done) => {
+ var called_once = false;
+
+ function getWorkersListData(cargo) {
+ return cargo.workersList().map((v) => {
+ return v.data;
+ });
+ }
+
+ var cargo = async.cargoQueue((tasks, cb) => {
+ if (!called_once) {
+ expect(tasks).to.eql(['foo', 'bar']);
+ } else {
+ expect(tasks).to.eql(['baz']);
+ }
+ expect(getWorkersListData(cargo)).to.eql(tasks);
+ async.setImmediate(() => {
+ // ensure nothing has changed
+ expect(getWorkersListData(cargo)).to.eql(tasks);
+ called_once = true;
+ cb();
+ });
+ }, 1, 2);
+
+ cargo.drain = function() {
+ expect(cargo.workersList()).to.eql([]);
+ expect(cargo.running()).to.equal(0);
+ done();
+ };
+
+ cargo.push('foo');
+ cargo.push('bar');
+ cargo.push('baz');
+ });
+
+ it('running', (done) => {
+ var cargo = async.cargoQueue((tasks, cb) => {
+ expect(cargo.running()).to.equal(1);
+ async.setImmediate(() => {
+ expect(cargo.running()).to.equal(1);
+ cb();
+ });
+ }, 1, 1);
+
+ cargo.drain = function() {
+ expect(cargo.running()).to.equal(0);
+ done();
+ };
+
+ cargo.push(['foo', 'bar', 'baz', 'boo']);
+ })
+});