summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRobert Nagy <ronagy@icloud.com>2021-07-07 11:33:55 +0200
committerRobert Nagy <ronagy@icloud.com>2021-07-11 10:40:30 +0200
commitbb275ef2a4105c3a66920f64d32c5a024a14921f (patch)
treed4474181a30841055432fff82bd1819fc34859cc
parentc4f8363bf7dc4e9ee34f4bf1add2d996ad020110 (diff)
downloadnode-new-bb275ef2a4105c3a66920f64d32c5a024a14921f.tar.gz
stream: unify stream utils
Unify stream helps into utils. PR-URL: https://github.com/nodejs/node/pull/39294 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com> Reviewed-By: James M Snell <jasnell@gmail.com>
-rw-r--r--lib/_http_client.js2
-rw-r--r--lib/_http_incoming.js6
-rw-r--r--lib/internal/streams/add-abort-signal.js4
-rw-r--r--lib/internal/streams/destroy.js29
-rw-r--r--lib/internal/streams/end-of-stream.js90
-rw-r--r--lib/internal/streams/pipeline.js14
-rw-r--r--lib/internal/streams/utils.js192
-rw-r--r--lib/stream/promises.js4
-rw-r--r--test/parallel/test-stream-finished.js3
-rw-r--r--test/parallel/test-stream-pipeline.js2
10 files changed, 235 insertions, 111 deletions
diff --git a/lib/_http_client.js b/lib/_http_client.js
index 598b585bcf..280c6ebab7 100644
--- a/lib/_http_client.js
+++ b/lib/_http_client.js
@@ -53,7 +53,6 @@ const {
prepareError,
} = require('_http_common');
const { OutgoingMessage } = require('_http_outgoing');
-const { kDestroy } = require('internal/streams/destroy');
const Agent = require('_http_agent');
const { Buffer } = require('buffer');
const { defaultTriggerAsyncIdScope } = require('internal/async_hooks');
@@ -610,7 +609,6 @@ function parserOnIncomingClient(res, shouldKeepAlive) {
DTRACE_HTTP_CLIENT_RESPONSE(socket, req);
req.res = res;
res.req = req;
- res[kDestroy] = null;
// Add our listener first, so that we guarantee socket cleanup
res.on('end', responseOnEnd);
diff --git a/lib/_http_incoming.js b/lib/_http_incoming.js
index a92687ce37..31b7db6f6c 100644
--- a/lib/_http_incoming.js
+++ b/lib/_http_incoming.js
@@ -31,7 +31,6 @@ const {
} = primordials;
const { Readable, finished } = require('stream');
-const { kDestroy } = require('internal/streams/destroy');
const kHeaders = Symbol('kHeaders');
const kHeadersCount = Symbol('kHeadersCount');
@@ -199,11 +198,6 @@ IncomingMessage.prototype._destroy = function _destroy(err, cb) {
}
};
-IncomingMessage.prototype[kDestroy] = function(err) {
- this.socket = null;
- this.destroy(err);
-};
-
IncomingMessage.prototype._addHeaderLines = _addHeaderLines;
function _addHeaderLines(headers, n) {
if (headers && headers.length) {
diff --git a/lib/internal/streams/add-abort-signal.js b/lib/internal/streams/add-abort-signal.js
index ba0da5e8bc..80814f0936 100644
--- a/lib/internal/streams/add-abort-signal.js
+++ b/lib/internal/streams/add-abort-signal.js
@@ -18,13 +18,13 @@ const validateAbortSignal = (signal, name) => {
}
};
-function isStream(obj) {
+function isNodeStream(obj) {
return !!(obj && typeof obj.pipe === 'function');
}
module.exports.addAbortSignal = function addAbortSignal(signal, stream) {
validateAbortSignal(signal, 'signal');
- if (!isStream(stream)) {
+ if (!isNodeStream(stream)) {
throw new ERR_INVALID_ARG_TYPE('stream', 'stream.Stream', stream);
}
return module.exports.addAbortSignalNoValidate(signal, stream);
diff --git a/lib/internal/streams/destroy.js b/lib/internal/streams/destroy.js
index df2d9f7f71..dd81fdacd5 100644
--- a/lib/internal/streams/destroy.js
+++ b/lib/internal/streams/destroy.js
@@ -10,6 +10,12 @@ const {
const {
Symbol,
} = primordials;
+const {
+ kDestroyed,
+ isDestroyed,
+ isFinished,
+ isServerRequest
+} = require('internal/streams/utils');
const kDestroy = Symbol('kDestroy');
const kConstruct = Symbol('kConstruct');
@@ -364,8 +370,6 @@ function isRequest(stream) {
return stream && stream.setHeader && typeof stream.abort === 'function';
}
-const kDestroyed = Symbol('kDestroyed');
-
function emitCloseLegacy(stream) {
stream.emit('close');
}
@@ -375,31 +379,20 @@ function emitErrorCloseLegacy(stream, err) {
process.nextTick(emitCloseLegacy, stream);
}
-function isDestroyed(stream) {
- return stream.destroyed || stream[kDestroyed];
-}
-
-function isReadable(stream) {
- return stream.readable && !stream.readableEnded && !isDestroyed(stream);
-}
-
-function isWritable(stream) {
- return stream.writable && !stream.writableEnded && !isDestroyed(stream);
-}
-
// Normalize destroy for legacy.
function destroyer(stream, err) {
if (isDestroyed(stream)) {
return;
}
- if (!err && (isReadable(stream) || isWritable(stream))) {
+ if (!err && !isFinished(stream)) {
err = new AbortError();
}
// TODO: Remove isRequest branches.
- if (typeof stream[kDestroy] === 'function') {
- stream[kDestroy](err);
+ if (isServerRequest(stream)) {
+ stream.socket = null;
+ stream.destroy(err);
} else if (isRequest(stream)) {
stream.abort();
} else if (isRequest(stream.req)) {
@@ -421,8 +414,6 @@ function destroyer(stream, err) {
}
module.exports = {
- kDestroy,
- isDestroyed,
construct,
destroyer,
destroy,
diff --git a/lib/internal/streams/end-of-stream.js b/lib/internal/streams/end-of-stream.js
index efc2441c51..274c2796ed 100644
--- a/lib/internal/streams/end-of-stream.js
+++ b/lib/internal/streams/end-of-stream.js
@@ -17,48 +17,23 @@ const {
validateObject,
} = require('internal/validators');
+const {
+ isClosed,
+ isReadable,
+ isReadableNodeStream,
+ isReadableFinished,
+ isWritable,
+ isWritableNodeStream,
+ isWritableFinished,
+ willEmitClose: _willEmitClose,
+} = require('internal/streams/utils');
+
function isRequest(stream) {
return stream.setHeader && typeof stream.abort === 'function';
}
-function isServerResponse(stream) {
- return (
- typeof stream._sent100 === 'boolean' &&
- typeof stream._removedConnection === 'boolean' &&
- typeof stream._removedContLen === 'boolean' &&
- typeof stream._removedTE === 'boolean' &&
- typeof stream._closed === 'boolean'
- );
-}
-
-function isReadable(stream) {
- return typeof stream.readable === 'boolean' ||
- typeof stream.readableEnded === 'boolean' ||
- !!stream._readableState;
-}
-
-function isWritable(stream) {
- return typeof stream.writable === 'boolean' ||
- typeof stream.writableEnded === 'boolean' ||
- !!stream._writableState;
-}
-
-function isWritableFinished(stream) {
- if (stream.writableFinished) return true;
- const wState = stream._writableState;
- if (!wState || wState.errored) return false;
- return wState.finished || (wState.ended && wState.length === 0);
-}
-
const nop = () => {};
-function isReadableEnded(stream) {
- if (stream.readableEnded) return true;
- const rState = stream._readableState;
- if (!rState || rState.errored) return false;
- return rState.endEmitted || (rState.ended && rState.length === 0);
-}
-
function eos(stream, options, callback) {
if (arguments.length === 2) {
callback = options;
@@ -74,13 +49,12 @@ function eos(stream, options, callback) {
callback = once(callback);
const readable = options.readable ||
- (options.readable !== false && isReadable(stream));
+ (options.readable !== false && isReadableNodeStream(stream));
const writable = options.writable ||
- (options.writable !== false && isWritable(stream));
+ (options.writable !== false && isWritableNodeStream(stream));
const wState = stream._writableState;
const rState = stream._readableState;
- const state = wState || rState;
const onlegacyfinish = () => {
if (!stream.writable) onfinish();
@@ -89,16 +63,13 @@ function eos(stream, options, callback) {
// TODO (ronag): Improve soft detection to include core modules and
// common ecosystem modules that do properly emit 'close' but fail
// this generic check.
- let willEmitClose = isServerResponse(stream) || (
- state &&
- state.autoDestroy &&
- state.emitClose &&
- state.closed === false &&
- isReadable(stream) === readable &&
- isWritable(stream) === writable
+ let willEmitClose = (
+ _willEmitClose(stream) &&
+ isReadableNodeStream(stream) === readable &&
+ isWritableNodeStream(stream) === writable
);
- let writableFinished = stream.writableFinished || wState?.finished;
+ let writableFinished = isWritableFinished(stream, false);
const onfinish = () => {
writableFinished = true;
// Stream should not be destroyed here. If it is that
@@ -107,12 +78,12 @@ function eos(stream, options, callback) {
if (stream.destroyed) willEmitClose = false;
if (willEmitClose && (!stream.readable || readable)) return;
- if (!readable || readableEnded) callback.call(stream);
+ if (!readable || readableFinished) callback.call(stream);
};
- let readableEnded = stream.readableEnded || rState?.endEmitted;
+ let readableFinished = isReadableFinished(stream, false);
const onend = () => {
- readableEnded = true;
+ readableFinished = true;
// Stream should not be destroyed here. If it is that
// means that user space is doing something differently and
// we cannot trust willEmitClose.
@@ -126,7 +97,7 @@ function eos(stream, options, callback) {
callback.call(stream, err);
};
- let closed = wState?.closed || rState?.closed;
+ let closed = isClosed(stream);
const onclose = () => {
closed = true;
@@ -137,13 +108,13 @@ function eos(stream, options, callback) {
return callback.call(stream, errored);
}
- if (readable && !readableEnded) {
- if (!isReadableEnded(stream))
+ if (readable && !readableFinished) {
+ if (!isReadableFinished(stream, false))
return callback.call(stream,
new ERR_STREAM_PREMATURE_CLOSE());
}
if (writable && !writableFinished) {
- if (!isWritableFinished(stream))
+ if (!isWritableFinished(stream, false))
return callback.call(stream,
new ERR_STREAM_PREMATURE_CLOSE());
}
@@ -185,19 +156,16 @@ function eos(stream, options, callback) {
}
} else if (
!readable &&
- (!willEmitClose || stream.readable) &&
- writableFinished
+ (!willEmitClose || isReadable(stream)) &&
+ (writableFinished || !isWritable(stream))
) {
process.nextTick(onclose);
} else if (
!writable &&
- (!willEmitClose || stream.writable) &&
- readableEnded
+ (!willEmitClose || isWritable(stream)) &&
+ (readableFinished || !isReadable(stream))
) {
process.nextTick(onclose);
- } else if (!wState && !rState && stream._closed === true) {
- // _closed is for OutgoingMessage which is not a proper Writable.
- process.nextTick(onclose);
} else if ((rState && stream.req && stream.aborted)) {
process.nextTick(onclose);
}
diff --git a/lib/internal/streams/pipeline.js b/lib/internal/streams/pipeline.js
index 5759dbd4a5..c98b3b3d21 100644
--- a/lib/internal/streams/pipeline.js
+++ b/lib/internal/streams/pipeline.js
@@ -27,8 +27,8 @@ const { validateCallback } = require('internal/validators');
const {
isIterable,
- isReadable,
- isStream,
+ isReadableNodeStream,
+ isNodeStream,
} = require('internal/streams/utils');
let PassThrough;
@@ -87,7 +87,7 @@ function popCallback(streams) {
function makeAsyncIterable(val) {
if (isIterable(val)) {
return val;
- } else if (isReadable(val)) {
+ } else if (isReadableNodeStream(val)) {
// Legacy streams are not Iterable.
return fromReadable(val);
}
@@ -204,7 +204,7 @@ function pipeline(...streams) {
const reading = i < streams.length - 1;
const writing = i > 0;
- if (isStream(stream)) {
+ if (isNodeStream(stream)) {
finishCount++;
destroys.push(destroyer(stream, reading, writing, finish));
}
@@ -216,7 +216,7 @@ function pipeline(...streams) {
throw new ERR_INVALID_RETURN_VALUE(
'Iterable, AsyncIterable or Stream', 'source', ret);
}
- } else if (isIterable(stream) || isReadable(stream)) {
+ } else if (isIterable(stream) || isReadableNodeStream(stream)) {
ret = stream;
} else {
throw new ERR_INVALID_ARG_TYPE(
@@ -271,8 +271,8 @@ function pipeline(...streams) {
finishCount++;
destroys.push(destroyer(ret, false, true, finish));
}
- } else if (isStream(stream)) {
- if (isReadable(ret)) {
+ } else if (isNodeStream(stream)) {
+ if (isReadableNodeStream(ret)) {
ret.pipe(stream);
// Compat. Before node v10.12.0 stdio used to throw an error so
diff --git a/lib/internal/streams/utils.js b/lib/internal/streams/utils.js
index 62eea02268..0cd9c8eb36 100644
--- a/lib/internal/streams/utils.js
+++ b/lib/internal/streams/utils.js
@@ -1,22 +1,34 @@
'use strict';
const {
+ Symbol,
SymbolAsyncIterator,
SymbolIterator,
} = primordials;
-function isReadable(obj) {
- return !!(obj && typeof obj.pipe === 'function' &&
- typeof obj.on === 'function');
+const kDestroyed = Symbol('kDestroyed');
+
+function isReadableNodeStream(obj) {
+ return !!(
+ obj &&
+ typeof obj.pipe === 'function' &&
+ typeof obj.on === 'function' &&
+ (!obj._writableState || obj._readableState?.readable !== false) && // Duplex
+ (!obj._writableState || obj._readableState) // Writable has .pipe.
+ );
}
-function isWritable(obj) {
- return !!(obj && typeof obj.write === 'function' &&
- typeof obj.on === 'function');
+function isWritableNodeStream(obj) {
+ return !!(
+ obj &&
+ typeof obj.write === 'function' &&
+ typeof obj.on === 'function' &&
+ (!obj._readableState || obj._writableState?.writable !== false) // Duplex
+ );
}
-function isStream(obj) {
- return isReadable(obj) || isWritable(obj);
+function isNodeStream(obj) {
+ return isReadableNodeStream(obj) || isWritableNodeStream(obj);
}
function isIterable(obj, isAsync) {
@@ -27,8 +39,170 @@ function isIterable(obj, isAsync) {
typeof obj[SymbolIterator] === 'function';
}
+function isDestroyed(stream) {
+ if (!isNodeStream(stream)) return null;
+ const wState = stream._writableState;
+ const rState = stream._readableState;
+ const state = wState || rState;
+ return !!(stream.destroyed || stream[kDestroyed] || state?.destroyed);
+}
+
+// Have been end():d.
+function isWritableEnded(stream) {
+ if (!isWritableNodeStream(stream)) return null;
+ if (stream.writableEnded === true) return true;
+ const wState = stream._writableState;
+ if (wState?.errored) return false;
+ if (typeof wState?.ended !== 'boolean') return null;
+ return wState.ended;
+}
+
+// Have emitted 'finish'.
+function isWritableFinished(stream, strict) {
+ if (!isWritableNodeStream(stream)) return null;
+ if (stream.writableFinished === true) return true;
+ const wState = stream._writableState;
+ if (wState?.errored) return false;
+ if (typeof wState?.finished !== 'boolean') return null;
+ return !!(
+ wState.finished ||
+ (strict === false && wState.ended === true && wState.length === 0)
+ );
+}
+
+// Have been push(null):d.
+function isReadableEnded(stream) {
+ if (!isReadableNodeStream(stream)) return null;
+ if (stream.readableEnded === true) return true;
+ const rState = stream._readableState;
+ if (!rState || rState.errored) return false;
+ if (typeof rState?.ended !== 'boolean') return null;
+ return rState.ended;
+}
+
+// Have emitted 'end'.
+function isReadableFinished(stream, strict) {
+ if (!isReadableNodeStream(stream)) return null;
+ const rState = stream._readableState;
+ if (rState?.errored) return false;
+ if (typeof rState?.endEmitted !== 'boolean') return null;
+ return !!(
+ rState.endEmitted ||
+ (strict === false && rState.ended === true && rState.length === 0)
+ );
+}
+
+function isReadable(stream) {
+ const r = isReadableNodeStream(stream);
+ if (r === null || typeof stream.readable !== 'boolean') return null;
+ if (isDestroyed(stream)) return false;
+ return r && stream.readable && !isReadableFinished(stream);
+}
+
+function isWritable(stream) {
+ const r = isWritableNodeStream(stream);
+ if (r === null || typeof stream.writable !== 'boolean') return null;
+ if (isDestroyed(stream)) return false;
+ return r && stream.writable && !isWritableEnded(stream);
+}
+
+function isFinished(stream, opts) {
+ if (!isNodeStream(stream)) {
+ return null;
+ }
+
+ if (isDestroyed(stream)) {
+ return true;
+ }
+
+ if (opts?.readable !== false && isReadable(stream)) {
+ return false;
+ }
+
+ if (opts?.writable !== false && isWritable(stream)) {
+ return false;
+ }
+
+ return true;
+}
+
+function isClosed(stream) {
+ if (!isNodeStream(stream)) {
+ return null;
+ }
+
+ const wState = stream._writableState;
+ const rState = stream._readableState;
+
+ if (
+ typeof wState?.closed === 'boolean' ||
+ typeof rState?.closed === 'boolean'
+ ) {
+ return wState?.closed || rState?.closed;
+ }
+
+ if (typeof stream._closed === 'boolean' && isOutgoingMessage(stream)) {
+ return stream._closed;
+ }
+
+ return null;
+}
+
+function isOutgoingMessage(stream) {
+ return (
+ typeof stream._closed === 'boolean' &&
+ typeof stream._defaultKeepAlive === 'boolean' &&
+ typeof stream._removedConnection === 'boolean' &&
+ typeof stream._removedContLen === 'boolean'
+ );
+}
+
+function isServerResponse(stream) {
+ return (
+ typeof stream._sent100 === 'boolean' &&
+ isOutgoingMessage(stream)
+ );
+}
+
+function isServerRequest(stream) {
+ return (
+ typeof stream._consuming === 'boolean' &&
+ typeof stream._dumped === 'boolean' &&
+ stream.req?.upgradeOrConnect === undefined
+ );
+}
+
+function willEmitClose(stream) {
+ if (!isNodeStream(stream)) return null;
+
+ const wState = stream._writableState;
+ const rState = stream._readableState;
+ const state = wState || rState;
+
+ return (!state && isServerResponse(stream)) || !!(
+ state &&
+ state.autoDestroy &&
+ state.emitClose &&
+ state.closed === false
+ );
+}
+
module.exports = {
+ kDestroyed,
+ isClosed,
+ isDestroyed,
+ isFinished,
isIterable,
isReadable,
- isStream,
+ isReadableNodeStream,
+ isReadableEnded,
+ isReadableFinished,
+ isNodeStream,
+ isWritable,
+ isWritableNodeStream,
+ isWritableEnded,
+ isWritableFinished,
+ isServerRequest,
+ isServerResponse,
+ willEmitClose,
};
diff --git a/lib/stream/promises.js b/lib/stream/promises.js
index f5d8731973..8a8e66417c 100644
--- a/lib/stream/promises.js
+++ b/lib/stream/promises.js
@@ -15,7 +15,7 @@ const {
const {
isIterable,
- isStream,
+ isNodeStream,
} = require('internal/streams/utils');
const pl = require('internal/streams/pipeline');
@@ -26,7 +26,7 @@ function pipeline(...streams) {
let signal;
const lastArg = streams[streams.length - 1];
if (lastArg && typeof lastArg === 'object' &&
- !isStream(lastArg) && !isIterable(lastArg)) {
+ !isNodeStream(lastArg) && !isIterable(lastArg)) {
const options = ArrayPrototypePop(streams);
signal = options.signal;
validateAbortSignal(signal, 'options.signal');
diff --git a/test/parallel/test-stream-finished.js b/test/parallel/test-stream-finished.js
index 8e37191169..8ada0c4c34 100644
--- a/test/parallel/test-stream-finished.js
+++ b/test/parallel/test-stream-finished.js
@@ -415,7 +415,7 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
d._writableState = {};
d._writableState.finished = true;
finished(d, { readable: false, writable: true }, common.mustCall((err) => {
- assert.strictEqual(err, undefined);
+ assert.strictEqual(err.code, 'ERR_STREAM_PREMATURE_CLOSE');
}));
d._writableState.errored = true;
d.emit('close');
@@ -586,7 +586,6 @@ testClosed((opts) => new Writable({ write() {}, ...opts }));
});
}
-
{
const w = new Writable({
write(chunk, encoding, callback) {
diff --git a/test/parallel/test-stream-pipeline.js b/test/parallel/test-stream-pipeline.js
index e2e5fe2e0d..1f4474e6b5 100644
--- a/test/parallel/test-stream-pipeline.js
+++ b/test/parallel/test-stream-pipeline.js
@@ -1035,7 +1035,7 @@ const net = require('net');
const dst = new PassThrough();
dst.readable = false;
pipeline(src, dst, common.mustSucceed(() => {
- assert.strictEqual(dst.destroyed, false);
+ assert.strictEqual(dst.destroyed, true);
}));
src.end();
}