summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Nagy <ronagy@icloud.com>2020-06-28 18:38:20 +0200
committerRobert Nagy <ronagy@icloud.com>2020-06-30 22:35:36 +0200
commit8f4b4f272e461b28beafb39adb20787fe7437526 (patch)
treee9d7863904f26e2b81ac9d6a8dfd8b1c5366d12c
parent204f20f2d1d5014e7f4fb2bf93a201995cc4914b (diff)
downloadnode-new-8f4b4f272e461b28beafb39adb20787fe7437526.tar.gz
stream: destroy wrapped streams on error
Stream should be destroyed and update state accordingly when the wrapped stream emits error. Does some additional cleanup with future TODOs that might be worth looking into. PR-URL: https://github.com/nodejs/node/pull/34102 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Luigi Pinca <luigipinca@gmail.com> Reviewed-By: Anna Henningsen <anna@addaleax.net>
-rw-r--r--lib/_stream_readable.js28
-rw-r--r--test/parallel/test-stream2-readable-wrap-error.js32
2 files changed, 55 insertions, 5 deletions
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 952f2d75b8..d33d53de2b 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -66,7 +66,6 @@ ObjectSetPrototypeOf(Readable.prototype, Stream.prototype);
ObjectSetPrototypeOf(Readable, Stream);
const { errorOrDestroy } = destroyImpl;
-const kProxyEvents = ['error', 'close', 'destroy', 'pause', 'resume'];
function prependListener(emitter, event, fn) {
// Sadly this is not cacheable as some libraries bundle their own
@@ -1051,10 +1050,29 @@ Readable.prototype.wrap = function(stream) {
}
}
- // Proxy certain important events.
- for (const kProxyEvent of kProxyEvents) {
- stream.on(kProxyEvent, this.emit.bind(this, kProxyEvent));
- }
+ stream.on('error', (err) => {
+ errorOrDestroy(this, err);
+ });
+
+ stream.on('close', () => {
+ // TODO(ronag): Update readable state?
+ this.emit('close');
+ });
+
+ stream.on('destroy', () => {
+ // TODO(ronag): this.destroy()?
+ this.emit('destroy');
+ });
+
+ stream.on('pause', () => {
+ // TODO(ronag): this.pause()?
+ this.emit('pause');
+ });
+
+ stream.on('resume', () => {
+ // TODO(ronag): this.resume()?
+ this.emit('resume');
+ });
// When we try to consume some more bytes, simply unpause the
// underlying stream.
diff --git a/test/parallel/test-stream2-readable-wrap-error.js b/test/parallel/test-stream2-readable-wrap-error.js
new file mode 100644
index 0000000000..b56b9bc41c
--- /dev/null
+++ b/test/parallel/test-stream2-readable-wrap-error.js
@@ -0,0 +1,32 @@
+'use strict';
+const common = require('../common');
+const assert = require('assert');
+
+const Readable = require('_stream_readable');
+const EE = require('events').EventEmitter;
+
+const oldStream = new EE();
+oldStream.pause = () => {};
+oldStream.resume = () => {};
+
+{
+ const r = new Readable({ autoDestroy: true })
+ .wrap(oldStream)
+ .on('error', common.mustCall(() => {
+ assert.strictEqual(r._readableState.errorEmitted, true);
+ assert.strictEqual(r._readableState.errored, true);
+ assert.strictEqual(r.destroyed, true);
+ }));
+ oldStream.emit('error', new Error());
+}
+
+{
+ const r = new Readable({ autoDestroy: false })
+ .wrap(oldStream)
+ .on('error', common.mustCall(() => {
+ assert.strictEqual(r._readableState.errorEmitted, true);
+ assert.strictEqual(r._readableState.errored, true);
+ assert.strictEqual(r.destroyed, false);
+ }));
+ oldStream.emit('error', new Error());
+}