diff options
author | Robert Nagy <ronagy@icloud.com> | 2020-06-28 18:38:20 +0200 |
---|---|---|
committer | Robert Nagy <ronagy@icloud.com> | 2020-06-30 22:35:36 +0200 |
commit | 8f4b4f272e461b28beafb39adb20787fe7437526 (patch) | |
tree | e9d7863904f26e2b81ac9d6a8dfd8b1c5366d12c | |
parent | 204f20f2d1d5014e7f4fb2bf93a201995cc4914b (diff) | |
download | node-new-8f4b4f272e461b28beafb39adb20787fe7437526.tar.gz |
stream: destroy wrapped streams on error
Stream should be destroyed and update state accordingly when
the wrapped stream emits error.
Does some additional cleanup with future TODOs that might be worth
looking into.
PR-URL: https://github.com/nodejs/node/pull/34102
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Luigi Pinca <luigipinca@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
-rw-r--r-- | lib/_stream_readable.js | 28 | ||||
-rw-r--r-- | test/parallel/test-stream2-readable-wrap-error.js | 32 |
2 files changed, 55 insertions, 5 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js index 952f2d75b8..d33d53de2b 100644 --- a/lib/_stream_readable.js +++ b/lib/_stream_readable.js @@ -66,7 +66,6 @@ ObjectSetPrototypeOf(Readable.prototype, Stream.prototype); ObjectSetPrototypeOf(Readable, Stream); const { errorOrDestroy } = destroyImpl; -const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume']; function prependListener(emitter, event, fn) { // Sadly this is not cacheable as some libraries bundle their own @@ -1051,10 +1050,29 @@ Readable.prototype.wrap = function(stream) { } } - // Proxy certain important events. - for (const kProxyEvent of kProxyEvents) { - stream.on(kProxyEvent, this.emit.bind(this, kProxyEvent)); - } + stream.on('error', (err) => { + errorOrDestroy(this, err); + }); + + stream.on('close', () => { + // TODO(ronag): Update readable state? + this.emit('close'); + }); + + stream.on('destroy', () => { + // TODO(ronag): this.destroy()? + this.emit('destroy'); + }); + + stream.on('pause', () => { + // TODO(ronag): this.pause()? + this.emit('pause'); + }); + + stream.on('resume', () => { + // TODO(ronag): this.resume()? + this.emit('resume'); + }); // When we try to consume some more bytes, simply unpause the // underlying stream. diff --git a/test/parallel/test-stream2-readable-wrap-error.js b/test/parallel/test-stream2-readable-wrap-error.js new file mode 100644 index 0000000000..b56b9bc41c --- /dev/null +++ b/test/parallel/test-stream2-readable-wrap-error.js @@ -0,0 +1,32 @@ +'use strict'; +const common = require('../common'); +const assert = require('assert'); + +const Readable = require('_stream_readable'); +const EE = require('events').EventEmitter; + +const oldStream = new EE(); +oldStream.pause = () => {}; +oldStream.resume = () => {}; + +{ + const r = new Readable({ autoDestroy: true }) + .wrap(oldStream) + .on('error', common.mustCall(() => { + assert.strictEqual(r._readableState.errorEmitted, true); + assert.strictEqual(r._readableState.errored, true); + assert.strictEqual(r.destroyed, true); + })); + oldStream.emit('error', new Error()); +} + +{ + const r = new Readable({ autoDestroy: false }) + .wrap(oldStream) + .on('error', common.mustCall(() => { + assert.strictEqual(r._readableState.errorEmitted, true); + assert.strictEqual(r._readableState.errored, true); + assert.strictEqual(r.destroyed, false); + })); + oldStream.emit('error', new Error()); +} |