summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--lib/_stream_transform.js67
-rw-r--r--lib/_stream_writable.js27
-rw-r--r--lib/internal/streams/destroy.js106
-rw-r--r--test/parallel/test-stream-construct-async-error.js258
4 files changed, 453 insertions, 5 deletions
diff --git a/lib/_stream_transform.js b/lib/_stream_transform.js
index 564cdf0e82..2fdd0d721b 100644
--- a/lib/_stream_transform.js
+++ b/lib/_stream_transform.js
@@ -107,8 +107,10 @@ function Transform(options) {
}
function final(cb) {
+ let called = false;
if (typeof this._flush === 'function' && !this.destroyed) {
- this._flush((er, data) => {
+ const result = this._flush((er, data) => {
+ called = true;
if (er) {
if (cb) {
cb(er);
@@ -126,6 +128,33 @@ function final(cb) {
cb();
}
});
+ if (result !== undefined && result !== null) {
+ try {
+ const then = result.then;
+ if (typeof then === 'function') {
+ then.call(
+ result,
+ (data) => {
+ if (called)
+ return;
+ if (data != null)
+ this.push(data);
+ this.push(null);
+ if (cb)
+ process.nextTick(cb);
+ },
+ (err) => {
+ if (cb) {
+ process.nextTick(cb, err);
+ } else {
+ process.nextTick(() => this.destroy(err));
+ }
+ });
+ }
+ } catch (err) {
+ process.nextTick(() => this.destroy(err));
+ }
+ }
} else {
this.push(null);
if (cb) {
@@ -151,7 +180,9 @@ Transform.prototype._write = function(chunk, encoding, callback) {
const wState = this._writableState;
const length = rState.length;
- this._transform(chunk, encoding, (err, val) => {
+ let called = false;
+ const result = this._transform(chunk, encoding, (err, val) => {
+ called = true;
if (err) {
callback(err);
return;
@@ -172,6 +203,38 @@ Transform.prototype._write = function(chunk, encoding, callback) {
this[kCallback] = callback;
}
});
+ if (result !== undefined && result != null) {
+ try {
+ const then = result.then;
+ if (typeof then === 'function') {
+ then.call(
+ result,
+ (val) => {
+ if (called)
+ return;
+
+ if (val != null) {
+ this.push(val);
+ }
+
+ if (
+ wState.ended ||
+ length === rState.length ||
+ rState.length < rState.highWaterMark ||
+ rState.length === 0) {
+ process.nextTick(callback);
+ } else {
+ this[kCallback] = callback;
+ }
+ },
+ (err) => {
+ process.nextTick(callback, err);
+ });
+ }
+ } catch (err) {
+ process.nextTick(callback, err);
+ }
+ }
};
Transform.prototype._read = function() {
diff --git a/lib/_stream_writable.js b/lib/_stream_writable.js
index 1232742eb9..22aee3236c 100644
--- a/lib/_stream_writable.js
+++ b/lib/_stream_writable.js
@@ -647,7 +647,7 @@ function needFinish(state) {
function callFinal(stream, state) {
state.sync = true;
state.pendingcb++;
- stream._final((err) => {
+ const result = stream._final((err) => {
state.pendingcb--;
if (err) {
for (const callback of state[kOnFinished].splice(0)) {
@@ -664,6 +664,31 @@ function callFinal(stream, state) {
process.nextTick(finish, stream, state);
}
});
+ if (result !== undefined && result !== null) {
+ try {
+ const then = result.then;
+ if (typeof then === 'function') {
+ then.call(
+ result,
+ function() {
+ if (state.prefinished)
+ return;
+ state.prefinish = true;
+ process.nextTick(() => stream.emit('prefinish'));
+ state.pendingcb++;
+ process.nextTick(finish, stream, state);
+ },
+ function(err) {
+ for (const callback of state[kOnFinished].splice(0)) {
+ process.nextTick(callback, err);
+ }
+ process.nextTick(errorOrDestroy, stream, err, state.sync);
+ });
+ }
+ } catch (err) {
+ process.nextTick(errorOrDestroy, stream, err, state.sync);
+ }
+ }
state.sync = false;
}
diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js
index 3bccd46b7e..510f7a40c8 100644
--- a/lib/internal/streams/destroy.js
+++ b/lib/internal/streams/destroy.js
@@ -59,10 +59,13 @@ function destroy(err, cb) {
}
function _destroy(self, err, cb) {
- self._destroy(err || null, (err) => {
+ let called = false;
+ const result = self._destroy(err || null, (err) => {
const r = self._readableState;
const w = self._writableState;
+ called = true;
+
if (err) {
// Avoid V8 leak, https://github.com/nodejs/node/pull/34103#issuecomment-652002364
err.stack;
@@ -92,6 +95,64 @@ function _destroy(self, err, cb) {
process.nextTick(emitCloseNT, self);
}
});
+ if (result !== undefined && result !== null) {
+ try {
+ const then = result.then;
+ if (typeof then === 'function') {
+ then.call(
+ result,
+ function() {
+ if (called)
+ return;
+
+ const r = self._readableState;
+ const w = self._writableState;
+
+ if (w) {
+ w.closed = true;
+ }
+ if (r) {
+ r.closed = true;
+ }
+
+ if (typeof cb === 'function') {
+ process.nextTick(cb);
+ }
+
+ process.nextTick(emitCloseNT, self);
+ },
+ function(err) {
+ const r = self._readableState;
+ const w = self._writableState;
+ err.stack;
+
+ called = true;
+
+ if (w && !w.errored) {
+ w.errored = err;
+ }
+ if (r && !r.errored) {
+ r.errored = err;
+ }
+
+ if (w) {
+ w.closed = true;
+ }
+ if (r) {
+ r.closed = true;
+ }
+
+ if (typeof cb === 'function') {
+ process.nextTick(cb, err);
+ }
+
+ process.nextTick(emitErrorCloseNT, self, err);
+ });
+ }
+ } catch (err) {
+ process.nextTick(emitErrorNT, self, err);
+ }
+ }
}
function emitErrorCloseNT(self, err) {
@@ -230,7 +291,7 @@ function constructNT(stream) {
const s = w || r;
let called = false;
- stream._construct((err) => {
+ const result = stream._construct((err) => {
if (r) {
r.constructed = true;
}
@@ -252,6 +313,47 @@ function constructNT(stream) {
process.nextTick(emitConstructNT, stream);
}
});
+ if (result !== undefined && result !== null) {
+ try {
+ const then = result.then;
+ if (typeof then === 'function') {
+ then.call(
+ result,
+ function() {
+ // If the callback was invoked, do nothing further.
+ if (called)
+ return;
+ if (r) {
+ r.constructed = true;
+ }
+ if (w) {
+ w.constructed = true;
+ }
+ if (s.destroyed) {
+ process.nextTick(() => stream.emit(kDestroy));
+ } else {
+ process.nextTick(emitConstructNT, stream);
+ }
+ },
+ function(err) {
+ if (r) {
+ r.constructed = true;
+ }
+ if (w) {
+ w.constructed = true;
+ }
+ called = true;
+ if (s.destroyed) {
+ process.nextTick(() => stream.emit(kDestroy, err));
+ } else {
+ process.nextTick(errorOrDestroy, stream, err);
+ }
+ });
+ }
+ } catch (err) {
+ process.nextTick(emitErrorNT, stream, err);
+ }
+ }
}
function emitConstructNT(stream) {
diff --git a/test/parallel/test-stream-construct-async-error.js b/test/parallel/test-stream-construct-async-error.js
new file mode 100644
index 0000000000..34e450c853
--- /dev/null
+++ b/test/parallel/test-stream-construct-async-error.js
@@ -0,0 +1,258 @@
+'use strict';
+
+const common = require('../common');
+const {
+ Duplex,
+ Writable,
+ Transform,
+} = require('stream');
+const { setTimeout } = require('timers/promises');
+const assert = require('assert');
+
+{
+ class Foo extends Duplex {
+ async _construct(cb) {
+ // eslint-disable-next-line no-restricted-syntax
+ await setTimeout(common.platformTimeout(1));
+ cb();
+ throw new Error('boom');
+ }
+ }
+
+ const foo = new Foo();
+ foo.on('error', common.expectsError({
+ message: 'boom'
+ }));
+ foo.on('close', common.mustCall(() => {
+ assert(foo._writableState.constructed);
+ assert(foo._readableState.constructed);
+ }));
+}
+
+{
+ class Foo extends Duplex {
+ async _destroy(err, cb) {
+ // eslint-disable-next-line no-restricted-syntax
+ await setTimeout(common.platformTimeout(1));
+ throw new Error('boom');
+ }
+ }
+
+ const foo = new Foo();
+ foo.destroy();
+ foo.on('error', common.expectsError({
+ message: 'boom'
+ }));
+ foo.on('close', common.mustCall(() => {
+ assert(foo.destroyed);
+ }));
+}
+
+{
+ class Foo extends Duplex {
+ async _destroy(err, cb) {
+ // eslint-disable-next-line no-restricted-syntax
+ await setTimeout(common.platformTimeout(1));
+ }
+ }
+
+ const foo = new Foo();
+ foo.destroy();
+ foo.on('close', common.mustCall(() => {
+ assert(foo.destroyed);
+ }));
+}
+
+{
+ class Foo extends Duplex {
+ async _construct() {
+ // eslint-disable-next-line no-restricted-syntax
+ await setTimeout(common.platformTimeout(1));
+ }
+
+ _write = common.mustCall((chunk, encoding, cb) => {
+ cb();
+ })
+
+ _read() {}
+ }
+
+ const foo = new Foo();
+ foo.write('test', common.mustCall());
+}
+
+{
+ class Foo extends Duplex {
+ async _construct(callback) {
+ // eslint-disable-next-line no-restricted-syntax
+ await setTimeout(common.platformTimeout(1));
+ callback();
+ }
+
+ _write = common.mustCall((chunk, encoding, cb) => {
+ cb();
+ })
+
+ _read() {}
+ }
+
+ const foo = new Foo();
+ foo.write('test', common.mustCall());
+}
+
+{
+ class Foo extends Writable {
+ _write = common.mustCall((chunk, encoding, cb) => {
+ cb();
+ })
+
+ async _final() {
+ // eslint-disable-next-line no-restricted-syntax
+ await setTimeout(common.platformTimeout(1));
+ }
+ }
+
+ const foo = new Foo();
+ foo.end('hello');
+ foo.on('finish', common.mustCall());
+}
+
+{
+ class Foo extends Writable {
+ _write = common.mustCall((chunk, encoding, cb) => {
+ cb();
+ })
+
+ async _final(callback) {
+ // eslint-disable-next-line no-restricted-syntax
+ await setTimeout(common.platformTimeout(1));
+ callback();
+ }
+ }
+
+ const foo = new Foo();
+ foo.end('hello');
+ foo.on('finish', common.mustCall());
+}
+
+{
+ class Foo extends Writable {
+ _write = common.mustCall((chunk, encoding, cb) => {
+ cb();
+ })
+
+ async _final() {
+ // eslint-disable-next-line no-restricted-syntax
+ await setTimeout(common.platformTimeout(1));
+ throw new Error('boom');
+ }
+ }
+
+ const foo = new Foo();
+ foo.end('hello');
+ foo.on('error', common.expectsError({
+ message: 'boom'
+ }));
+ foo.on('close', common.mustCall());
+}
+
+{
+ const expected = ['hello', 'world'];
+ class Foo extends Transform {
+ async _flush() {
+ return 'world';
+ }
+
+ _transform(chunk, encoding, callback) {
+ callback(null, chunk);
+ }
+ }
+
+ const foo = new Foo();
+ foo.end('hello');
+ foo.on('data', common.mustCall((chunk) => {
+ assert.strictEqual(chunk.toString(), expected.shift());
+ }, 2));
+}
+
+{
+ const expected = ['hello', 'world'];
+ class Foo extends Transform {
+ async _flush(callback) {
+ callback(null, 'world');
+ }
+
+ _transform(chunk, encoding, callback) {
+ callback(null, chunk);
+ }
+ }
+
+ const foo = new Foo();
+ foo.end('hello');
+ foo.on('data', common.mustCall((chunk) => {
+ assert.strictEqual(chunk.toString(), expected.shift());
+ }, 2));
+}
+
+{
+ class Foo extends Transform {
+ async _flush(callback) {
+ throw new Error('boom');
+ }
+
+ _transform(chunk, encoding, callback) {
+ callback(null, chunk);
+ }
+ }
+
+ const foo = new Foo();
+ foo.end('hello');
+ foo.on('data', common.mustCall());
+ foo.on('error', common.expectsError({
+ message: 'boom'
+ }));
+ foo.on('close', common.mustCall());
+}
+
+{
+ class Foo extends Transform {
+ async _transform(chunk) {
+ return chunk.toString().toUpperCase();
+ }
+ }
+
+ const foo = new Foo();
+ foo.end('hello');
+ foo.on('data', common.mustCall((chunk) => {
+ assert.strictEqual(chunk.toString(), 'HELLO');
+ }));
+}
+
+{
+ class Foo extends Transform {
+ async _transform(chunk, _, callback) {
+ callback(null, chunk.toString().toUpperCase());
+ }
+ }
+
+ const foo = new Foo();
+ foo.end('hello');
+ foo.on('data', common.mustCall((chunk) => {
+ assert.strictEqual(chunk.toString(), 'HELLO');
+ }));
+}
+
+{
+ class Foo extends Transform {
+ async _transform() {
+ throw new Error('boom');
+ }
+ }
+
+ const foo = new Foo();
+ foo.end('hello');
+ foo.on('error', common.expectsError({
+ message: 'boom'
+ }));
+ foo.on('close', common.mustCall());
+}