diff options
author | Mathias Buus <mathiasbuus@gmail.com> | 2018-04-04 16:52:19 +0200 |
---|---|---|
committer | Mathias Buus <mathiasbuus@gmail.com> | 2018-04-16 16:02:12 +0200 |
commit | f64bebf2059d35299da58cf9c5ca22d68035d617 (patch) | |
tree | 0c1861d2ff8ef1eff11c7a9ecf90302867b4084b /test/parallel/test-stream-pipeline.js | |
parent | 5cc948b77a1452cdd8b667978c3cc1188b433b1a (diff) | |
download | node-new-f64bebf2059d35299da58cf9c5ca22d68035d617.tar.gz |
stream: add pipeline and finished
PR-URL: https://github.com/nodejs/node/pull/19828
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: James M Snell <jasnell@gmail.com>
Diffstat (limited to 'test/parallel/test-stream-pipeline.js')
-rw-r--r-- | test/parallel/test-stream-pipeline.js | 483 |
1 files changed, 483 insertions, 0 deletions
diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js new file mode 100644 index 0000000000..e63ee2ed11 --- /dev/null +++ b/test/parallel/test-stream-pipeline.js @@ -0,0 +1,483 @@ +'use strict'; + +const common = require('../common'); +if (!common.hasCrypto) + common.skip('missing crypto'); +const { Stream, Writable, Readable, Transform, pipeline } = require('stream'); +const assert = require('assert'); +const http = require('http'); +const http2 = require('http2'); +const { promisify } = require('util'); + +common.crashOnUnhandledRejection(); + +{ + let finished = false; + const processed = []; + const expected = [ + Buffer.from('a'), + Buffer.from('b'), + Buffer.from('c') + ]; + + const read = new Readable({ + read() {} + }); + + const write = new Writable({ + write(data, enc, cb) { + processed.push(data); + cb(); + } + }); + + write.on('finish', () => { + finished = true; + }); + + for (let i = 0; i < expected.length; i++) { + read.push(expected[i]); + } + read.push(null); + + pipeline(read, write, common.mustCall((err) => { + assert.ok(!err, 'no error'); + assert.ok(finished); + assert.deepStrictEqual(processed, expected); + })); +} + +{ + const read = new Readable({ + read() {} + }); + + assert.throws(() => { + pipeline(read, () => {}); + }, /ERR_MISSING_ARGS/); + assert.throws(() => { + pipeline(() => {}); + }, /ERR_MISSING_ARGS/); + assert.throws(() => { + pipeline(); + }, /ERR_MISSING_ARGS/); +} + +{ + const read = new Readable({ + read() {} + }); + + const write = new Writable({ + write(data, enc, cb) { + cb(); + } + }); + + read.push('data'); + setImmediate(() => read.destroy()); + + pipeline(read, write, common.mustCall((err) => { + assert.ok(err, 'should have an error'); + })); +} + +{ + const read = new Readable({ + read() {} + }); + + const write = new Writable({ + write(data, enc, cb) { + cb(); + } + }); + + read.push('data'); + setImmediate(() => read.destroy(new Error('kaboom'))); + + const dst = pipeline(read, write, common.mustCall((err) => { + assert.deepStrictEqual(err, new Error('kaboom')); + })); + + assert.strictEqual(dst, write); +} + +{ + const read = new Readable({ + read() {} + }); + + const transform = new Transform({ + transform(data, enc, cb) { + cb(new Error('kaboom')); + } + }); + + const write = new Writable({ + write(data, enc, cb) { + cb(); + } + }); + + read.on('close', common.mustCall()); + transform.on('close', common.mustCall()); + write.on('close', common.mustCall()); + + const dst = pipeline(read, transform, write, common.mustCall((err) => { + assert.deepStrictEqual(err, new Error('kaboom')); + })); + + assert.strictEqual(dst, write); + + read.push('hello'); +} + +{ + const server = http.createServer((req, res) => { + const rs = new Readable({ + read() { + rs.push('hello'); + rs.push(null); + } + }); + + pipeline(rs, res); + }); + + server.listen(0, () => { + const req = http.request({ + port: server.address().port + }); + + req.end(); + req.on('response', (res) => { + const buf = []; + res.on('data', (data) => buf.push(data)); + res.on('end', common.mustCall(() => { + assert.deepStrictEqual( + Buffer.concat(buf), + Buffer.from('hello') + ); + server.close(); + })); + }); + }); +} + +{ + const server = http.createServer((req, res) => { + const rs = new Readable({ + read() { + rs.push('hello'); + }, + destroy: common.mustCall((err, cb) => { + // prevents fd leaks by destroying http pipelines + cb(); + }) + }); + + pipeline(rs, res); + }); + + server.listen(0, () => { + const req = http.request({ + port: server.address().port + }); + + req.end(); + req.on('response', (res) => { + setImmediate(() => { + res.destroy(); + server.close(); + }); + }); + }); +} + +{ + const server = http.createServer((req, res) => { + const rs = new Readable({ + read() { + rs.push('hello'); + }, + destroy: common.mustCall((err, cb) => { + cb(); + }) + }); + + pipeline(rs, res); + }); + + let cnt = 10; + + const badSink = new Writable({ + write(data, enc, cb) { + cnt--; + if (cnt === 0) cb(new Error('kaboom')); + else cb(); + } + }); + + server.listen(0, () => { + const req = http.request({ + port: server.address().port + }); + + req.end(); + req.on('response', (res) => { + pipeline(res, badSink, common.mustCall((err) => { + assert.deepStrictEqual(err, new Error('kaboom')); + server.close(); + })); + }); + }); +} + +{ + const server = http.createServer((req, res) => { + pipeline(req, res, common.mustCall()); + }); + + server.listen(0, () => { + const req = http.request({ + port: server.address().port + }); + + const rs = new Readable({ + read() { + rs.push('hello'); + } + }); + + pipeline(rs, req, common.mustCall(() => { + server.close(); + })); + + req.on('response', (res) => { + let cnt = 10; + res.on('data', () => { + cnt--; + if (cnt === 0) rs.destroy(); + }); + }); + }); +} + +{ + const server = http2.createServer((req, res) => { + pipeline(req, res, common.mustCall()); + }); + + server.listen(0, () => { + const url = `http://localhost:${server.address().port}`; + const client = http2.connect(url); + const req = client.request({ ':method': 'POST' }); + + const rs = new Readable({ + read() { + rs.push('hello'); + } + }); + + pipeline(rs, req, common.mustCall((err) => { + // TODO: this is working around an http2 bug + // where the client keeps the event loop going + // (replacing the rs.destroy() with req.end() + // exits it so seems to be a destroy bug there + client.unref(); + + server.close(); + client.close(); + })); + + let cnt = 10; + req.on('data', (data) => { + cnt--; + if (cnt === 0) rs.destroy(); + }); + }); +} + +{ + const makeTransform = () => { + const tr = new Transform({ + transform(data, enc, cb) { + cb(null, data); + } + }); + + tr.on('close', common.mustCall()); + return tr; + }; + + const rs = new Readable({ + read() { + rs.push('hello'); + } + }); + + let cnt = 10; + + const ws = new Writable({ + write(data, enc, cb) { + cnt--; + if (cnt === 0) return cb(new Error('kaboom')); + cb(); + } + }); + + rs.on('close', common.mustCall()); + ws.on('close', common.mustCall()); + + pipeline( + rs, + makeTransform(), + makeTransform(), + makeTransform(), + makeTransform(), + makeTransform(), + makeTransform(), + ws, + common.mustCall((err) => { + assert.deepStrictEqual(err, new Error('kaboom')); + }) + ); +} + +{ + const oldStream = new Stream(); + + oldStream.pause = oldStream.resume = () => {}; + oldStream.write = (data) => { + oldStream.emit('data', data); + return true; + }; + oldStream.end = () => { + oldStream.emit('end'); + }; + + const expected = [ + Buffer.from('hello'), + Buffer.from('world') + ]; + + const rs = new Readable({ + read() { + for (let i = 0; i < expected.length; i++) { + rs.push(expected[i]); + } + rs.push(null); + } + }); + + const ws = new Writable({ + write(data, enc, cb) { + assert.deepStrictEqual(data, expected.shift()); + cb(); + } + }); + + let finished = false; + + ws.on('finish', () => { + finished = true; + }); + + pipeline( + rs, + oldStream, + ws, + common.mustCall((err) => { + assert(!err, 'no error'); + assert(finished, 'last stream finished'); + }) + ); +} + +{ + const oldStream = new Stream(); + + oldStream.pause = oldStream.resume = () => {}; + oldStream.write = (data) => { + oldStream.emit('data', data); + return true; + }; + oldStream.end = () => { + oldStream.emit('end'); + }; + + const destroyableOldStream = new Stream(); + + destroyableOldStream.pause = destroyableOldStream.resume = () => {}; + destroyableOldStream.destroy = common.mustCall(() => { + destroyableOldStream.emit('close'); + }); + destroyableOldStream.write = (data) => { + destroyableOldStream.emit('data', data); + return true; + }; + destroyableOldStream.end = () => { + destroyableOldStream.emit('end'); + }; + + const rs = new Readable({ + read() { + rs.destroy(new Error('stop')); + } + }); + + const ws = new Writable({ + write(data, enc, cb) { + cb(); + } + }); + + let finished = false; + + ws.on('finish', () => { + finished = true; + }); + + pipeline( + rs, + oldStream, + destroyableOldStream, + ws, + common.mustCall((err) => { + assert.deepStrictEqual(err, new Error('stop')); + assert(!finished, 'should not finish'); + }) + ); +} + +{ + const pipelinePromise = promisify(pipeline); + + async function run() { + const read = new Readable({ + read() {} + }); + + const write = new Writable({ + write(data, enc, cb) { + cb(); + } + }); + + read.push('data'); + read.push(null); + + let finished = false; + + write.on('finish', () => { + finished = true; + }); + + await pipelinePromise(read, write); + + assert(finished); + } + + run(); +} |