diff options
author | Benjamin Gruenbaum <benjamingr@gmail.com> | 2021-11-15 15:39:05 +0200 |
---|---|---|
committer | Robert Nagy <ronagy@icloud.com> | 2021-12-29 20:32:36 +0100 |
commit | b97b81d4ec674437f49931268c328703683125ee (patch) | |
tree | dcddf803fc74d7a42ab520fbcf37f1714412587d /test/parallel/test-stream-map.js | |
parent | f81c62704fc4e3e0eb1b1c813854ad52fbb8fe75 (diff) | |
download | node-new-b97b81d4ec674437f49931268c328703683125ee.tar.gz |
stream: add map method to Readable
Implement the map method on readable stream. This starts the alignment
with the tc39-iterator-helpers proposal and adds a `.map` method to
every Node.js readable stream.
Co-Authored-By: Robert Nagy <ronag@icloud.com>
PR-URL: https://github.com/nodejs/node/pull/40815
Reviewed-By: Robert Nagy <ronagy@icloud.com>
Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Diffstat (limited to 'test/parallel/test-stream-map.js')
-rw-r--r-- | test/parallel/test-stream-map.js | 108 |
1 files changed, 108 insertions, 0 deletions
diff --git a/test/parallel/test-stream-map.js b/test/parallel/test-stream-map.js new file mode 100644 index 0000000000..2d5c5894e1 --- /dev/null +++ b/test/parallel/test-stream-map.js @@ -0,0 +1,108 @@ +'use strict'; + +const common = require('../common'); +const { + Readable, +} = require('stream'); +const assert = require('assert'); +const { setTimeout } = require('timers/promises'); + +{ + // Map works on synchronous streams with a synchronous mapper + const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x); + const result = [2, 4, 6, 8, 10]; + (async () => { + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } + })().then(common.mustCall()); +} + +{ + // Map works on synchronous streams with an asynchronous mapper + const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { + await Promise.resolve(); + return x + x; + }); + const result = [2, 4, 6, 8, 10]; + (async () => { + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } + })().then(common.mustCall()); +} + +{ + // Map works on asynchronous streams with a asynchronous mapper + const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => { + return x + x; + }).map((x) => x + x); + const result = [4, 8, 12, 16, 20]; + (async () => { + for await (const item of stream) { + assert.strictEqual(item, result.shift()); + } + })().then(common.mustCall()); +} + +{ + // Concurrency + AbortSignal + const ac = new AbortController(); + let calls = 0; + const stream = Readable.from([1, 2, 3, 4, 5]).map(async (_, { signal }) => { + calls++; + await setTimeout(100, { signal }); + }, { signal: ac.signal, concurrency: 2 }); + // pump + assert.rejects(async () => { + for await (const item of stream) { + // nope + console.log(item); + } + }, { + name: 'AbortError', + }).then(common.mustCall()); + + setImmediate(() => { + ac.abort(); + assert.strictEqual(calls, 2); + }); +} + +{ + // Concurrency result order + const stream = Readable.from([1, 2]).map(async (item, { signal }) => { + await setTimeout(10 - item, { signal }); + return item; + }, { concurrency: 2 }); + + (async () => { + const expected = [1, 2]; + for await (const item of stream) { + assert.strictEqual(item, expected.shift()); + } + })().then(common.mustCall()); +} + +{ + // Error cases + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const unused of Readable.from([1]).map(1)); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of Readable.from([1]).map((x) => x, { + concurrency: 'Foo' + })); + }, /ERR_OUT_OF_RANGE/).then(common.mustCall()); + assert.rejects(async () => { + // eslint-disable-next-line no-unused-vars + for await (const _ of Readable.from([1]).map((x) => x, 1)); + }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall()); +} +{ + // Test result is a Readable + const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x); + assert.strictEqual(stream.readable, true); +} |