summaryrefslogtreecommitdiff
path: root/test/parallel/test-stream-map.js
diff options
context:
space:
mode:
authorBenjamin Gruenbaum <benjamingr@gmail.com>2021-11-15 15:39:05 +0200
committerRobert Nagy <ronagy@icloud.com>2021-12-29 20:32:36 +0100
commitb97b81d4ec674437f49931268c328703683125ee (patch)
treedcddf803fc74d7a42ab520fbcf37f1714412587d /test/parallel/test-stream-map.js
parentf81c62704fc4e3e0eb1b1c813854ad52fbb8fe75 (diff)
downloadnode-new-b97b81d4ec674437f49931268c328703683125ee.tar.gz
stream: add map method to Readable
Implement the map method on readable stream. This starts the alignment with the tc39-iterator-helpers proposal and adds a `.map` method to every Node.js readable stream. Co-Authored-By: Robert Nagy <ronag@icloud.com> PR-URL: https://github.com/nodejs/node/pull/40815 Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Matteo Collina <matteo.collina@gmail.com>
Diffstat (limited to 'test/parallel/test-stream-map.js')
-rw-r--r--test/parallel/test-stream-map.js108
1 files changed, 108 insertions, 0 deletions
diff --git a/test/parallel/test-stream-map.js b/test/parallel/test-stream-map.js
new file mode 100644
index 0000000000..2d5c5894e1
--- /dev/null
+++ b/test/parallel/test-stream-map.js
@@ -0,0 +1,108 @@
+'use strict';
+
+const common = require('../common');
+const {
+ Readable,
+} = require('stream');
+const assert = require('assert');
+const { setTimeout } = require('timers/promises');
+
+{
+ // Map works on synchronous streams with a synchronous mapper
+ const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
+ const result = [2, 4, 6, 8, 10];
+ (async () => {
+ for await (const item of stream) {
+ assert.strictEqual(item, result.shift());
+ }
+ })().then(common.mustCall());
+}
+
+{
+ // Map works on synchronous streams with an asynchronous mapper
+ const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
+ await Promise.resolve();
+ return x + x;
+ });
+ const result = [2, 4, 6, 8, 10];
+ (async () => {
+ for await (const item of stream) {
+ assert.strictEqual(item, result.shift());
+ }
+ })().then(common.mustCall());
+}
+
+{
+ // Map works on asynchronous streams with a asynchronous mapper
+ const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
+ return x + x;
+ }).map((x) => x + x);
+ const result = [4, 8, 12, 16, 20];
+ (async () => {
+ for await (const item of stream) {
+ assert.strictEqual(item, result.shift());
+ }
+ })().then(common.mustCall());
+}
+
+{
+ // Concurrency + AbortSignal
+ const ac = new AbortController();
+ let calls = 0;
+ const stream = Readable.from([1, 2, 3, 4, 5]).map(async (_, { signal }) => {
+ calls++;
+ await setTimeout(100, { signal });
+ }, { signal: ac.signal, concurrency: 2 });
+ // pump
+ assert.rejects(async () => {
+ for await (const item of stream) {
+ // nope
+ console.log(item);
+ }
+ }, {
+ name: 'AbortError',
+ }).then(common.mustCall());
+
+ setImmediate(() => {
+ ac.abort();
+ assert.strictEqual(calls, 2);
+ });
+}
+
+{
+ // Concurrency result order
+ const stream = Readable.from([1, 2]).map(async (item, { signal }) => {
+ await setTimeout(10 - item, { signal });
+ return item;
+ }, { concurrency: 2 });
+
+ (async () => {
+ const expected = [1, 2];
+ for await (const item of stream) {
+ assert.strictEqual(item, expected.shift());
+ }
+ })().then(common.mustCall());
+}
+
+{
+ // Error cases
+ assert.rejects(async () => {
+ // eslint-disable-next-line no-unused-vars
+ for await (const unused of Readable.from([1]).map(1));
+ }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
+ assert.rejects(async () => {
+ // eslint-disable-next-line no-unused-vars
+ for await (const _ of Readable.from([1]).map((x) => x, {
+ concurrency: 'Foo'
+ }));
+ }, /ERR_OUT_OF_RANGE/).then(common.mustCall());
+ assert.rejects(async () => {
+ // eslint-disable-next-line no-unused-vars
+ for await (const _ of Readable.from([1]).map((x) => x, 1));
+ }, /ERR_INVALID_ARG_TYPE/).then(common.mustCall());
+}
+{
+ // Test result is a Readable
+ const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x);
+ assert.strictEqual(stream.readable, true);
+}