summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Gruenbaum <benjamingr@gmail.com>2022-01-16 14:09:27 +0200
committerBenjamin Gruenbaum <benjamingr@gmail.com>2022-01-20 11:24:29 +0200
commit5a407d606abec9f111247fd568f9028e0da52995 (patch)
treea8df3ec50e90f8ef8e4d220c0d42f7b793903fd6
parentde9dc417fe1fc554c8cc34e51616668bfe012548 (diff)
downloadnode-new-5a407d606abec9f111247fd568f9028e0da52995.tar.gz
stream: add toArray
Add the toArray method from the TC39 iterator helper proposal to Readable streams. This also enables a common-use case of converting a stream to an array. Co-Authored-By: Robert Nagy <ronagy@icloud.com> PR-URL: https://github.com/nodejs/node/pull/41553 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
-rw-r--r--doc/api/stream.md40
-rw-r--r--lib/internal/streams/operators.js17
-rw-r--r--test/parallel/test-stream-toArray.js79
3 files changed, 136 insertions, 0 deletions
diff --git a/doc/api/stream.md b/doc/api/stream.md
index 6444df9f3f..6f0cb937d5 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -1889,6 +1889,46 @@ await dnsResults.forEach((result) => {
console.log('done'); // Stream has finished
```
+### `readable.toArray([options])`
+
+<!-- YAML
+added: REPLACEME
+-->
+
+> Stability: 1 - Experimental
+
+* `options` {Object}
+ * `signal` {AbortSignal} allows cancelling the toArray operation if the
+ signal is aborted.
+* Returns: {Promise} a promise containing an array (if the stream is in
+ object mode) or Buffer with the contents of the stream.
+
+This method allows easily obtaining the contents of a stream. If the
+stream is in [object mode][object-mode] an array of its contents is returned.
+If the stream is not in object mode a Buffer containing its data is returned.
+
+As this method reads the entire stream into memory, it negates the benefits of
+streams. It's intended for interoperability and convenience, not as the primary
+way to consume streams.
+
+```mjs
+import { Readable } from 'stream';
+import { Resolver } from 'dns/promises';
+
+await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]
+
+// Make dns queries concurrently using .map and collect
+// the results into an aray using toArray
+const dnsResults = await Readable.from([
+ 'nodejs.org',
+ 'openjsf.org',
+ 'www.linuxfoundation.org',
+]).map(async (domain) => {
+ const { address } = await resolver.resolve4(domain, { ttl: true });
+ return address;
+}, { concurrency: 2 }).toArray();
+```
+
### Duplex and transform streams
#### Class: `stream.Duplex`
diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js
index c9581f7b6d..2649966fd4 100644
--- a/lib/internal/streams/operators.js
+++ b/lib/internal/streams/operators.js
@@ -1,6 +1,8 @@
'use strict';
const { AbortController } = require('internal/abort_controller');
+const { Buffer } = require('buffer');
+
const {
codes: {
ERR_INVALID_ARG_TYPE,
@@ -10,6 +12,7 @@ const {
const { validateInteger } = require('internal/validators');
const {
+ ArrayPrototypePush,
MathFloor,
Promise,
PromiseReject,
@@ -174,6 +177,19 @@ async function * filter(fn, options) {
yield* this.map(filterFn, options);
}
+async function toArray(options) {
+ const result = [];
+ for await (const val of this) {
+ if (options?.signal?.aborted) {
+ throw new AbortError({ cause: options.signal.reason });
+ }
+ ArrayPrototypePush(result, val);
+ }
+ if (!this.readableObjectMode) {
+ return Buffer.concat(result);
+ }
+ return result;
+}
module.exports.streamReturningOperators = {
filter,
map,
@@ -181,4 +197,5 @@ module.exports.streamReturningOperators = {
module.exports.promiseReturningOperators = {
forEach,
+ toArray,
};
diff --git a/test/parallel/test-stream-toArray.js b/test/parallel/test-stream-toArray.js
new file mode 100644
index 0000000000..3bd15e7c0f
--- /dev/null
+++ b/test/parallel/test-stream-toArray.js
@@ -0,0 +1,79 @@
+'use strict';
+
+const common = require('../common');
+const {
+ Readable,
+} = require('stream');
+const assert = require('assert');
+
+{
+ // Works on a synchronous stream
+ (async () => {
+ const tests = [
+ [],
+ [1],
+ [1, 2, 3],
+ Array(100).fill().map((_, i) => i),
+ ];
+ for (const test of tests) {
+ const stream = Readable.from(test);
+ const result = await stream.toArray();
+ assert.deepStrictEqual(result, test);
+ }
+ })().then(common.mustCall());
+}
+
+{
+ // Works on a non-object-mode stream and flattens it
+ (async () => {
+ const stream = Readable.from(
+ [Buffer.from([1, 2, 3]), Buffer.from([4, 5, 6])]
+ , { objectMode: false });
+ const result = await stream.toArray();
+ assert.strictEqual(Buffer.isBuffer(result), true);
+ assert.deepStrictEqual(Array.from(result), [1, 2, 3, 4, 5, 6]);
+ })().then(common.mustCall());
+}
+
+{
+ // Works on an asynchronous stream
+ (async () => {
+ const tests = [
+ [],
+ [1],
+ [1, 2, 3],
+ Array(100).fill().map((_, i) => i),
+ ];
+ for (const test of tests) {
+ const stream = Readable.from(test).map((x) => Promise.resolve(x));
+ const result = await stream.toArray();
+ assert.deepStrictEqual(result, test);
+ }
+ })().then(common.mustCall());
+}
+
+{
+ // Support for AbortSignal
+ const ac = new AbortController();
+ let stream;
+ assert.rejects(async () => {
+ stream = Readable.from([1, 2, 3]).map(async (x) => {
+ if (x === 3) {
+ await new Promise(() => {}); // Explicitly do not pass signal here
+ }
+ return Promise.resolve(x);
+ });
+ await stream.toArray({ signal: ac.signal });
+ }, {
+ name: 'AbortError',
+ }).then(common.mustCall(() => {
+ // Only stops toArray, does not destory the stream
+ assert(stream.destroyed, false);
+ }));
+ ac.abort();
+}
+{
+ // Test result is a Promise
+ const result = Readable.from([1, 2, 3, 4, 5]).toArray();
+ assert.strictEqual(result instanceof Promise, true);
+}