summaryrefslogtreecommitdiff
path: root/app/assets/javascripts/streaming/chunk_writer.js
blob: 4bbd0a5f8438f29d7629ab2ad694e66d826051a4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
import { throttle } from 'lodash';
import { RenderBalancer } from '~/streaming/render_balancer';
import {
  BALANCE_RATE,
  HIGH_FRAME_TIME,
  LOW_FRAME_TIME,
  MAX_CHUNK_SIZE,
  MIN_CHUNK_SIZE,
  TIMEOUT,
} from '~/streaming/constants';

const defaultConfig = {
  balanceRate: BALANCE_RATE,
  minChunkSize: MIN_CHUNK_SIZE,
  maxChunkSize: MAX_CHUNK_SIZE,
  lowFrameTime: LOW_FRAME_TIME,
  highFrameTime: HIGH_FRAME_TIME,
  timeout: TIMEOUT,
};

function concatUint8Arrays(a, b) {
  const array = new Uint8Array(a.length + b.length);
  array.set(a, 0);
  array.set(b, a.length);
  return array;
}

// This class is used to write chunks with a balanced size
// to avoid blocking main thread for too long.
//
// A chunk can be:
//   1. Too small
//   2. Too large
//   3. Delayed in time
//
// This class resolves all these problems by
//   1. Splitting or concatenating chunks to met the size criteria
//   2. Rendering current chunk buffer immediately if enough time has passed
//
// The size of the chunk is determined by RenderBalancer,
// It measures execution time for each chunk write and adjusts next chunk size.
export class ChunkWriter {
  buffer = null;
  decoder = new TextDecoder('utf-8');
  timeout = null;

  constructor(htmlStream, config) {
    this.htmlStream = htmlStream;

    const { balanceRate, minChunkSize, maxChunkSize, lowFrameTime, highFrameTime, timeout } = {
      ...defaultConfig,
      ...config,
    };

    // ensure we still render chunks over time if the size criteria is not met
    this.scheduleAccumulatorFlush = throttle(this.flushAccumulator.bind(this), timeout);

    const averageSize = Math.round((maxChunkSize + minChunkSize) / 2);
    this.size = Math.max(averageSize, minChunkSize);

    this.balancer = new RenderBalancer({
      lowFrameTime,
      highFrameTime,
      decrease: () => {
        this.size = Math.round(Math.max(this.size / balanceRate, minChunkSize));
      },
      increase: () => {
        this.size = Math.round(Math.min(this.size * balanceRate, maxChunkSize));
      },
    });
  }

  write(chunk) {
    this.scheduleAccumulatorFlush.cancel();

    if (this.buffer) {
      this.buffer = concatUint8Arrays(this.buffer, chunk);
    } else {
      this.buffer = chunk;
    }

    // accumulate chunks until the size is fulfilled
    if (this.size > this.buffer.length) {
      this.scheduleAccumulatorFlush();
      return Promise.resolve();
    }

    return this.balancedWrite();
  }

  balancedWrite() {
    let cursor = 0;

    return this.balancer.render(() => {
      const chunkPart = this.buffer.subarray(cursor, cursor + this.size);
      // accumulate chunks until the size is fulfilled
      // this is a hot path for the last chunkPart of the chunk
      if (chunkPart.length < this.size) {
        this.buffer = chunkPart;
        this.scheduleAccumulatorFlush();
        return false;
      }

      this.writeToDom(chunkPart);

      cursor += this.size;
      if (cursor >= this.buffer.length) {
        this.buffer = null;
        return false;
      }
      // continue render
      return true;
    });
  }

  writeToDom(chunk, stream = true) {
    // stream: true allows us to split chunks with multi-part words
    const decoded = this.decoder.decode(chunk, { stream });
    this.htmlStream.write(decoded);
  }

  flushAccumulator() {
    if (this.buffer) {
      this.writeToDom(this.buffer);
      this.buffer = null;
    }
  }

  close() {
    this.scheduleAccumulatorFlush.cancel();
    if (this.buffer) {
      // last chunk should have stream: false to indicate the end of the stream
      this.writeToDom(this.buffer, false);
      this.buffer = null;
    }
    this.htmlStream.close();
  }

  abort() {
    this.scheduleAccumulatorFlush.cancel();
    this.buffer = null;
    this.htmlStream.abort();
  }
}