summaryrefslogtreecommitdiff
path: root/lib/gitlab/database/migrations/test_batched_background_runner.rb
blob: c123d01f32779856e05cc69b4d937cf9bf9c8f10 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
# frozen_string_literal: true

module Gitlab
  module Database
    module Migrations
      # Samples batches from executable batched background migrations (those with an id greater than
      # +from_id+), creates a BatchedJob record for each sampled batch, and runs jobs through
      # BatchedMigrationWrapper on an explicit database connection.
      class TestBatchedBackgroundRunner < BaseBackgroundRunner
        include Gitlab::Database::DynamicModelHelpers

        # @param result_dir [String] passed through to BaseBackgroundRunner
        # @param connection the database connection all queries and jobs run against
        # @param from_id [Integer] only batched migrations with an id strictly greater than this are sampled
        def initialize(result_dir:, connection:, from_id:)
          super(result_dir: result_dir, connection: connection)
          @connection = connection
          @from_id = from_id
        end

        # Builds, for every executable batched migration newer than +from_id+, a lazy sequence of
        # sampled jobs.
        #
        # @return [Hash{String => Enumerator::Lazy}] job class name => lazily-created BatchedJob records.
        #   Jobs are only created (via +create_batched_job!+) as each enumerator is consumed.
        def jobs_by_migration_name
          set_shared_model_connection do
            Gitlab::Database::BackgroundMigration::BatchedMigration
              .executable
              .where('id > ?', from_id)
              .to_h { |migration| [migration.job_class_name, sample_jobs(migration)] }
          end
        end

        # Performs a single batched job using this runner's connection.
        def run_job(job)
          set_shared_model_connection do
            Gitlab::Database::BackgroundMigration::BatchedMigrationWrapper.new(connection: connection).perform(job)
          end
        end

        # Infinite enumerator of fractions in [0, 1], spread ever more finely as more are pulled:
        #   0, 1 (special case)
        #   1/2
        #   1/4, 3/4
        #   1/8, 3/8, 5/8, 7/8
        #   etc.
        # At each step the denominator doubles and the numerators are the odd numbers 1 <= n < denominator,
        # so no fraction is ever repeated.
        def uniform_fractions
          Enumerator.new do |y|
            y << 0
            y << 1

            (1..).each do |exponent|
              denominator = 2**exponent

              # Odd numerators only; (1..denominator).step(2) stops at denominator - 1 since denominator is even.
              (1..denominator).step(2).each do |numerator|
                y << numerator.fdiv(denominator)
              end
            end
          end
        end

        private

        attr_reader :from_id

        def set_shared_model_connection(&block)
          Gitlab::Database::SharedModel.using_connection(connection, &block)
        end

        # Returns the max_batch_size, total_tuple_count and interval of the job's batched migration.
        def migration_meta(job)
          set_shared_model_connection do
            job.batched_migration.slice(:max_batch_size, :total_tuple_count, :interval)
          end
        end

        # Lazily samples batches of +migration+ and creates one BatchedJob per sampled batch.
        #
        # Batch start points are drawn from next_min_value up to (column maximum - batch_size), at the
        # positions given by #uniform_fractions. Sampling stops as soon as a start point falls inside a
        # batch that has already been attempted.
        #
        # @return [Enumerator::Lazy] lazily-created jobs; empty when the table has no rows
        def sample_jobs(migration)
          batching_strategy = migration.batch_class.new(connection: connection)

          smallest_batch_start = migration.next_min_value

          table_max_value = define_batchable_model(migration.table_name, connection: connection)
                              .maximum(migration.column_name)

          # `maximum` returns nil for an empty table: there is nothing to sample.
          return [].lazy if table_max_value.nil?

          # Never sample below the migration's current position. When fewer than batch_size ids remain,
          # table_max_value - batch_size would be below smallest_batch_start and make the variance negative.
          largest_batch_start = [table_max_value - migration.batch_size, smallest_batch_start].max

          # Width of the range start points are drawn from: each start is
          # smallest_batch_start + (variance * frac) for a fraction 0 <= frac <= 1.
          variance = largest_batch_start - smallest_batch_start

          batch_starts = uniform_fractions
                           .lazy
                           .map { |frac| (variance * frac).to_i + smallest_batch_start }

          # Ranges already attempted; a start point inside any of them ends sampling so batches never overlap.
          # The lazy chain pushes each element through every stage before pulling the next one, so
          # take_while always sees the ranges recorded for earlier elements.
          completed_batches = []

          batch_starts
            .take_while { |start| completed_batches.none? { |batch| batch.cover?(start) } }
            .map { |batch_start| create_job_for_batch(migration, batching_strategy, batch_start, completed_batches) }
            .reject(&:nil?) # drop start points for which no batch could be formed
        end

        # Creates a BatchedJob for the batch beginning at +batch_start+ and records its range in
        # +completed_batches+. Returns nil when the batching strategy returns no bounds (presumably no
        # rows at or after +batch_start+ — confirm against the batching strategy); the attempted range
        # is still recorded so sampling terminates.
        def create_job_for_batch(migration, batching_strategy, batch_start, completed_batches)
          # This runs lazily while the enumerator from #sample_jobs is consumed, i.e. after the
          # using_connection block in #jobs_by_migration_name has exited, so the explicit connection
          # must be re-associated here.
          set_shared_model_connection do
            next_bounds = batching_strategy.next_batch(
              migration.table_name,
              migration.column_name,
              batch_min_value: batch_start,
              batch_size: migration.batch_size,
              job_arguments: migration.job_arguments
            )

            if next_bounds.nil?
              # Recording (nil..nil) here would cover every start and silently end sampling, and
              # creating a job with nil bounds would be invalid; record the attempted range instead.
              completed_batches << (batch_start..(batch_start + migration.batch_size))
              nil
            else
              batch_min, batch_max = next_bounds

              job = migration.create_batched_job!(batch_min, batch_max)

              completed_batches << (batch_min..batch_max)

              job
            end
          end
        end
      end
    end
  end
end