author	Guy Bedford <guybedford@gmail.com>	2019-05-12 19:00:53 +0200
committer	Beth Griggs <Bethany.Griggs@uk.ibm.com>	2019-10-17 17:50:42 +0100
commit	ddb5152e9b8a914fd62b85e19455d5885fe30faf (patch)
tree	1516ba599525e945798e4705c816b87178c9129e
parent	333963ef73794e94ee5815a1b6c280f48cf9ac4a (diff)
download	node-new-ddb5152e9b8a914fd62b85e19455d5885fe30faf.tar.gz
stream: implement Readable.from async iterator utility
PR-URL: https://github.com/nodejs/node/pull/27660
Reviewed-By: Gus Caplan <me@gus.host>
Reviewed-By: James M Snell <jasnell@gmail.com>
Reviewed-By: Michaël Zasso <targos@protonmail.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Reviewed-By: Benjamin Gruenbaum <benjamingr@gmail.com>
Reviewed-By: Anna Henningsen <anna@addaleax.net>
-rw-r--r--	doc/api/stream.md	113
-rw-r--r--	lib/_stream_readable.js	39
-rw-r--r--	test/parallel/test-events-once.js	2
-rw-r--r--	test/parallel/test-readable-from.js	163
4 files changed, 314 insertions(+), 3 deletions(-)
diff --git a/doc/api/stream.md b/doc/api/stream.md
index dc6a79db69..24f3ee8900 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -46,8 +46,8 @@ There are four fundamental stream types within Node.js:
* [`Transform`][] - `Duplex` streams that can modify or transform the data as it
is written and read (for example, [`zlib.createDeflate()`][]).
-Additionally, this module includes the utility functions [pipeline][] and
-[finished][].
+Additionally, this module includes the utility functions [pipeline][],
+[finished][] and [Readable.from][].
### Object Mode
@@ -1445,6 +1445,31 @@ async function run() {
run().catch(console.error);
```
+### Readable.from(iterable, [options])
+
+* `iterable` {Iterable} Object implementing the `Symbol.asyncIterator` or
+ `Symbol.iterator` iterable protocol.
+* `options` {Object} Options provided to `new stream.Readable([options])`.
+  By default, `Readable.from()` sets `options.objectMode` to `true` unless
+  `options.objectMode` is explicitly set to `false`.
+
+A utility method for creating Readable Streams from iterables.
+
+```js
+const { Readable } = require('stream');
+
+async function * generate() {
+ yield 'hello';
+ yield 'streams';
+}
+
+const readable = Readable.from(generate());
+
+readable.on('data', (chunk) => {
+ console.log(chunk);
+});
+```
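+
+As a sketch of the `objectMode` opt-out described above (not part of the
+original example), passing `objectMode: false` causes the pushed string
+chunks to be converted to `Buffer`s:
+
+```js
+const { Readable } = require('stream');
+
+async function * generate() {
+  yield 'hello';
+  yield 'streams';
+}
+
+// With objectMode disabled, strings are converted to Buffers on push().
+const readable = Readable.from(generate(), { objectMode: false });
+
+readable.on('data', (chunk) => {
+  console.log(Buffer.isBuffer(chunk)); // true
+});
+```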
+
## API for Stream Implementers
<!--type=misc-->
@@ -2368,6 +2393,89 @@ primarily for examples and testing, but there are some use cases where
<!--type=misc-->
+### Streams Compatibility with Async Generators and Async Iterators
+
+With the addition of async generators and async iterators to JavaScript,
+async generators are effectively a first-class, language-level stream
+construct.
+
+Some common interop cases of using Node.js streams with async generators
+and async iterators are provided below.
+
+#### Consuming Readable Streams with Async Iterators
+
+```js
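+// 'readable' is assumed to be an existing Readable stream instance.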
+(async function() {
+ for await (const chunk of readable) {
+ console.log(chunk);
+ }
+})();
+```
+
+#### Creating Readable Streams with Async Generators
+
+We can construct a Node.js Readable Stream from an asynchronous generator
+using the `Readable.from` utility method:
+
+```js
+const { Readable } = require('stream');
+
+async function * generate() {
+ yield 'a';
+ yield 'b';
+ yield 'c';
+}
+
+const readable = Readable.from(generate());
+
+readable.on('data', (chunk) => {
+ console.log(chunk);
+});
+```
+
+#### Piping to Writable Streams from Async Iterators
+
+When writing to a writable stream from an async iterator, it is important
+to ensure the correct handling of backpressure and errors.
+
+```js
+const fs = require('fs');
+const { once } = require('events');
+
+const writable = fs.createWriteStream('./file');
+
+// 'iterator' is assumed to be an existing async iterator or iterable.
+(async function() {
+  for await (const chunk of iterator) {
+    // Handle backpressure on write()
+    if (!writable.write(chunk))
+      await once(writable, 'drain');
+  }
+  writable.end();
+  // Ensure completion without errors
+  await once(writable, 'finish');
+})();
+```
+
+In the above, errors on the writable stream are caught and thrown by the two
+`once()` calls, since `once()` also rejects its promise when an `'error'`
+event is emitted.
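+
+For example, `once()` rejects when an `'error'` event arrives while waiting
+for a different event (a minimal sketch, separate from the example above):
+
+```js
+const { once, EventEmitter } = require('events');
+
+const ee = new EventEmitter();
+
+(async function() {
+  try {
+    // Waiting for 'finish'; the 'error' emitted below rejects the promise.
+    await once(ee, 'finish');
+  } catch (err) {
+    console.error(err.message); // 'kaboom'
+  }
+})();
+
+ee.emit('error', new Error('kaboom'));
+```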
+
+Alternatively, the async iterator could be wrapped with `Readable.from()` and
+then piped via `.pipe()`:
+
+```js
+const fs = require('fs');
+const { once } = require('events');
+const { Readable } = require('stream');
+
+const writable = fs.createWriteStream('./file');
+
+(async function() {
+  // 'iterator' is assumed to be an existing async iterator or iterable.
+  const readable = Readable.from(iterator);
+  readable.pipe(writable);
+  // Ensure completion without errors
+  await once(writable, 'finish');
+})();
+```
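+
+As a further option (a sketch, not part of this commit), the `pipeline`
+utility mentioned earlier in this document could wire the two streams
+together while forwarding errors from either side:
+
+```js
+const fs = require('fs');
+const util = require('util');
+const { Readable, pipeline } = require('stream');
+
+(async function() {
+  // 'iterator' is assumed to be an existing async iterator or iterable.
+  const readable = Readable.from(iterator);
+  const writable = fs.createWriteStream('./file');
+  // pipeline() destroys both streams on failure and surfaces the error.
+  await util.promisify(pipeline)(readable, writable);
+})();
+```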
+
+<!--type=misc-->
+
### Compatibility with Older Node.js Versions
<!--type=misc-->
@@ -2504,6 +2612,7 @@ contain multi-byte characters.
[Compatibility]: #stream_compatibility_with_older_node_js_versions
[HTTP requests, on the client]: http.html#http_class_http_clientrequest
[HTTP responses, on the server]: http.html#http_class_http_serverresponse
+[Readable.from]: #stream_readable_from_iterable_options
[TCP sockets]: net.html#net_class_net_socket
[child process stdin]: child_process.html#child_process_subprocess_stdin
[child process stdout and stderr]: child_process.html#child_process_subprocess_stdout
diff --git a/lib/_stream_readable.js b/lib/_stream_readable.js
index 03f7924e5c..bd0185c044 100644
--- a/lib/_stream_readable.js
+++ b/lib/_stream_readable.js
@@ -1154,3 +1154,42 @@ function endReadableNT(state, stream) {
}
}
}
+
+Readable.from = function(iterable, opts) {
+ let iterator;
+ if (iterable && iterable[Symbol.asyncIterator])
+ iterator = iterable[Symbol.asyncIterator]();
+ else if (iterable && iterable[Symbol.iterator])
+ iterator = iterable[Symbol.iterator]();
+ else
+ throw new ERR_INVALID_ARG_TYPE('iterable', ['Iterable'], iterable);
+
+ const readable = new Readable({
+ objectMode: true,
+ ...opts
+ });
+  // Boolean flag to protect against _read() being called again before the
+  // previous iteration has completed.
+ let reading = false;
+ readable._read = function() {
+ if (!reading) {
+ reading = true;
+ next();
+ }
+ };
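+  // Pull loop: each successful push() immediately requests the next value;
+  // a false return from push() pauses iteration until _read() runs again.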
+ async function next() {
+ try {
+ const { value, done } = await iterator.next();
+ if (done) {
+ readable.push(null);
+ } else if (readable.push(await value)) {
+ next();
+ } else {
+ reading = false;
+ }
+ } catch (err) {
+ readable.destroy(err);
+ }
+ }
+ return readable;
+};
diff --git a/test/parallel/test-events-once.js b/test/parallel/test-events-once.js
index f99604018a..25ef4e9845 100644
--- a/test/parallel/test-events-once.js
+++ b/test/parallel/test-events-once.js
@@ -90,4 +90,4 @@ Promise.all([
catchesErrors(),
stopListeningAfterCatchingError(),
onceError()
-]);
+]).then(common.mustCall());
diff --git a/test/parallel/test-readable-from.js b/test/parallel/test-readable-from.js
new file mode 100644
index 0000000000..a441f743cc
--- /dev/null
+++ b/test/parallel/test-readable-from.js
@@ -0,0 +1,163 @@
+'use strict';
+
+const { mustCall } = require('../common');
+const { once } = require('events');
+const { Readable } = require('stream');
+const { strictEqual } = require('assert');
+
+async function toReadableBasicSupport() {
+ async function * generate() {
+ yield 'a';
+ yield 'b';
+ yield 'c';
+ }
+
+ const stream = Readable.from(generate());
+
+ const expected = ['a', 'b', 'c'];
+
+ for await (const chunk of stream) {
+ strictEqual(chunk, expected.shift());
+ }
+}
+
+async function toReadableSyncIterator() {
+ function * generate() {
+ yield 'a';
+ yield 'b';
+ yield 'c';
+ }
+
+ const stream = Readable.from(generate());
+
+ const expected = ['a', 'b', 'c'];
+
+ for await (const chunk of stream) {
+ strictEqual(chunk, expected.shift());
+ }
+}
+
+async function toReadablePromises() {
+ const promises = [
+ Promise.resolve('a'),
+ Promise.resolve('b'),
+ Promise.resolve('c')
+ ];
+
+ const stream = Readable.from(promises);
+
+ const expected = ['a', 'b', 'c'];
+
+ for await (const chunk of stream) {
+ strictEqual(chunk, expected.shift());
+ }
+}
+
+async function toReadableString() {
+ const stream = Readable.from('abc');
+
+ const expected = ['a', 'b', 'c'];
+
+ for await (const chunk of stream) {
+ strictEqual(chunk, expected.shift());
+ }
+}
+
+async function toReadableOnData() {
+ async function * generate() {
+ yield 'a';
+ yield 'b';
+ yield 'c';
+ }
+
+ const stream = Readable.from(generate());
+
+ let iterations = 0;
+ const expected = ['a', 'b', 'c'];
+
+ stream.on('data', (chunk) => {
+ iterations++;
+ strictEqual(chunk, expected.shift());
+ });
+
+ await once(stream, 'end');
+
+ strictEqual(iterations, 3);
+}
+
+async function toReadableOnDataNonObject() {
+ async function * generate() {
+ yield 'a';
+ yield 'b';
+ yield 'c';
+ }
+
+ const stream = Readable.from(generate(), { objectMode: false });
+
+ let iterations = 0;
+ const expected = ['a', 'b', 'c'];
+
+ stream.on('data', (chunk) => {
+ iterations++;
+ strictEqual(chunk instanceof Buffer, true);
+ strictEqual(chunk.toString(), expected.shift());
+ });
+
+ await once(stream, 'end');
+
+ strictEqual(iterations, 3);
+}
+
+async function destroysTheStreamWhenThrowing() {
+ async function * generate() {
+ throw new Error('kaboom');
+ }
+
+ const stream = Readable.from(generate());
+
+ stream.read();
+
+  // once() resolves (rather than rejects) when the awaited event is
+  // 'error' itself, so destructure the error from the resolved arguments.
+  const [err] = await once(stream, 'error');
+  strictEqual(err.message, 'kaboom');
+  strictEqual(stream.destroyed, true);
+}
+
+async function asTransformStream() {
+ async function * generate(stream) {
+ for await (const chunk of stream) {
+ yield chunk.toUpperCase();
+ }
+ }
+
+ const source = new Readable({
+ objectMode: true,
+ read() {
+ this.push('a');
+ this.push('b');
+ this.push('c');
+ this.push(null);
+ }
+ });
+
+ const stream = Readable.from(generate(source));
+
+ const expected = ['A', 'B', 'C'];
+
+ for await (const chunk of stream) {
+ strictEqual(chunk, expected.shift());
+ }
+}
+
+Promise.all([
+ toReadableBasicSupport(),
+ toReadableSyncIterator(),
+ toReadablePromises(),
+ toReadableString(),
+ toReadableOnData(),
+ toReadableOnDataNonObject(),
+ destroysTheStreamWhenThrowing(),
+ asTransformStream()
+]).then(mustCall());