summaryrefslogtreecommitdiff
path: root/test/parallel/test-stream-map.js
diff options
context:
space:
mode:
authorBenjamin Gruenbaum <benjamingr@gmail.com>2022-01-22 10:25:18 +0200
committerBenjamin Gruenbaum <benjamingr@gmail.com>2022-01-24 20:15:58 +0200
commitce41395f89414dfd459084ea61a7eeac1f67713a (patch)
tree04f72943dc715b00400a149149a9247452b80403 /test/parallel/test-stream-map.js
parent3ee8c3e45e5f38126f6cb2b181fc3c88d96f0503 (diff)
downloadnode-new-ce41395f89414dfd459084ea61a7eeac1f67713a.tar.gz
test: add stream map tests
Add more tests to check and enforce the behavior of the map method. Co-Authored-By: Antoine du Hamel <duhamelantoine1995@gmail.com> PR-URL: https://github.com/nodejs/node/pull/41642 Reviewed-By: Matteo Collina <matteo.collina@gmail.com> Reviewed-By: Robert Nagy <ronagy@icloud.com> Reviewed-By: Antoine du Hamel <duhamelantoine1995@gmail.com>
Diffstat (limited to 'test/parallel/test-stream-map.js')
-rw-r--r--test/parallel/test-stream-map.js121
1 files changed, 105 insertions, 16 deletions
diff --git a/test/parallel/test-stream-map.js b/test/parallel/test-stream-map.js
index 2d5c5894e1..c3a994ae39 100644
--- a/test/parallel/test-stream-map.js
+++ b/test/parallel/test-stream-map.js
@@ -5,16 +5,14 @@ const {
Readable,
} = require('stream');
const assert = require('assert');
+const { once } = require('events');
const { setTimeout } = require('timers/promises');
{
// Map works on synchronous streams with a synchronous mapper
const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => x + x);
- const result = [2, 4, 6, 8, 10];
(async () => {
- for await (const item of stream) {
- assert.strictEqual(item, result.shift());
- }
+ assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]);
})().then(common.mustCall());
}
@@ -24,7 +22,49 @@ const { setTimeout } = require('timers/promises');
await Promise.resolve();
return x + x;
});
- const result = [2, 4, 6, 8, 10];
+ (async () => {
+ assert.deepStrictEqual(await stream.toArray(), [2, 4, 6, 8, 10]);
+ })().then(common.mustCall());
+}
+
+{
+ // Map works on asynchronous streams with a asynchronous mapper
+ const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
+ return x + x;
+ }).map((x) => x + x);
+ (async () => {
+ assert.deepStrictEqual(await stream.toArray(), [4, 8, 12, 16, 20]);
+ })().then(common.mustCall());
+}
+
+{
+ // Map works on an infinite stream
+ const stream = Readable.from(async function* () {
+ while (true) yield 1;
+ }()).map(common.mustCall(async (x) => {
+ return x + x;
+ }, 5));
+ (async () => {
+ let i = 1;
+ for await (const item of stream) {
+ assert.strictEqual(item, 2);
+ if (++i === 5) break;
+ }
+ })().then(common.mustCall());
+}
+
+{
+ // Map works on non-objectMode streams
+ const stream = new Readable({
+ read() {
+ this.push(Uint8Array.from([1]));
+ this.push(Uint8Array.from([2]));
+ this.push(null);
+ }
+ }).map(async ([x]) => {
+ return x + x;
+ }).map((x) => x + x);
+ const result = [4, 8];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
@@ -33,11 +73,19 @@ const { setTimeout } = require('timers/promises');
}
{
- // Map works on asynchronous streams with a asynchronous mapper
- const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
+ // Does not care about data events
+ const source = new Readable({
+ read() {
+ this.push(Uint8Array.from([1]));
+ this.push(Uint8Array.from([2]));
+ this.push(null);
+ }
+ });
+ setImmediate(() => stream.emit('data', Uint8Array.from([1])));
+ const stream = source.map(async ([x]) => {
return x + x;
}).map((x) => x + x);
- const result = [4, 8, 12, 16, 20];
+ const result = [4, 8];
(async () => {
for await (const item of stream) {
assert.strictEqual(item, result.shift());
@@ -46,18 +94,60 @@ const { setTimeout } = require('timers/promises');
}
{
+ // Emitting an error during `map`
+ const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
+ if (x === 3) {
+ stream.emit('error', new Error('boom'));
+ }
+ return x + x;
+ });
+ assert.rejects(
+ stream.map((x) => x + x).toArray(),
+ /boom/,
+ ).then(common.mustCall());
+}
+
+{
+ // Throwing an error during `map` (sync)
+ const stream = Readable.from([1, 2, 3, 4, 5]).map((x) => {
+ if (x === 3) {
+ throw new Error('boom');
+ }
+ return x + x;
+ });
+ assert.rejects(
+ stream.map((x) => x + x).toArray(),
+ /boom/,
+ ).then(common.mustCall());
+}
+
+
+{
+ // Throwing an error during `map` (async)
+ const stream = Readable.from([1, 2, 3, 4, 5]).map(async (x) => {
+ if (x === 3) {
+ throw new Error('boom');
+ }
+ return x + x;
+ });
+ assert.rejects(
+ stream.map((x) => x + x).toArray(),
+ /boom/,
+ ).then(common.mustCall());
+}
+
+{
// Concurrency + AbortSignal
const ac = new AbortController();
- let calls = 0;
- const stream = Readable.from([1, 2, 3, 4, 5]).map(async (_, { signal }) => {
- calls++;
- await setTimeout(100, { signal });
- }, { signal: ac.signal, concurrency: 2 });
+ const range = Readable.from([1, 2, 3, 4, 5]);
+ const stream = range.map(common.mustCall(async (_, { signal }) => {
+ await once(signal, 'abort');
+ throw signal.reason;
+ }, 2), { signal: ac.signal, concurrency: 2 });
// pump
assert.rejects(async () => {
for await (const item of stream) {
- // nope
- console.log(item);
+ assert.fail('should not reach here, got ' + item);
}
}, {
name: 'AbortError',
@@ -65,7 +155,6 @@ const { setTimeout } = require('timers/promises');
setImmediate(() => {
ac.abort();
- assert.strictEqual(calls, 2);
});
}