diff options
Diffstat (limited to 'app/assets/javascripts/streaming/chunk_writer.js')
-rw-r--r-- | app/assets/javascripts/streaming/chunk_writer.js | 144 |
1 files changed, 144 insertions, 0 deletions
diff --git a/app/assets/javascripts/streaming/chunk_writer.js b/app/assets/javascripts/streaming/chunk_writer.js new file mode 100644 index 00000000000..4bbd0a5f843 --- /dev/null +++ b/app/assets/javascripts/streaming/chunk_writer.js @@ -0,0 +1,144 @@ +import { throttle } from 'lodash'; +import { RenderBalancer } from '~/streaming/render_balancer'; +import { + BALANCE_RATE, + HIGH_FRAME_TIME, + LOW_FRAME_TIME, + MAX_CHUNK_SIZE, + MIN_CHUNK_SIZE, + TIMEOUT, +} from '~/streaming/constants'; + +const defaultConfig = { + balanceRate: BALANCE_RATE, + minChunkSize: MIN_CHUNK_SIZE, + maxChunkSize: MAX_CHUNK_SIZE, + lowFrameTime: LOW_FRAME_TIME, + highFrameTime: HIGH_FRAME_TIME, + timeout: TIMEOUT, +}; + +function concatUint8Arrays(a, b) { + const array = new Uint8Array(a.length + b.length); + array.set(a, 0); + array.set(b, a.length); + return array; +} + +// This class is used to write chunks with a balanced size +// to avoid blocking main thread for too long. +// +// A chunk can be: +// 1. Too small +// 2. Too large +// 3. Delayed in time +// +// This class resolves all these problems by +// 1. Splitting or concatenating chunks to met the size criteria +// 2. Rendering current chunk buffer immediately if enough time has passed +// +// The size of the chunk is determined by RenderBalancer, +// It measures execution time for each chunk write and adjusts next chunk size. +export class ChunkWriter { + buffer = null; + decoder = new TextDecoder('utf-8'); + timeout = null; + + constructor(htmlStream, config) { + this.htmlStream = htmlStream; + + const { balanceRate, minChunkSize, maxChunkSize, lowFrameTime, highFrameTime, timeout } = { + ...defaultConfig, + ...config, + }; + + // ensure we still render chunks over time if the size criteria is not met + this.scheduleAccumulatorFlush = throttle(this.flushAccumulator.bind(this), timeout); + + const averageSize = Math.round((maxChunkSize + minChunkSize) / 2); + this.size = Math.max(averageSize, minChunkSize); + + this.balancer = new RenderBalancer({ + lowFrameTime, + highFrameTime, + decrease: () => { + this.size = Math.round(Math.max(this.size / balanceRate, minChunkSize)); + }, + increase: () => { + this.size = Math.round(Math.min(this.size * balanceRate, maxChunkSize)); + }, + }); + } + + write(chunk) { + this.scheduleAccumulatorFlush.cancel(); + + if (this.buffer) { + this.buffer = concatUint8Arrays(this.buffer, chunk); + } else { + this.buffer = chunk; + } + + // accumulate chunks until the size is fulfilled + if (this.size > this.buffer.length) { + this.scheduleAccumulatorFlush(); + return Promise.resolve(); + } + + return this.balancedWrite(); + } + + balancedWrite() { + let cursor = 0; + + return this.balancer.render(() => { + const chunkPart = this.buffer.subarray(cursor, cursor + this.size); + // accumulate chunks until the size is fulfilled + // this is a hot path for the last chunkPart of the chunk + if (chunkPart.length < this.size) { + this.buffer = chunkPart; + this.scheduleAccumulatorFlush(); + return false; + } + + this.writeToDom(chunkPart); + + cursor += this.size; + if (cursor >= this.buffer.length) { + this.buffer = null; + return false; + } + // continue render + return true; + }); + } + + writeToDom(chunk, stream = true) { + // stream: true allows us to split chunks with multi-part words + const decoded = this.decoder.decode(chunk, { stream }); + this.htmlStream.write(decoded); + } + + flushAccumulator() { + if (this.buffer) { + this.writeToDom(this.buffer); + this.buffer = null; + } + } + + close() { + this.scheduleAccumulatorFlush.cancel(); + if (this.buffer) { + // last chunk should have stream: false to indicate the end of the stream + this.writeToDom(this.buffer, false); + this.buffer = null; + } + this.htmlStream.close(); + } + + abort() { + this.scheduleAccumulatorFlush.cancel(); + this.buffer = null; + this.htmlStream.abort(); + } +} |