summaryrefslogtreecommitdiff
path: root/app/workers/database/batched_background_migration/execution_worker.rb
blob: 37b40c73ca657ff5ff0b81c5a70c0dead77361e6 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
# frozen_string_literal: true

module Database
  module BatchedBackgroundMigration
    module ExecutionWorker
      extend ActiveSupport::Concern
      include ExclusiveLeaseGuard
      include Gitlab::Utils::StrongMemoize
      include ApplicationWorker
      include LimitedCapacity::Worker

      INTERVAL_VARIANCE = 5.seconds.freeze
      LEASE_TIMEOUT_MULTIPLIER = 3
      MAX_RUNNING_MIGRATIONS = 4

      included do
        data_consistency :always
        feature_category :database
        queue_namespace :batched_background_migrations
      end

      class_methods do
        def max_running_jobs
          MAX_RUNNING_MIGRATIONS
        end

        # We have to overirde this one, as we want
        # arguments passed as is, and not duplicated
        def perform_with_capacity(args)
          worker = new
          worker.remove_failed_jobs

          bulk_perform_async(args) # rubocop:disable Scalability/BulkPerformWithContext
        end
      end

      def remaining_work_count(*args)
        0 # the cron worker is the only source of new jobs
      end

      def max_running_jobs
        self.class.max_running_jobs
      end

      def perform_work(database_name, migration_id)
        self.database_name = database_name

        return unless enabled?
        return if shares_db_config?

        Gitlab::Database::SharedModel.using_connection(base_model.connection) do
          self.migration = find_migration(migration_id)

          break unless migration

          try_obtain_lease do
            run_migration_job if executable_migration?
          end
        end
      end

      private

      attr_accessor :database_name, :migration

      def enabled?
        Feature.enabled?(:execute_batched_migrations_on_schedule, type: :ops)
      end

      def shares_db_config?
        Gitlab::Database.db_config_share_with(base_model.connection_db_config).present?
      end

      def base_model
        strong_memoize(:base_model) do
          Gitlab::Database.database_base_models[database_name]
        end
      end

      def find_migration(id)
        Gitlab::Database::BackgroundMigration::BatchedMigration.find_executable(id, connection: base_model.connection)
      end

      def lease_key
        @lease_key ||= [
          self.class.name.underscore,
          'database_name',
          database_name,
          'table_name',
          migration.table_name
        ].join(':')
      end

      def lease_timeout
        migration.interval * LEASE_TIMEOUT_MULTIPLIER
      end

      def executable_migration?
        migration.active? && migration.interval_elapsed?(variance: INTERVAL_VARIANCE)
      end

      def run_migration_job
        Gitlab::Database::BackgroundMigration::BatchedMigrationRunner.new(connection: base_model.connection)
          .run_migration_job(migration)
      end
    end
  end
end