summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMathias Buus <mathiasbuus@gmail.com>2018-04-04 16:52:19 +0200
committerMathias Buus <mathiasbuus@gmail.com>2018-04-16 16:02:12 +0200
commitf64bebf2059d35299da58cf9c5ca22d68035d617 (patch)
tree0c1861d2ff8ef1eff11c7a9ecf90302867b4084b
parent5cc948b77a1452cdd8b667978c3cc1188b433b1a (diff)
downloadnode-new-f64bebf2059d35299da58cf9c5ca22d68035d617.tar.gz
stream: add pipeline and finished
PR-URL: https://github.com/nodejs/node/pull/19828 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
-rw-r--r--doc/api/errors.md6
-rw-r--r--doc/api/stream.md106
-rw-r--r--lib/internal/errors.js1
-rw-r--r--lib/internal/streams/end-of-stream.js96
-rw-r--r--lib/internal/streams/pipeline.js95
-rw-r--r--lib/stream.js5
-rw-r--r--node.gyp2
-rw-r--r--test/parallel/test-stream-finished.js123
-rw-r--r--test/parallel/test-stream-pipeline.js483
9 files changed, 917 insertions, 0 deletions
diff --git a/doc/api/errors.md b/doc/api/errors.md
index 49834414de..00d30193df 100644
--- a/doc/api/errors.md
+++ b/doc/api/errors.md
@@ -1431,6 +1431,12 @@ An attempt was made to call [`stream.pipe()`][] on a [`Writable`][] stream.
An attempt was made to call [`stream.write()`][] with a `null` chunk.
+<a id="ERR_STREAM_PREMATURE_CLOSE"></a>
+### ERR_STREAM_PREMATURE_CLOSE
+
+An error returned by `stream.finished()` and `stream.pipeline()`, when a stream
+or a pipeline ends non gracefully with no explicit error.
+
<a id="ERR_STREAM_PUSH_AFTER_EOF"></a>
### ERR_STREAM_PUSH_AFTER_EOF
diff --git a/doc/api/stream.md b/doc/api/stream.md
index 73eb8396de..705b58a31c 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -46,6 +46,9 @@ There are four fundamental stream types within Node.js:
* [Transform][] - Duplex streams that can modify or transform the data as it
is written and read (for example [`zlib.createDeflate()`][]).
+Additionally this module includes the utility functions [pipeline][] and
+[finished][].
+
### Object Mode
All streams created by Node.js APIs operate exclusively on strings and `Buffer`
@@ -1287,6 +1290,107 @@ implementors should not override this method, but instead implement
[`readable._destroy()`][readable-_destroy].
The default implementation of `_destroy()` for `Transform` also emit `'close'`.
+### stream.finished(stream, callback)
+<!-- YAML
+added: REPLACEME
+-->
+
+* `stream` {Stream} A readable and/or writable stream.
+* `callback` {Function} A callback function that takes an optional error
+ argument.
+
+A function to get notified when a stream is no longer readable, writable
+or has experienced an error or a premature close event.
+
+```js
+const { finished } = require('stream');
+
+const rs = fs.createReadStream('archive.tar');
+
+finished(rs, (err) => {
+ if (err) {
+ console.error('Stream failed', err);
+ } else {
+ console.log('Stream is done reading');
+ }
+});
+
+rs.resume(); // drain the stream
+```
+
+Especially useful in error handling scenarios where a stream is destroyed
+prematurely (like an aborted HTTP request), and will not emit `'end'`
+or `'finish'`.
+
+The `finished` API is promisify'able as well;
+
+```js
+const finished = util.promisify(stream.finished);
+
+const rs = fs.createReadStream('archive.tar');
+
+async function run() {
+ await finished(rs);
+ console.log('Stream is done reading');
+}
+
+run().catch(console.error);
+rs.resume(); // drain the stream
+```
+
+### stream.pipeline(...streams[, callback])
+<!-- YAML
+added: REPLACEME
+-->
+
+* `...streams` {Stream} Two or more streams to pipe between.
+* `callback` {Function} A callback function that takes an optional error
+ argument.
+
+A module method to pipe between streams forwarding errors and properly cleaning
+up and provide a callback when the pipeline is complete.
+
+```js
+const { pipeline } = require('stream');
+const fs = require('fs');
+const zlib = require('zlib');
+
+// Use the pipeline API to easily pipe a series of streams
+// together and get notified when the pipeline is fully done.
+
+// A pipeline to gzip a potentially huge tar file efficiently:
+
+pipeline(
+ fs.createReadStream('archive.tar'),
+ zlib.createGzip(),
+ fs.createWriteStream('archive.tar.gz'),
+ (err) => {
+ if (err) {
+ console.error('Pipeline failed', err);
+ } else {
+ console.log('Pipeline succeeded');
+ }
+ }
+);
+```
+
+The `pipeline` API is promisify'able as well:
+
+```js
+const pipeline = util.promisify(stream.pipeline);
+
+async function run() {
+ await pipeline(
+ fs.createReadStream('archive.tar'),
+ zlib.createGzip(),
+ fs.createWriteStream('archive.tar.gz')
+ );
+ console.log('Pipeline succeeded');
+}
+
+run().catch(console.error);
+```
+
## API for Stream Implementers
<!--type=misc-->
@@ -2397,6 +2501,8 @@ contain multi-byte characters.
[http-incoming-message]: http.html#http_class_http_incomingmessage
[zlib]: zlib.html
[hwm-gotcha]: #stream_highwatermark_discrepancy_after_calling_readable_setencoding
+[pipeline]: #stream_stream_pipeline_streams_callback
+[finished]: #stream_stream_finished_stream_callback
[stream-_flush]: #stream_transform_flush_callback
[stream-_read]: #stream_readable_read_size_1
[stream-_transform]: #stream_transform_transform_chunk_encoding_callback
diff --git a/lib/internal/errors.js b/lib/internal/errors.js
index 8505ec39b6..2bc2c7bce5 100644
--- a/lib/internal/errors.js
+++ b/lib/internal/errors.js
@@ -961,6 +961,7 @@ E('ERR_STDOUT_CLOSE', 'process.stdout cannot be closed', Error);
E('ERR_STREAM_CANNOT_PIPE', 'Cannot pipe, not readable', Error);
E('ERR_STREAM_DESTROYED', 'Cannot call %s after a stream was destroyed', Error);
E('ERR_STREAM_NULL_VALUES', 'May not write null values to stream', TypeError);
+E('ERR_STREAM_PREMATURE_CLOSE', 'Premature close', Error);
E('ERR_STREAM_PUSH_AFTER_EOF', 'stream.push() after EOF', Error);
E('ERR_STREAM_UNSHIFT_AFTER_END_EVENT',
'stream.unshift() after end event', Error);
diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js
new file mode 100644
index 0000000000..eeb8a61456
--- /dev/null
+++ b/lib/internal/streams/end-of-stream.js
@@ -0,0 +1,96 @@
+// Ported from https://github.com/mafintosh/end-of-stream with
+// permission from the author, Mathias Buus (@mafintosh).
+
+'use strict';
+
+const {
+ ERR_STREAM_PREMATURE_CLOSE
+} = require('internal/errors').codes;
+
+function noop() {}
+
+function isRequest(stream) {
+ return stream.setHeader && typeof stream.abort === 'function';
+}
+
+function once(callback) {
+ let called = false;
+ return function(err) {
+ if (called) return;
+ called = true;
+ callback.call(this, err);
+ };
+}
+
+function eos(stream, opts, callback) {
+ if (typeof opts === 'function') return eos(stream, null, opts);
+ if (!opts) opts = {};
+
+ callback = once(callback || noop);
+
+ const ws = stream._writableState;
+ const rs = stream._readableState;
+ let readable = opts.readable || (opts.readable !== false && stream.readable);
+ let writable = opts.writable || (opts.writable !== false && stream.writable);
+
+ const onlegacyfinish = () => {
+ if (!stream.writable) onfinish();
+ };
+
+ const onfinish = () => {
+ writable = false;
+ if (!readable) callback.call(stream);
+ };
+
+ const onend = () => {
+ readable = false;
+ if (!writable) callback.call(stream);
+ };
+
+ const onerror = (err) => {
+ callback.call(stream, err);
+ };
+
+ const onclose = () => {
+ if (readable && !(rs && rs.ended)) {
+ return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
+ }
+ if (writable && !(ws && ws.ended)) {
+ return callback.call(stream, new ERR_STREAM_PREMATURE_CLOSE());
+ }
+ };
+
+ const onrequest = () => {
+ stream.req.on('finish', onfinish);
+ };
+
+ if (isRequest(stream)) {
+ stream.on('complete', onfinish);
+ stream.on('abort', onclose);
+ if (stream.req) onrequest();
+ else stream.on('request', onrequest);
+ } else if (writable && !ws) { // legacy streams
+ stream.on('end', onlegacyfinish);
+ stream.on('close', onlegacyfinish);
+ }
+
+ stream.on('end', onend);
+ stream.on('finish', onfinish);
+ if (opts.error !== false) stream.on('error', onerror);
+ stream.on('close', onclose);
+
+ return function() {
+ stream.removeListener('complete', onfinish);
+ stream.removeListener('abort', onclose);
+ stream.removeListener('request', onrequest);
+ if (stream.req) stream.req.removeListener('finish', onfinish);
+ stream.removeListener('end', onlegacyfinish);
+ stream.removeListener('close', onlegacyfinish);
+ stream.removeListener('finish', onfinish);
+ stream.removeListener('end', onend);
+ stream.removeListener('error', onerror);
+ stream.removeListener('close', onclose);
+ };
+}
+
+module.exports = eos;
diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js
new file mode 100644
index 0000000000..7e87210a77
--- /dev/null
+++ b/lib/internal/streams/pipeline.js
@@ -0,0 +1,95 @@
+// Ported from https://github.com/mafintosh/pump with
+// permission from the author, Mathias Buus (@mafintosh).
+
+'use strict';
+
+const eos = require('internal/streams/end-of-stream');
+
+const {
+ ERR_MISSING_ARGS,
+ ERR_STREAM_DESTROYED
+} = require('internal/errors').codes;
+
+function once(callback) {
+ let called = false;
+ return function(err) {
+ if (called) return;
+ called = true;
+ callback(err);
+ };
+}
+
+function noop() {}
+
+function isRequest(stream) {
+ return stream.setHeader && typeof stream.abort === 'function';
+}
+
+function destroyer(stream, reading, writing, callback) {
+ callback = once(callback);
+
+ let closed = false;
+ stream.on('close', () => {
+ closed = true;
+ });
+
+ eos(stream, { readable: reading, writable: writing }, (err) => {
+ if (err) return callback(err);
+ closed = true;
+ callback();
+ });
+
+ let destroyed = false;
+ return (err) => {
+ if (closed) return;
+ if (destroyed) return;
+ destroyed = true;
+
+ // request.destroy just do .end - .abort is what we want
+ if (isRequest(stream)) return stream.abort();
+ if (typeof stream.destroy === 'function') return stream.destroy();
+
+ callback(err || new ERR_STREAM_DESTROYED('pipe'));
+ };
+}
+
+function call(fn) {
+ fn();
+}
+
+function pipe(from, to) {
+ return from.pipe(to);
+}
+
+function popCallback(streams) {
+ if (!streams.length) return noop;
+ if (typeof streams[streams.length - 1] !== 'function') return noop;
+ return streams.pop();
+}
+
+function pipeline(...streams) {
+ const callback = popCallback(streams);
+
+ if (Array.isArray(streams[0])) streams = streams[0];
+
+ if (streams.length < 2) {
+ throw new ERR_MISSING_ARGS('streams');
+ }
+
+ let error;
+ const destroys = streams.map(function(stream, i) {
+ const reading = i < streams.length - 1;
+ const writing = i > 0;
+ return destroyer(stream, reading, writing, function(err) {
+ if (!error) error = err;
+ if (err) destroys.forEach(call);
+ if (reading) return;
+ destroys.forEach(call);
+ callback(error);
+ });
+ });
+
+ return streams.reduce(pipe);
+}
+
+module.exports = pipeline;
diff --git a/lib/stream.js b/lib/stream.js
index ba056026d8..7c235108c0 100644
--- a/lib/stream.js
+++ b/lib/stream.js
@@ -22,6 +22,8 @@
'use strict';
const { Buffer } = require('buffer');
+const pipeline = require('internal/streams/pipeline');
+const eos = require('internal/streams/end-of-stream');
// Note: export Stream before Readable/Writable/Duplex/...
// to avoid a cross-reference(require) issues
@@ -33,6 +35,9 @@ Stream.Duplex = require('_stream_duplex');
Stream.Transform = require('_stream_transform');
Stream.PassThrough = require('_stream_passthrough');
+Stream.pipeline = pipeline;
+Stream.finished = eos;
+
// Backwards-compat with node 0.4.x
Stream.Stream = Stream;
diff --git a/node.gyp b/node.gyp
index e2e6842c4f..7ca158c25f 100644
--- a/node.gyp
+++ b/node.gyp
@@ -154,6 +154,8 @@
'lib/internal/streams/legacy.js',
'lib/internal/streams/destroy.js',
'lib/internal/streams/state.js',
+ 'lib/internal/streams/pipeline.js',
+ 'lib/internal/streams/end-of-stream.js',
'lib/internal/wrap_js_stream.js',
'deps/v8/tools/splaytree.js',
'deps/v8/tools/codemap.js',
diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js
new file mode 100644
index 0000000000..2b0c156eb0
--- /dev/null
+++ b/test/parallel/test-stream-finished.js
@@ -0,0 +1,123 @@
+'use strict';
+
+const common = require('../common');
+const { Writable, Readable, Transform, finished } = require('stream');
+const assert = require('assert');
+const fs = require('fs');
+const { promisify } = require('util');
+
+common.crashOnUnhandledRejection();
+
+{
+ const rs = new Readable({
+ read() {}
+ });
+
+ finished(rs, common.mustCall((err) => {
+ assert(!err, 'no error');
+ }));
+
+ rs.push(null);
+ rs.resume();
+}
+
+{
+ const ws = new Writable({
+ write(data, enc, cb) {
+ cb();
+ }
+ });
+
+ finished(ws, common.mustCall((err) => {
+ assert(!err, 'no error');
+ }));
+
+ ws.end();
+}
+
+{
+ const tr = new Transform({
+ transform(data, enc, cb) {
+ cb();
+ }
+ });
+
+ let finish = false;
+ let ended = false;
+
+ tr.on('end', () => {
+ ended = true;
+ });
+
+ tr.on('finish', () => {
+ finish = true;
+ });
+
+ finished(tr, common.mustCall((err) => {
+ assert(!err, 'no error');
+ assert(finish);
+ assert(ended);
+ }));
+
+ tr.end();
+ tr.resume();
+}
+
+{
+ const rs = fs.createReadStream(__filename);
+
+ rs.resume();
+ finished(rs, common.mustCall());
+}
+
+{
+ const finishedPromise = promisify(finished);
+
+ async function run() {
+ const rs = fs.createReadStream(__filename);
+ const done = common.mustCall();
+
+ let ended = false;
+ rs.resume();
+ rs.on('end', () => {
+ ended = true;
+ });
+ await finishedPromise(rs);
+ assert(ended);
+ done();
+ }
+
+ run();
+}
+
+{
+ const rs = fs.createReadStream('file-does-not-exist');
+
+ finished(rs, common.mustCall((err) => {
+ assert.strictEqual(err.code, 'ENOENT');
+ }));
+}
+
+{
+ const rs = new Readable();
+
+ finished(rs, common.mustCall((err) => {
+ assert(!err, 'no error');
+ }));
+
+ rs.push(null);
+ rs.emit('close'); // should not trigger an error
+ rs.resume();
+}
+
+{
+ const rs = new Readable();
+
+ finished(rs, common.mustCall((err) => {
+ assert(err, 'premature close error');
+ }));
+
+ rs.emit('close'); // should trigger error
+ rs.push(null);
+ rs.resume();
+}
diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js
new file mode 100644
index 0000000000..e63ee2ed11
--- /dev/null
+++ b/test/parallel/test-stream-pipeline.js
@@ -0,0 +1,483 @@
+'use strict';
+
+const common = require('../common');
+if (!common.hasCrypto)
+ common.skip('missing crypto');
+const { Stream, Writable, Readable, Transform, pipeline } = require('stream');
+const assert = require('assert');
+const http = require('http');
+const http2 = require('http2');
+const { promisify } = require('util');
+
+common.crashOnUnhandledRejection();
+
+{
+ let finished = false;
+ const processed = [];
+ const expected = [
+ Buffer.from('a'),
+ Buffer.from('b'),
+ Buffer.from('c')
+ ];
+
+ const read = new Readable({
+ read() {}
+ });
+
+ const write = new Writable({
+ write(data, enc, cb) {
+ processed.push(data);
+ cb();
+ }
+ });
+
+ write.on('finish', () => {
+ finished = true;
+ });
+
+ for (let i = 0; i < expected.length; i++) {
+ read.push(expected[i]);
+ }
+ read.push(null);
+
+ pipeline(read, write, common.mustCall((err) => {
+ assert.ok(!err, 'no error');
+ assert.ok(finished);
+ assert.deepStrictEqual(processed, expected);
+ }));
+}
+
+{
+ const read = new Readable({
+ read() {}
+ });
+
+ assert.throws(() => {
+ pipeline(read, () => {});
+ }, /ERR_MISSING_ARGS/);
+ assert.throws(() => {
+ pipeline(() => {});
+ }, /ERR_MISSING_ARGS/);
+ assert.throws(() => {
+ pipeline();
+ }, /ERR_MISSING_ARGS/);
+}
+
+{
+ const read = new Readable({
+ read() {}
+ });
+
+ const write = new Writable({
+ write(data, enc, cb) {
+ cb();
+ }
+ });
+
+ read.push('data');
+ setImmediate(() => read.destroy());
+
+ pipeline(read, write, common.mustCall((err) => {
+ assert.ok(err, 'should have an error');
+ }));
+}
+
+{
+ const read = new Readable({
+ read() {}
+ });
+
+ const write = new Writable({
+ write(data, enc, cb) {
+ cb();
+ }
+ });
+
+ read.push('data');
+ setImmediate(() => read.destroy(new Error('kaboom')));
+
+ const dst = pipeline(read, write, common.mustCall((err) => {
+ assert.deepStrictEqual(err, new Error('kaboom'));
+ }));
+
+ assert.strictEqual(dst, write);
+}
+
+{
+ const read = new Readable({
+ read() {}
+ });
+
+ const transform = new Transform({
+ transform(data, enc, cb) {
+ cb(new Error('kaboom'));
+ }
+ });
+
+ const write = new Writable({
+ write(data, enc, cb) {
+ cb();
+ }
+ });
+
+ read.on('close', common.mustCall());
+ transform.on('close', common.mustCall());
+ write.on('close', common.mustCall());
+
+ const dst = pipeline(read, transform, write, common.mustCall((err) => {
+ assert.deepStrictEqual(err, new Error('kaboom'));
+ }));
+
+ assert.strictEqual(dst, write);
+
+ read.push('hello');
+}
+
+{
+ const server = http.createServer((req, res) => {
+ const rs = new Readable({
+ read() {
+ rs.push('hello');
+ rs.push(null);
+ }
+ });
+
+ pipeline(rs, res);
+ });
+
+ server.listen(0, () => {
+ const req = http.request({
+ port: server.address().port
+ });
+
+ req.end();
+ req.on('response', (res) => {
+ const buf = [];
+ res.on('data', (data) => buf.push(data));
+ res.on('end', common.mustCall(() => {
+ assert.deepStrictEqual(
+ Buffer.concat(buf),
+ Buffer.from('hello')
+ );
+ server.close();
+ }));
+ });
+ });
+}
+
+{
+ const server = http.createServer((req, res) => {
+ const rs = new Readable({
+ read() {
+ rs.push('hello');
+ },
+ destroy: common.mustCall((err, cb) => {
+ // prevents fd leaks by destroying http pipelines
+ cb();
+ })
+ });
+
+ pipeline(rs, res);
+ });
+
+ server.listen(0, () => {
+ const req = http.request({
+ port: server.address().port
+ });
+
+ req.end();
+ req.on('response', (res) => {
+ setImmediate(() => {
+ res.destroy();
+ server.close();
+ });
+ });
+ });
+}
+
+{
+ const server = http.createServer((req, res) => {
+ const rs = new Readable({
+ read() {
+ rs.push('hello');
+ },
+ destroy: common.mustCall((err, cb) => {
+ cb();
+ })
+ });
+
+ pipeline(rs, res);
+ });
+
+ let cnt = 10;
+
+ const badSink = new Writable({
+ write(data, enc, cb) {
+ cnt--;
+ if (cnt === 0) cb(new Error('kaboom'));
+ else cb();
+ }
+ });
+
+ server.listen(0, () => {
+ const req = http.request({
+ port: server.address().port
+ });
+
+ req.end();
+ req.on('response', (res) => {
+ pipeline(res, badSink, common.mustCall((err) => {
+ assert.deepStrictEqual(err, new Error('kaboom'));
+ server.close();
+ }));
+ });
+ });
+}
+
+{
+ const server = http.createServer((req, res) => {
+ pipeline(req, res, common.mustCall());
+ });
+
+ server.listen(0, () => {
+ const req = http.request({
+ port: server.address().port
+ });
+
+ const rs = new Readable({
+ read() {
+ rs.push('hello');
+ }
+ });
+
+ pipeline(rs, req, common.mustCall(() => {
+ server.close();
+ }));
+
+ req.on('response', (res) => {
+ let cnt = 10;
+ res.on('data', () => {
+ cnt--;
+ if (cnt === 0) rs.destroy();
+ });
+ });
+ });
+}
+
+{
+ const server = http2.createServer((req, res) => {
+ pipeline(req, res, common.mustCall());
+ });
+
+ server.listen(0, () => {
+ const url = `http://localhost:${server.address().port}`;
+ const client = http2.connect(url);
+ const req = client.request({ ':method': 'POST' });
+
+ const rs = new Readable({
+ read() {
+ rs.push('hello');
+ }
+ });
+
+ pipeline(rs, req, common.mustCall((err) => {
+ // TODO: this is working around an http2 bug
+ // where the client keeps the event loop going
+ // (replacing the rs.destroy() with req.end()
+ // exits it so seems to be a destroy bug there
+ client.unref();
+
+ server.close();
+ client.close();
+ }));
+
+ let cnt = 10;
+ req.on('data', (data) => {
+ cnt--;
+ if (cnt === 0) rs.destroy();
+ });
+ });
+}
+
+{
+ const makeTransform = () => {
+ const tr = new Transform({
+ transform(data, enc, cb) {
+ cb(null, data);
+ }
+ });
+
+ tr.on('close', common.mustCall());
+ return tr;
+ };
+
+ const rs = new Readable({
+ read() {
+ rs.push('hello');
+ }
+ });
+
+ let cnt = 10;
+
+ const ws = new Writable({
+ write(data, enc, cb) {
+ cnt--;
+ if (cnt === 0) return cb(new Error('kaboom'));
+ cb();
+ }
+ });
+
+ rs.on('close', common.mustCall());
+ ws.on('close', common.mustCall());
+
+ pipeline(
+ rs,
+ makeTransform(),
+ makeTransform(),
+ makeTransform(),
+ makeTransform(),
+ makeTransform(),
+ makeTransform(),
+ ws,
+ common.mustCall((err) => {
+ assert.deepStrictEqual(err, new Error('kaboom'));
+ })
+ );
+}
+
+{
+ const oldStream = new Stream();
+
+ oldStream.pause = oldStream.resume = () => {};
+ oldStream.write = (data) => {
+ oldStream.emit('data', data);
+ return true;
+ };
+ oldStream.end = () => {
+ oldStream.emit('end');
+ };
+
+ const expected = [
+ Buffer.from('hello'),
+ Buffer.from('world')
+ ];
+
+ const rs = new Readable({
+ read() {
+ for (let i = 0; i < expected.length; i++) {
+ rs.push(expected[i]);
+ }
+ rs.push(null);
+ }
+ });
+
+ const ws = new Writable({
+ write(data, enc, cb) {
+ assert.deepStrictEqual(data, expected.shift());
+ cb();
+ }
+ });
+
+ let finished = false;
+
+ ws.on('finish', () => {
+ finished = true;
+ });
+
+ pipeline(
+ rs,
+ oldStream,
+ ws,
+ common.mustCall((err) => {
+ assert(!err, 'no error');
+ assert(finished, 'last stream finished');
+ })
+ );
+}
+
+{
+ const oldStream = new Stream();
+
+ oldStream.pause = oldStream.resume = () => {};
+ oldStream.write = (data) => {
+ oldStream.emit('data', data);
+ return true;
+ };
+ oldStream.end = () => {
+ oldStream.emit('end');
+ };
+
+ const destroyableOldStream = new Stream();
+
+ destroyableOldStream.pause = destroyableOldStream.resume = () => {};
+ destroyableOldStream.destroy = common.mustCall(() => {
+ destroyableOldStream.emit('close');
+ });
+ destroyableOldStream.write = (data) => {
+ destroyableOldStream.emit('data', data);
+ return true;
+ };
+ destroyableOldStream.end = () => {
+ destroyableOldStream.emit('end');
+ };
+
+ const rs = new Readable({
+ read() {
+ rs.destroy(new Error('stop'));
+ }
+ });
+
+ const ws = new Writable({
+ write(data, enc, cb) {
+ cb();
+ }
+ });
+
+ let finished = false;
+
+ ws.on('finish', () => {
+ finished = true;
+ });
+
+ pipeline(
+ rs,
+ oldStream,
+ destroyableOldStream,
+ ws,
+ common.mustCall((err) => {
+ assert.deepStrictEqual(err, new Error('stop'));
+ assert(!finished, 'should not finish');
+ })
+ );
+}
+
+{
+ const pipelinePromise = promisify(pipeline);
+
+ async function run() {
+ const read = new Readable({
+ read() {}
+ });
+
+ const write = new Writable({
+ write(data, enc, cb) {
+ cb();
+ }
+ });
+
+ read.push('data');
+ read.push(null);
+
+ let finished = false;
+
+ write.on('finish', () => {
+ finished = true;
+ });
+
+ await pipelinePromise(read, write);
+
+ assert(finished);
+ }
+
+ run();
+}