summaryrefslogtreecommitdiff
path: root/lib/gitlab/database/background_migration/batched_migration_wrapper.rb
blob: 057f856d85946ca16dd2f486206970a496cb690a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# frozen_string_literal: true

module Gitlab
  module Database
    module BackgroundMigration
      # Executes a single job (one batch) of a batched background migration.
      #
      # Responsibilities, as visible in this class:
      #   * drives the tracking record's state machine (`run!` -> `succeed!` /
      #     `failure!`) around the actual migration work,
      #   * instantiates the migration job class and invokes it with the
      #     batch boundaries and arguments stored on the tracking record,
      #   * captures the job's batch_metrics (when provided) onto the record,
      #   * emits Prometheus gauges/counters/histograms about the batch in an
      #     `ensure` block, so metrics are reported for both success and failure.
      class BatchedMigrationWrapper
        extend Gitlab::Utils::StrongMemoize

        # @param connection [ActiveRecord connection] database connection the
        #   job will run against; defaults to the main ApplicationRecord pool.
        def initialize(connection: ApplicationRecord.connection)
          @connection = connection
        end

        # Wraps the execution of a batched_background_migration.
        #
        # Updates the job's tracking records with the status of the migration
        # when starting and finishing execution, and optionally saves batch_metrics
        # the migration provides, if any are given.
        #
        # The job's batch_metrics are serialized to JSON for storage.
        #
        # @param batch_tracking_record [BatchedJob-like record] row describing
        #   this batch (min/max values, table/column, job class and arguments).
        # @raise re-raises whatever the migration job raised, after recording
        #   the failure on the tracking record.
        def perform(batch_tracking_record)
          start_tracking_execution(batch_tracking_record)

          execute_batch(batch_tracking_record)

          batch_tracking_record.succeed!
        # Deliberately broader than StandardError: even process-level errors
        # (e.g. NoMemoryError, SystemExit) must mark the tracking record as
        # failed before propagating — hence the explicit rubocop disable.
        rescue Exception => error # rubocop:disable Lint/RescueException
          batch_tracking_record.failure!(error: error)

          raise
        ensure
          # Runs on both success and failure paths so every attempt is observable.
          track_prometheus_metrics(batch_tracking_record)
        end

        private

        attr_reader :connection

        # Transitions the tracking record into its "running" state.
        def start_tracking_execution(tracking_record)
          tracking_record.run!
        end

        # Builds the migration job instance and runs it over this batch's
        # key range, then copies the job's batch_metrics (if it exposes any)
        # onto the tracking record for later persistence.
        def execute_batch(tracking_record)
          job_instance = migration_instance_for(tracking_record.migration_job_class)

          # Positional calling convention expected by batched migration jobs:
          # range bounds, target table/column, batching knobs, then any
          # migration-specific arguments splatted at the end.
          job_instance.perform(
            tracking_record.min_value,
            tracking_record.max_value,
            tracking_record.migration_table_name,
            tracking_record.migration_column_name,
            tracking_record.sub_batch_size,
            tracking_record.pause_ms,
            *tracking_record.migration_job_arguments)

          # batch_metrics is an optional interface — only some job classes
          # implement it, so feature-detect rather than assume.
          if job_instance.respond_to?(:batch_metrics)
            tracking_record.metrics = job_instance.batch_metrics
          end
        end

        # Instantiates the job class, injecting the connection only for
        # classes that subclass BaseJob (whose initializer accepts it);
        # legacy job classes are constructed with no arguments.
        def migration_instance_for(job_class)
          if job_class < Gitlab::BackgroundMigration::BaseJob
            job_class.new(connection: connection)
          else
            job_class.new
          end
        end

        # Publishes per-batch observability data. Gauges reflect the latest
        # batch's configuration/progress; the counter accumulates updated
        # tuples; timing samples (when the job recorded any) feed a histogram
        # labelled by operation.
        #
        # NOTE(review): gauge_job_duration reads finished_at/started_at, which
        # presumably are set by the succeed!/failure! transitions — this runs
        # from `ensure`, so verify both paths populate finished_at.
        def track_prometheus_metrics(tracking_record)
          migration = tracking_record.batched_migration
          base_labels = migration.prometheus_labels

          metric_for(:gauge_batch_size).set(base_labels, tracking_record.batch_size)
          metric_for(:gauge_sub_batch_size).set(base_labels, tracking_record.sub_batch_size)
          metric_for(:gauge_interval).set(base_labels, tracking_record.batched_migration.interval)
          metric_for(:gauge_job_duration).set(base_labels, (tracking_record.finished_at - tracking_record.started_at).to_i)
          metric_for(:counter_updated_tuples).increment(base_labels, tracking_record.batch_size)
          metric_for(:gauge_migrated_tuples).set(base_labels, tracking_record.batched_migration.migrated_tuple_count)
          metric_for(:gauge_total_tuple_count).set(base_labels, tracking_record.batched_migration.total_tuple_count)
          metric_for(:gauge_last_update_time).set(base_labels, Time.current.to_i)

          # Intentional assignment-in-condition: metrics may be nil when the
          # job class exposed no batch_metrics.
          if metrics = tracking_record.metrics
            metrics['timings']&.each do |key, timings|
              summary = metric_for(:histogram_timings)
              labels = base_labels.merge(operation: key)

              timings.each do |timing|
                summary.observe(labels, timing)
              end
            end
          end
        end

        # Looks up a metric object by symbolic name from the class-level registry.
        def metric_for(name)
          self.class.metrics[name]
        end

        # Class-level, memoized registry of Prometheus metric objects — built
        # once and shared by all wrapper instances, since registering the same
        # metric repeatedly is unnecessary. (Class methods are unaffected by
        # the `private` marker above.)
        def self.metrics
          strong_memoize(:metrics) do
            {
              gauge_batch_size: Gitlab::Metrics.gauge(
                :batched_migration_job_batch_size,
                'Batch size for a batched migration job'
              ),
              gauge_sub_batch_size: Gitlab::Metrics.gauge(
                :batched_migration_job_sub_batch_size,
                'Sub-batch size for a batched migration job'
              ),
              gauge_interval: Gitlab::Metrics.gauge(
                :batched_migration_job_interval_seconds,
                'Interval for a batched migration job'
              ),
              gauge_job_duration: Gitlab::Metrics.gauge(
                :batched_migration_job_duration_seconds,
                'Duration for a batched migration job'
              ),
              counter_updated_tuples: Gitlab::Metrics.counter(
                :batched_migration_job_updated_tuples_total,
                'Number of tuples updated by batched migration job'
              ),
              gauge_migrated_tuples: Gitlab::Metrics.gauge(
                :batched_migration_migrated_tuples_total,
                'Total number of tuples migrated by a batched migration'
              ),
              histogram_timings: Gitlab::Metrics.histogram(
                :batched_migration_job_query_duration_seconds,
                'Query timings for a batched migration job',
                {},
                # Bucket boundaries in seconds for query-duration samples.
                [0.1, 0.25, 0.5, 1, 5].freeze
              ),
              gauge_total_tuple_count: Gitlab::Metrics.gauge(
                :batched_migration_total_tuple_count,
                'Total tuple count the migration needs to touch'
              ),
              gauge_last_update_time: Gitlab::Metrics.gauge(
                :batched_migration_last_update_time_seconds,
                'Unix epoch time in seconds'
              )
            }
          end
        end
      end
    end
  end
end