1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
|
const consumeReadableStream = (stream) => {
return new Promise((resolve, reject) => {
stream.pipeTo(
new WritableStream({
close: resolve,
abort: reject,
}),
);
});
};
/**
 * Builds a promise that fulfils, with no value, once `timeout` milliseconds
 * have elapsed on a `setTimeout` timer.
 *
 * @param {number} timeout - Delay in milliseconds.
 * @returns {Promise<void>}
 */
const wait = (timeout) => new Promise((done) => setTimeout(done, timeout));
// This rate-limiting approach is specific to Web Streams.
// A request is only finished once its stream has been fully consumed,
// so each stream is split (teed) into two branches:
// one is drained by the rate limiter (to wait for all the bytes to be sent),
// the other is handed to the original consumer.
/**
 * Starts `total` stream requests while capping how many are in flight at once.
 *
 * A request counts as in flight from the moment `factory` is called until the
 * stream it produced has been read to the end, or until the request fails. To
 * observe that without taking the data away from the caller, every stream is
 * teed: one branch is drained here, the other is what the caller receives.
 *
 * @param {object} options
 * @param {(index: number) => Promise<ReadableStream>} options.factory - Starts
 *   request number `index` (0-based) and resolves with its stream.
 * @param {number} options.total - Number of requests to make.
 * @param {number} options.maxConcurrentRequests - Upper bound on in-flight
 *   requests once throttling applies.
 * @param {number} [options.immediateCount=maxConcurrentRequests] - How many
 *   requests are started synchronously, before any throttling. If this exceeds
 *   the limit, nothing further is launched until the in-flight count has
 *   dropped below the limit again.
 * @param {number} [options.timeout=0] - Delay in milliseconds between launches
 *   while below the limit; a request finishing sooner also triggers a launch.
 * @returns {Promise<ReadableStream>[]} One promise per request, in index order.
 *   Each fulfils with the caller's branch of the stream, or rejects with the
 *   error of its own request. A failing request does not block the others.
 */
export const rateLimitStreamRequests = ({
  factory,
  total,
  maxConcurrentRequests,
  immediateCount = maxConcurrentRequests,
  timeout = 0,
}) => {
  if (total === 0) return [];

  // One entry per in-flight request. Entries only ever fulfil (never reject),
  // so racing them cannot fail; errors travel on the returned promises instead.
  const inFlight = new Set();

  // The `size > 0` guard keeps a limit of 0 from waiting on an empty race,
  // which would never settle.
  const isAtLimit = () => inFlight.size > 0 && inFlight.size >= maxConcurrentRequests;

  // Registers a request as in flight and resolves with the caller's branch.
  const track = (request) => {
    let release;
    const slot = new Promise((resolve) => {
      release = resolve;
    });
    inFlight.add(slot);
    // Remove the slot before releasing it so that anything woken up by the
    // release already sees the updated in-flight count. Safe to call twice.
    const free = () => {
      inFlight.delete(slot);
      release();
    };
    return Promise.resolve(request)
      .then((stream) => {
        const [probe, passthrough] = stream.tee();
        // The slot is freed whether the stream ends or errors; a stream error
        // reaches the caller through their own branch of the tee.
        // eslint-disable-next-line promise/no-nesting
        consumeReadableStream(probe).then(free, free);
        return passthrough;
      })
      .catch((error) => {
        free();
        // Rethrow so the caller's promise rejects instead of fulfilling with
        // `undefined`.
        throw error;
      });
  };

  const immediate = Array.from({ length: Math.min(immediateCount, total) }, (_, index) =>
    track(factory(index)),
  );

  // Launchers for the requests that have to wait for their turn.
  const pending = [];
  const throttled = Array.from(
    { length: Math.max(total - immediateCount, 0) },
    (_, offset) =>
      new Promise((resolve, reject) => {
        pending.push(() => {
          try {
            track(factory(offset + immediateCount)).then(resolve, reject);
          } catch (error) {
            // A factory that throws synchronously fails only its own promise.
            reject(error);
          }
        });
      }),
  );

  const drainQueue = async () => {
    while (pending.length !== 0) {
      // At the limit only a freed slot may trigger the next launch; below it,
      // `timeout` paces the launches and a freed slot can cut the wait short.
      const triggers = isAtLimit() ? [...inFlight] : [...inFlight, wait(timeout)];
      await Promise.race(triggers);
      // Re-check: after a burst above the limit, one freed slot is not enough.
      if (!isAtLimit()) {
        const launch = pending.shift();
        launch();
      }
    }
  };
  // Deliberately not awaited: the loop cannot reject, and callers only need
  // the per-request promises returned below.
  void drainQueue();

  return [...immediate, ...throttled];
};
|