summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenjamin Gruenbaum <benjamingr@gmail.com>2022-01-17 20:21:51 +0200
committerBenjamin Gruenbaum <benjamingr@gmail.com>2022-01-21 12:10:17 +0200
commit5badf46f2a9363e6762900e18d4f85541738f738 (patch)
tree293209a8b1789e97949bc8f35586e2f8bf2978ff
parentef35175527971c44deed68574c1026fa8c4a8f9a (diff)
downloadnode-new-5badf46f2a9363e6762900e18d4f85541738f738.tar.gz
stream: support some and every
This continues on the iterator-helpers work by adding `.some` and `.every` to readable streams. Co-Authored-By: Robert Nagy <ronagy@icloud.com> PR-URL: https://github.com/nodejs/node/pull/41573 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
-rw-r--r--doc/api/stream.md100
-rw-r--r--lib/internal/streams/operators.js41
-rw-r--r--test/parallel/test-stream-some-every.js95
3 files changed, 235 insertions, 1 deletions
diff --git a/doc/api/stream.md b/doc/api/stream.md
index 6f0cb937d5..6ec019875d 100644
--- a/doc/api/stream.md
+++ b/doc/api/stream.md
@@ -1918,7 +1918,7 @@ import { Resolver } from 'dns/promises';
await Readable.from([1, 2, 3, 4]).toArray(); // [1, 2, 3, 4]
// Make dns queries concurrently using .map and collect
-// the results into an aray using toArray
+// the results into an array using toArray
const dnsResults = await Readable.from([
'nodejs.org',
'openjsf.org',
@@ -1929,6 +1929,104 @@ const dnsResults = await Readable.from([
}, { concurrency: 2 }).toArray();
```
+### `readable.some(fn[, options])`
+
+<!-- YAML
+added: REPLACEME
+-->
+
+> Stability: 1 - Experimental
+
+* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
+ * `data` {any} a chunk of data from the stream.
+ * `options` {Object}
+ * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
+ abort the `fn` call early.
+* `options` {Object}
+ * `concurrency` {number} the maximum concurrent invocation of `fn` to call
+ on the stream at once. **Default:** `1`.
+ * `signal` {AbortSignal} allows destroying the stream if the signal is
+ aborted.
+* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy
+ value for at least one of the chunks.
+
+This method is similar to `Array.prototype.some` and calls `fn` on each chunk
+in the stream until the awaited return value is `true` (or any truthy value).
+Once an `fn` call on a chunk awaited return value is truthy, the stream is
+destroyed and the promise is fulfilled with `true`. If none of the `fn`
+calls on the chunks return a truthy value, the promise is fulfilled with
+`false`.
+
+```mjs
+import { Readable } from 'stream';
+import { stat } from 'fs/promises';
+
+// With a synchronous predicate.
+await Readable.from([1, 2, 3, 4]).some((x) => x > 2); // true
+await Readable.from([1, 2, 3, 4]).some((x) => x < 0); // false
+
+// With an asynchronous predicate, making at most 2 file checks at a time.
+const anyBigFile = await Readable.from([
+ 'file1',
+ 'file2',
+ 'file3',
+]).some(async (fileName) => {
+ const stats = await stat(fileName);
+ return stat.size > 1024 * 1024;
+}, { concurrency: 2 });
+console.log(anyBigFile); // `true` if any file in the list is bigger than 1MB
+console.log('done'); // Stream has finished
+```
+
+### `readable.every(fn[, options])`
+
+<!-- YAML
+added: REPLACEME
+-->
+
+> Stability: 1 - Experimental
+
+* `fn` {Function|AsyncFunction} a function to call on each item of the stream.
+ * `data` {any} a chunk of data from the stream.
+ * `options` {Object}
+ * `signal` {AbortSignal} aborted if the stream is destroyed allowing to
+ abort the `fn` call early.
+* `options` {Object}
+ * `concurrency` {number} the maximum concurrent invocation of `fn` to call
+ on the stream at once. **Default:** `1`.
+ * `signal` {AbortSignal} allows destroying the stream if the signal is
+ aborted.
+* Returns: {Promise} a promise evaluating to `true` if `fn` returned a truthy
+ value for all of the chunks.
+
+This method is similar to `Array.prototype.every` and calls `fn` on each chunk
+in the stream to check if all awaited return values are truthy value for `fn`.
+Once an `fn` call on a chunk awaited return value is falsy, the stream is
+destroyed and the promise is fulfilled with `false`. If all of the `fn` calls
+on the chunks return a truthy value, the promise is fulfilled with `true`.
+
+```mjs
+import { Readable } from 'stream';
+import { stat } from 'fs/promises';
+
+// With a synchronous predicate.
+await Readable.from([1, 2, 3, 4]).every((x) => x > 2); // false
+await Readable.from([1, 2, 3, 4]).every((x) => x > 0); // true
+
+// With an asynchronous predicate, making at most 2 file checks at a time.
+const allBigFiles = await Readable.from([
+ 'file1',
+ 'file2',
+ 'file3',
+]).every(async (fileName) => {
+ const stats = await stat(fileName);
+ return stat.size > 1024 * 1024;
+}, { concurrency: 2 });
+// `true` if all files in the list are bigger than 1MiB
+console.log(allBigFiles);
+console.log('done'); // Stream has finished
+```
+
### Duplex and transform streams
#### Class: `stream.Duplex`
diff --git a/lib/internal/streams/operators.js b/lib/internal/streams/operators.js
index 2649966fd4..9c50865f3d 100644
--- a/lib/internal/streams/operators.js
+++ b/lib/internal/streams/operators.js
@@ -10,6 +10,7 @@ const {
AbortError,
} = require('internal/errors');
const { validateInteger } = require('internal/validators');
+const { kWeakHandler } = require('internal/event_target');
const {
ArrayPrototypePush,
@@ -47,6 +48,10 @@ async function * map(fn, options) {
const signalOpt = { signal };
const abort = () => ac.abort();
+ if (options?.signal?.aborted) {
+ abort();
+ }
+
options?.signal?.addEventListener('abort', abort);
let next;
@@ -150,6 +155,40 @@ async function * map(fn, options) {
}
}
+async function some(fn, options) {
+ // https://tc39.es/proposal-iterator-helpers/#sec-iteratorprototype.some
+ // Note that some does short circuit but also closes the iterator if it does
+ const ac = new AbortController();
+ if (options?.signal) {
+ if (options.signal.aborted) {
+ ac.abort();
+ }
+ options.signal.addEventListener('abort', () => ac.abort(), {
+ [kWeakHandler]: this,
+ once: true,
+ });
+ }
+ const mapped = this.map(fn, { ...options, signal: ac.signal });
+ for await (const result of mapped) {
+ if (result) {
+ ac.abort();
+ return true;
+ }
+ }
+ return false;
+}
+
+async function every(fn, options) {
+ if (typeof fn !== 'function') {
+ throw new ERR_INVALID_ARG_TYPE(
+ 'fn', ['Function', 'AsyncFunction'], fn);
+ }
+ // https://en.wikipedia.org/wiki/De_Morgan%27s_laws
+ return !(await some.call(this, async (x) => {
+ return !(await fn(x));
+ }, options));
+}
+
async function forEach(fn, options) {
if (typeof fn !== 'function') {
throw new ERR_INVALID_ARG_TYPE(
@@ -196,6 +235,8 @@ module.exports.streamReturningOperators = {
};
module.exports.promiseReturningOperators = {
+ every,
forEach,
toArray,
+ some,
};
diff --git a/test/parallel/test-stream-some-every.js b/test/parallel/test-stream-some-every.js
new file mode 100644
index 0000000000..c2be5ea955
--- /dev/null
+++ b/test/parallel/test-stream-some-every.js
@@ -0,0 +1,95 @@
+'use strict';
+
+const common = require('../common');
+const {
+ Readable,
+} = require('stream');
+const assert = require('assert');
+
+function oneTo5() {
+ return Readable.from([1, 2, 3, 4, 5]);
+}
+
+function oneTo5Async() {
+ return oneTo5().map(async (x) => {
+ await Promise.resolve();
+ return x;
+ });
+}
+{
+ // Some and every work with a synchronous stream and predicate
+ (async () => {
+ assert.strictEqual(await oneTo5().some((x) => x > 3), true);
+ assert.strictEqual(await oneTo5().every((x) => x > 3), false);
+ assert.strictEqual(await oneTo5().some((x) => x > 6), false);
+ assert.strictEqual(await oneTo5().every((x) => x < 6), true);
+ assert.strictEqual(await Readable.from([]).some((x) => true), false);
+ assert.strictEqual(await Readable.from([]).every((x) => true), true);
+ })().then(common.mustCall());
+}
+
+{
+ // Some and every work with an asynchronous stream and synchronous predicate
+ (async () => {
+ assert.strictEqual(await oneTo5Async().some((x) => x > 3), true);
+ assert.strictEqual(await oneTo5Async().every((x) => x > 3), false);
+ assert.strictEqual(await oneTo5Async().some((x) => x > 6), false);
+ assert.strictEqual(await oneTo5Async().every((x) => x < 6), true);
+ })().then(common.mustCall());
+}
+
+{
+ // Some and every work on asynchronous streams with an asynchronous predicate
+ (async () => {
+ assert.strictEqual(await oneTo5().some(async (x) => x > 3), true);
+ assert.strictEqual(await oneTo5().every(async (x) => x > 3), false);
+ assert.strictEqual(await oneTo5().some(async (x) => x > 6), false);
+ assert.strictEqual(await oneTo5().every(async (x) => x < 6), true);
+ })().then(common.mustCall());
+}
+
+{
+ // Some and every short circuit
+ (async () => {
+ await oneTo5().some(common.mustCall((x) => x > 2, 3));
+ await oneTo5().every(common.mustCall((x) => x < 3, 3));
+ // When short circuit isn't possible the whole stream is iterated
+ await oneTo5().some(common.mustCall((x) => x > 6, 5));
+ // The stream is destroyed afterwards
+ const stream = oneTo5();
+ await stream.some(common.mustCall((x) => x > 2, 3));
+ assert.strictEqual(stream.destroyed, true);
+ })().then(common.mustCall());
+}
+
+{
+ // Support for AbortSignal
+ const ac = new AbortController();
+ assert.rejects(Readable.from([1, 2, 3]).some(
+ () => new Promise(() => {}),
+ { signal: ac.signal }
+ ), {
+ name: 'AbortError',
+ }).then(common.mustCall());
+ ac.abort();
+}
+{
+ // Support for pre-aborted AbortSignal
+ assert.rejects(Readable.from([1, 2, 3]).some(
+ () => new Promise(() => {}),
+ { signal: AbortSignal.abort() }
+ ), {
+ name: 'AbortError',
+ }).then(common.mustCall());
+}
+{
+ // Error cases
+ assert.rejects(async () => {
+ await Readable.from([1]).every(1);
+ }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
+ assert.rejects(async () => {
+ await Readable.from([1]).every((x) => x, {
+ concurrency: 'Foo'
+ });
+ }, /ERR_OUT_OF_RANGE/).then(common.mustCall());
+}