diff options
-rw-r--r-- | doc/api/webstreams.md | 73 | ||||
-rw-r--r-- | lib/stream/consumers.js | 84 | ||||
-rw-r--r-- | test/parallel/test-stream-consumers.js | 234 |
3 files changed, 391 insertions, 0 deletions
diff --git a/doc/api/webstreams.md b/doc/api/webstreams.md index d99a66a716..67675d6297 100644 --- a/doc/api/webstreams.md +++ b/doc/api/webstreams.md @@ -1270,5 +1270,78 @@ added: REPLACEME * Type: {WritableStream} +### Utility Consumers +<!-- YAML +added: REPLACEME +--> + +The utility consumer functions provide common options for consuming +streams. + +They are accessed using: + +```mjs +import { + arrayBuffer, + blob, + json, + text, +} from 'node:stream/consumers'; +``` + +```cjs +const { + arrayBuffer, + blob, + json, + text, +} = require('stream/consumers'); +``` + +#### `streamConsumers.arrayBuffer(stream)` +<!-- YAML +added: REPLACEME +--> + +* `stream` {ReadableStream|stream.Readable|AsyncIterator} +* Returns: {Promise} Fulfills with an `ArrayBuffer` containing the full + contents of the stream. + +#### `streamConsumers.blob(stream)` +<!-- YAML +added: REPLACEME +--> + +* `stream` {ReadableStream|stream.Readable|AsyncIterator} +* Returns: {Promise} Fulfills with a {Blob} containing the full contents + of the stream. + +#### `streamConsumers.buffer(stream)` +<!-- YAML +added: REPLACEME +--> + +* `stream` {ReadableStream|stream.Readable|AsyncIterator} +* Returns: {Promise} Fulfills with a {Buffer} containing the full + contents of the stream. + +#### `streamConsumers.json(stream)` +<!-- YAML +added: REPLACEME +--> + +* `stream` {ReadableStream|stream.Readable|AsyncIterator} +* Returns: {Promise} Fulfills with the contents of the stream parsed as a + UTF-8 encoded string that is then passed through `JSON.parse()`. + +#### `streamConsumers.text(stream)` +<!-- YAML +added: REPLACEME +--> + +* `stream` {ReadableStream|stream.Readable|AsyncIterator} +* Returns: {Promise} Fulfills with the contents of the stream parsed as a + UTF-8 encoded string. + [Streams]: stream.md [WHATWG Streams Standard]: https://streams.spec.whatwg.org/ diff --git a/lib/stream/consumers.js b/lib/stream/consumers.js new file mode 100644 index 0000000000..ffe6e53120 --- /dev/null +++ b/lib/stream/consumers.js @@ -0,0 +1,84 @@ +'use strict'; + +const { + JSONParse, +} = primordials; + +const { + TextDecoder, +} = require('internal/encoding'); + +const { + Blob, +} = require('internal/blob'); + +const { + Buffer, +} = require('buffer'); + +/** + * @typedef {import('../internal/webstreams/readablestream').ReadableStream + * } ReadableStream + * @typedef {import('../internal/streams/readable')} Readable + */ + +/** + * @param {AsyncIterable|ReadableStream|Readable} stream + * @returns {Promise<Blob>} + */ +async function blob(stream) { + const chunks = []; + for await (const chunk of stream) + chunks.push(chunk); + return new Blob(chunks); +} + +/** + * @param {AsyncIterable|ReadableStream|Readable} stream + * @returns {Promise<ArrayBuffer>} + */ +async function arrayBuffer(stream) { + const ret = await blob(stream); + return ret.arrayBuffer(); +} + +/** + * @param {AsyncIterable|ReadableStream|Readable} stream + * @returns {Promise<Buffer>} + */ +async function buffer(stream) { + return Buffer.from(await arrayBuffer(stream)); +} + +/** + * @param {AsyncIterable|ReadableStream|Readable} stream + * @returns {Promise<string>} + */ +async function text(stream) { + const dec = new TextDecoder(); + let str = ''; + for await (const chunk of stream) { + if (typeof chunk === 'string') + str += chunk; + else + str += dec.decode(chunk, { stream: true }); + } + return str; +} + +/** + * @param {AsyncIterable|ReadableStream|Readable} stream + * @returns {Promise<any>} + */ +async function json(stream) { + const str = await text(stream); + return JSONParse(str); +} + +module.exports = { + arrayBuffer, + blob, + buffer, + text, + json, +}; diff --git a/test/parallel/test-stream-consumers.js b/test/parallel/test-stream-consumers.js new file mode 100644 index 0000000000..8f6a9deb1c --- /dev/null +++ b/test/parallel/test-stream-consumers.js @@ -0,0 +1,234 @@ +// Flags: --no-warnings +'use strict'; + +const common = require('../common'); +const assert = require('assert'); + +const { + arrayBuffer, + blob, + buffer, + text, + json, +} = require('stream/consumers'); + +const { + PassThrough +} = require('stream'); + +const { + TransformStream, +} = require('stream/web'); + +const buf = Buffer.from('hellothere'); +const kArrayBuffer = + buf.buffer.slice(buf.byteOffset, buf.byteOffset + buf.byteLength); + +{ + const passthrough = new PassThrough(); + + blob(passthrough).then(common.mustCall(async (blob) => { + assert.strictEqual(blob.size, 10); + assert.deepStrictEqual(await blob.arrayBuffer(), kArrayBuffer); + })); + + passthrough.write('hello'); + setTimeout(() => passthrough.end('there'), 10); +} + +{ + const passthrough = new PassThrough(); + + arrayBuffer(passthrough).then(common.mustCall(async (ab) => { + assert.strictEqual(ab.byteLength, 10); + assert.deepStrictEqual(ab, kArrayBuffer); + })); + + passthrough.write('hello'); + setTimeout(() => passthrough.end('there'), 10); +} + +{ + const passthrough = new PassThrough(); + + buffer(passthrough).then(common.mustCall(async (buf) => { + assert.strictEqual(buf.byteLength, 10); + assert.deepStrictEqual(buf.buffer, kArrayBuffer); + })); + + passthrough.write('hello'); + setTimeout(() => passthrough.end('there'), 10); +} + + +{ + const passthrough = new PassThrough(); + + text(passthrough).then(common.mustCall(async (str) => { + assert.strictEqual(str.length, 10); + assert.deepStrictEqual(str, 'hellothere'); + })); + + passthrough.write('hello'); + setTimeout(() => passthrough.end('there'), 10); +} + +{ + const passthrough = new PassThrough(); + + json(passthrough).then(common.mustCall(async (str) => { + assert.strictEqual(str.length, 10); + assert.deepStrictEqual(str, 'hellothere'); + })); + + passthrough.write('"hello'); + setTimeout(() => passthrough.end('there"'), 10); +} + +{ + const { writable, readable } = new TransformStream(); + + blob(readable).then(common.mustCall(async (blob) => { + assert.strictEqual(blob.size, 10); + assert.deepStrictEqual(await blob.arrayBuffer(), kArrayBuffer); + })); + + const writer = writable.getWriter(); + writer.write('hello'); + setTimeout(() => { + writer.write('there'); + writer.close(); + }, 10); + + assert.rejects(blob(readable), { code: 'ERR_INVALID_STATE' }); +} + +{ + const { writable, readable } = new TransformStream(); + + arrayBuffer(readable).then(common.mustCall(async (ab) => { + assert.strictEqual(ab.byteLength, 10); + assert.deepStrictEqual(ab, kArrayBuffer); + })); + + const writer = writable.getWriter(); + writer.write('hello'); + setTimeout(() => { + writer.write('there'); + writer.close(); + }, 10); + + assert.rejects(arrayBuffer(readable), { code: 'ERR_INVALID_STATE' }); +} + +{ + const { writable, readable } = new TransformStream(); + + text(readable).then(common.mustCall(async (str) => { + assert.strictEqual(str.length, 10); + assert.deepStrictEqual(str, 'hellothere'); + })); + + const writer = writable.getWriter(); + writer.write('hello'); + setTimeout(() => { + writer.write('there'); + writer.close(); + }, 10); + + assert.rejects(text(readable), { code: 'ERR_INVALID_STATE' }); +} + +{ + const { writable, readable } = new TransformStream(); + + json(readable).then(common.mustCall(async (str) => { + assert.strictEqual(str.length, 10); + assert.deepStrictEqual(str, 'hellothere'); + })); + + const writer = writable.getWriter(); + writer.write('"hello'); + setTimeout(() => { + writer.write('there"'); + writer.close(); + }, 10); + + assert.rejects(json(readable), { code: 'ERR_INVALID_STATE' }); +} + +{ + const stream = new PassThrough({ + readableObjectMode: true, + writableObjectMode: true, + }); + + blob(stream).then(common.mustCall((blob) => { + assert.strictEqual(blob.size, 30); + })); + + stream.write({}); + stream.end({}); +} + +{ + const stream = new PassThrough({ + readableObjectMode: true, + writableObjectMode: true, + }); + + arrayBuffer(stream).then(common.mustCall((ab) => { + assert.strictEqual(ab.byteLength, 30); + assert.strictEqual( + Buffer.from(ab).toString(), + '[object Object][object Object]'); + })); + + stream.write({}); + stream.end({}); +} + +{ + const stream = new PassThrough({ + readableObjectMode: true, + writableObjectMode: true, + }); + + buffer(stream).then(common.mustCall((buf) => { + assert.strictEqual(buf.byteLength, 30); + assert.strictEqual( + buf.toString(), + '[object Object][object Object]'); + })); + + stream.write({}); + stream.end({}); +} + +{ + const stream = new PassThrough({ + readableObjectMode: true, + writableObjectMode: true, + }); + + assert.rejects(text(stream), { + code: 'ERR_INVALID_ARG_TYPE', + }); + + stream.write({}); + stream.end({}); +} + +{ + const stream = new PassThrough({ + readableObjectMode: true, + writableObjectMode: true, + }); + + assert.rejects(json(stream), { + code: 'ERR_INVALID_ARG_TYPE', + }); + + stream.write({}); + stream.end({}); +} |