summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJames M Snell <jasnell@gmail.com>2021-07-30 15:02:13 -0700
committerJames M Snell <jasnell@gmail.com>2021-08-06 15:42:58 -0700
commitc52410710e35150eb88e33de18ae7056b6c8754c (patch)
tree2a9fbb62055110d0c6fd8488c1a410de883bc181
parente4b1fb5e6422c1ff151234bb9de792d45dd88d87 (diff)
downloadnode-new-c52410710e35150eb88e33de18ae7056b6c8754c.tar.gz
stream: utility consumers for web and node.js streams
Signed-off-by: James M Snell <jasnell@gmail.com> PR-URL: https://github.com/nodejs/node/pull/39594 Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
-rw-r--r--doc/api/webstreams.md73
-rw-r--r--lib/stream/consumers.js84
-rw-r--r--test/parallel/test-stream-consumers.js234
3 files changed, 391 insertions, 0 deletions
diff --git a/doc/api/webstreams.md b/doc/api/webstreams.md
index d99a66a716..67675d6297 100644
--- a/doc/api/webstreams.md
+++ b/doc/api/webstreams.md
@@ -1270,5 +1270,78 @@ added: REPLACEME
* Type: {WritableStream}
+### Utility Consumers
+<!-- YAML
+added: REPLACEME
+-->
+
+The utility consumer functions provide common options for consuming
+streams.
+
+They are accessed using:
+
+```mjs
+import {
+ arrayBuffer,
+ blob,
+ json,
+ text,
+} from 'node:stream/consumers';
+```
+
+```cjs
+const {
+ arrayBuffer,
+ blob,
+ json,
+ text,
+} = require('stream/consumers');
+```
+
+#### `streamConsumers.arrayBuffer(stream)`
+<!-- YAML
+added: REPLACEME
+-->
+
+* `stream` {ReadableStream|stream.Readable|AsyncIterator}
+* Returns: {Promise} Fulfills with an `ArrayBuffer` containing the full
+ contents of the stream.
+
+#### `streamConsumers.blob(stream)`
+<!-- YAML
+added: REPLACEME
+-->
+
+* `stream` {ReadableStream|stream.Readable|AsyncIterator}
+* Returns: {Promise} Fulfills with a {Blob} containing the full contents
+ of the stream.
+
+#### `streamConsumers.buffer(stream)`
+<!-- YAML
+added: REPLACEME
+-->
+
+* `stream` {ReadableStream|stream.Readable|AsyncIterator}
+* Returns: {Promise} Fulfills with a {Buffer} containing the full
+ contents of the stream.
+
+#### `streamConsumers.json(stream)`
+<!-- YAML
+added: REPLACEME
+-->
+
+* `stream` {ReadableStream|stream.Readable|AsyncIterator}
+* Returns: {Promise} Fulfills with the contents of the stream parsed as a
+ UTF-8 encoded string that is then passed through `JSON.parse()`.
+
+#### `streamConsumers.text(stream)`
+<!-- YAML
+added: REPLACEME
+-->
+
+* `stream` {ReadableStream|stream.Readable|AsyncIterator}
+* Returns: {Promise} Fulfills with the contents of the stream parsed as a
+ UTF-8 encoded string.
+
[Streams]: stream.md
[WHATWG Streams Standard]: https://streams.spec.whatwg.org/
diff --git a/lib/stream/consumers.js b/lib/stream/consumers.js
new file mode 100644
index 0000000000..ffe6e53120
--- /dev/null
+++ b/lib/stream/consumers.js
@@ -0,0 +1,84 @@
+'use strict';
+
+const {
+ JSONParse,
+} = primordials;
+
+const {
+ TextDecoder,
+} = require('internal/encoding');
+
+const {
+ Blob,
+} = require('internal/blob');
+
+const {
+ Buffer,
+} = require('buffer');
+
+/**
+ * @typedef {import('../internal/webstreams/readablestream').ReadableStream
+ * } ReadableStream
+ * @typedef {import('../internal/streams/readable')} Readable
+ */
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<Blob>}
+ */
+async function blob(stream) {
+ const chunks = [];
+ for await (const chunk of stream)
+ chunks.push(chunk);
+ return new Blob(chunks);
+}
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<ArrayBuffer>}
+ */
+async function arrayBuffer(stream) {
+ const ret = await blob(stream);
+ return ret.arrayBuffer();
+}
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<Buffer>}
+ */
+async function buffer(stream) {
+ return Buffer.from(await arrayBuffer(stream));
+}
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<string>}
+ */
+async function text(stream) {
+ const dec = new TextDecoder();
+ let str = '';
+ for await (const chunk of stream) {
+ if (typeof chunk === 'string')
+ str += chunk;
+ else
+ str += dec.decode(chunk, { stream: true });
+ }
+ return str;
+}
+
+/**
+ * @param {AsyncIterable|ReadableStream|Readable} stream
+ * @returns {Promise<any>}
+ */
+async function json(stream) {
+ const str = await text(stream);
+ return JSONParse(str);
+}
+
+module.exports = {
+ arrayBuffer,
+ blob,
+ buffer,
+ text,
+ json,
+};
diff --git a/test/parallel/test-stream-consumers.js b/test/parallel/test-stream-consumers.js
new file mode 100644
index 0000000000..8f6a9deb1c
--- /dev/null
+++ b/test/parallel/test-stream-consumers.js
@@ -0,0 +1,234 @@
+// Flags: --no-warnings
+'use strict';
+
+const common = require('../common');
+const assert = require('assert');
+
+const {
+ arrayBuffer,
+ blob,
+ buffer,
+ text,
+ json,
+} = require('stream/consumers');
+
+const {
+ PassThrough
+} = require('stream');
+
+const {
+ TransformStream,
+} = require('stream/web');
+
+const buf = Buffer.from('hellothere');
+const kArrayBuffer =
+ buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength);
+
+{
+ const passthrough = new PassThrough();
+
+ blob(passthrough).then(common.mustCall(async (blob) => {
+ assert.strictEqual(blob.size, 10);
+ assert.deepStrictEqual(await blob.arrayBuffer(), kArrayBuffer);
+ }));
+
+ passthrough.write('hello');
+ setTimeout(() => passthrough.end('there'), 10);
+}
+
+{
+ const passthrough = new PassThrough();
+
+ arrayBuffer(passthrough).then(common.mustCall(async (ab) => {
+ assert.strictEqual(ab.byteLength, 10);
+ assert.deepStrictEqual(ab, kArrayBuffer);
+ }));
+
+ passthrough.write('hello');
+ setTimeout(() => passthrough.end('there'), 10);
+}
+
+{
+ const passthrough = new PassThrough();
+
+ buffer(passthrough).then(common.mustCall(async (buf) => {
+ assert.strictEqual(buf.byteLength, 10);
+ assert.deepStrictEqual(buf.buffer, kArrayBuffer);
+ }));
+
+ passthrough.write('hello');
+ setTimeout(() => passthrough.end('there'), 10);
+}
+
+
+{
+ const passthrough = new PassThrough();
+
+ text(passthrough).then(common.mustCall(async (str) => {
+ assert.strictEqual(str.length, 10);
+ assert.deepStrictEqual(str, 'hellothere');
+ }));
+
+ passthrough.write('hello');
+ setTimeout(() => passthrough.end('there'), 10);
+}
+
+{
+ const passthrough = new PassThrough();
+
+ json(passthrough).then(common.mustCall(async (str) => {
+ assert.strictEqual(str.length, 10);
+ assert.deepStrictEqual(str, 'hellothere');
+ }));
+
+ passthrough.write('"hello');
+ setTimeout(() => passthrough.end('there"'), 10);
+}
+
+{
+ const { writable, readable } = new TransformStream();
+
+ blob(readable).then(common.mustCall(async (blob) => {
+ assert.strictEqual(blob.size, 10);
+ assert.deepStrictEqual(await blob.arrayBuffer(), kArrayBuffer);
+ }));
+
+ const writer = writable.getWriter();
+ writer.write('hello');
+ setTimeout(() => {
+ writer.write('there');
+ writer.close();
+ }, 10);
+
+ assert.rejects(blob(readable), { code: 'ERR_INVALID_STATE' });
+}
+
+{
+ const { writable, readable } = new TransformStream();
+
+ arrayBuffer(readable).then(common.mustCall(async (ab) => {
+ assert.strictEqual(ab.byteLength, 10);
+ assert.deepStrictEqual(ab, kArrayBuffer);
+ }));
+
+ const writer = writable.getWriter();
+ writer.write('hello');
+ setTimeout(() => {
+ writer.write('there');
+ writer.close();
+ }, 10);
+
+ assert.rejects(arrayBuffer(readable), { code: 'ERR_INVALID_STATE' });
+}
+
+{
+ const { writable, readable } = new TransformStream();
+
+ text(readable).then(common.mustCall(async (str) => {
+ assert.strictEqual(str.length, 10);
+ assert.deepStrictEqual(str, 'hellothere');
+ }));
+
+ const writer = writable.getWriter();
+ writer.write('hello');
+ setTimeout(() => {
+ writer.write('there');
+ writer.close();
+ }, 10);
+
+ assert.rejects(text(readable), { code: 'ERR_INVALID_STATE' });
+}
+
+{
+ const { writable, readable } = new TransformStream();
+
+ json(readable).then(common.mustCall(async (str) => {
+ assert.strictEqual(str.length, 10);
+ assert.deepStrictEqual(str, 'hellothere');
+ }));
+
+ const writer = writable.getWriter();
+ writer.write('"hello');
+ setTimeout(() => {
+ writer.write('there"');
+ writer.close();
+ }, 10);
+
+ assert.rejects(json(readable), { code: 'ERR_INVALID_STATE' });
+}
+
+{
+ const stream = new PassThrough({
+ readableObjectMode: true,
+ writableObjectMode: true,
+ });
+
+ blob(stream).then(common.mustCall((blob) => {
+ assert.strictEqual(blob.size, 30);
+ }));
+
+ stream.write({});
+ stream.end({});
+}
+
+{
+ const stream = new PassThrough({
+ readableObjectMode: true,
+ writableObjectMode: true,
+ });
+
+ arrayBuffer(stream).then(common.mustCall((ab) => {
+ assert.strictEqual(ab.byteLength, 30);
+ assert.strictEqual(
+ Buffer.from(ab).toString(),
+ '[object Object][object Object]');
+ }));
+
+ stream.write({});
+ stream.end({});
+}
+
+{
+ const stream = new PassThrough({
+ readableObjectMode: true,
+ writableObjectMode: true,
+ });
+
+ buffer(stream).then(common.mustCall((buf) => {
+ assert.strictEqual(buf.byteLength, 30);
+ assert.strictEqual(
+ buf.toString(),
+ '[object Object][object Object]');
+ }));
+
+ stream.write({});
+ stream.end({});
+}
+
+{
+ const stream = new PassThrough({
+ readableObjectMode: true,
+ writableObjectMode: true,
+ });
+
+ assert.rejects(text(stream), {
+ code: 'ERR_INVALID_ARG_TYPE',
+ });
+
+ stream.write({});
+ stream.end({});
+}
+
+{
+ const stream = new PassThrough({
+ readableObjectMode: true,
+ writableObjectMode: true,
+ });
+
+ assert.rejects(json(stream), {
+ code: 'ERR_INVALID_ARG_TYPE',
+ });
+
+ stream.write({});
+ stream.end({});
+}