summaryrefslogtreecommitdiff
path: root/app/workers/container_registry/migration/enqueuer_worker.rb
blob: 1dd29eff86e25083115c6e7e3e7c58b19a75ad42 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
# frozen_string_literal: true

module ContainerRegistry
  module Migration
    # Cron-scheduled worker that moves the container registry migration forward.
    #
    # While holding an exclusive lease, each iteration either retries one
    # repository whose import was aborted, or starts the pre-import of the next
    # repository that is ready for import. The loop stops as soon as:
    #
    # * the migration is not runnable (disabled, at capacity, or still inside
    #   the configured waiting time after the last completed import step),
    # * MAX_LOOP_DURATION has elapsed since the first deadline check, or
    # * an iteration handled no repository (none found, or handling it failed).
    class EnqueuerWorker
      include ApplicationWorker
      include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
      include Gitlab::Utils::StrongMemoize
      include ExclusiveLeaseGuard

      DEFAULT_LEASE_TIMEOUT = 30.minutes.to_i

      # Wall-clock budget for the loop in #perform, measured from the first
      # time the deadline is checked.
      MAX_LOOP_DURATION = 250.seconds

      # Row limit used when picking the next repository to handle.
      # See #first_repository_in for why this is not 1.
      REPOSITORY_LOOKUP_LIMIT = 25

      data_consistency :always
      feature_category :container_registry
      urgency :low
      deduplicate :until_executing, ttl: DEFAULT_LEASE_TIMEOUT
      idempotent!

      # Handles repositories one at a time until the migration is no longer
      # runnable, the loop deadline passes, or nothing was handled.
      def perform
        try_obtain_lease do
          while runnable? && Time.zone.now < loop_deadline
            repository_handled = handle_aborted_migration || handle_next_migration

            # no repository was found, or handling it failed: stop the loop
            break unless repository_handled

            # we're going for another iteration so we need to clear memoization
            clear_memoization(:next_repository)
            clear_memoization(:next_aborted_repository)
            clear_memoization(:last_step_completed_repository)
          end
        end
      end

      # Enqueues a new asynchronous run of this worker via perform_async.
      def self.enqueue_a_job
        perform_async
      end

      private

      # Retries the migration of the next repository in the `import_aborted`
      # state.
      #
      # Returns nil when there is no such repository, true once the retry was
      # requested (whatever retry_aborted_migration returned), and false when
      # an error was raised (the error is logged, not re-raised).
      # The repository, if any, is always logged with import_type 'retry'.
      def handle_aborted_migration
        return unless next_aborted_repository

        next_aborted_repository.retry_aborted_migration

        true
      rescue StandardError => e
        Gitlab::ErrorTracking.log_exception(e, next_aborted_repository_id: next_aborted_repository&.id)

        false
      ensure
        log_repository_info(next_aborted_repository, import_type: 'retry')
      end

      # Starts the pre-import of the next repository ready for import.
      #
      # Returns:
      # * nil when there is no candidate, or when start_pre_import returned a falsy value,
      # * true when the repository was skipped for having too many tags, or its
      #   pre-import was started,
      # * false when an error was raised. The error is logged and the
      #   repository's import is aborted.
      # The repository, if any, is always logged with import_type 'next'.
      def handle_next_migration
        return unless next_repository

        # We return true because the repository was successfully processed (migration_state is changed)
        return true if tag_count_too_high?
        return unless next_repository.start_pre_import

        true
      rescue StandardError => e
        Gitlab::ErrorTracking.log_exception(e, next_repository_id: next_repository&.id)
        next_repository&.abort_import

        false
      ensure
        log_repository_info(next_repository, import_type: 'next')
      end

      # Skips the import of next_repository when its tag count exceeds the
      # configured maximum, and returns true in that case.
      # A max_tags_count of 0 disables this check.
      def tag_count_too_high?
        return false if migration.max_tags_count == 0
        return false unless next_repository.tags_count > migration.max_tags_count

        next_repository.skip_import(reason: :too_many_tags)

        true
      end

      def below_capacity?
        current_capacity < maximum_capacity
      end

      # True when enough time has passed since the most recently completed
      # import step. A waiting time of 0 disables the wait, and so does the
      # absence of any completed step.
      # NOTE(review): assumes enqueue_waiting_time is a Duration or a number of
      # seconds, since it is subtracted from Time.zone.now. Confirm this.
      def waiting_time_passed?
        delay = migration.enqueue_waiting_time
        return true if delay == 0
        return true unless last_step_completed_repository&.last_import_step_done_at

        last_step_completed_repository.last_import_step_done_at < Time.zone.now - delay
      end

      # Whether another repository may be handled now. When the answer is no,
      # the reason is recorded through log_extra_metadata_on_done.
      def runnable?
        unless migration.enabled?
          log_extra_metadata_on_done(:migration_enabled, false)
          return false
        end

        unless below_capacity?
          log_extra_metadata_on_done(:max_capacity_setting, maximum_capacity)
          log_extra_metadata_on_done(:below_capacity, false)

          return false
        end

        unless waiting_time_passed?
          log_extra_metadata_on_done(:waiting_time_passed, false)
          log_extra_metadata_on_done(:current_waiting_time_setting, migration.enqueue_waiting_time)

          return false
        end

        true
      end

      # Number of repositories currently in an active migration state.
      def current_capacity
        ContainerRepository.with_migration_states(
          %w[pre_importing pre_import_done importing]
        ).count
      end

      def maximum_capacity
        migration.capacity
      end

      def next_repository
        strong_memoize(:next_repository) do
          first_repository_in(ContainerRepository.ready_for_import)
        end
      end

      def next_aborted_repository
        strong_memoize(:next_aborted_repository) do
          first_repository_in(ContainerRepository.with_migration_state('import_aborted'))
        end
      end

      def last_step_completed_repository
        strong_memoize(:last_step_completed_repository) do
          ContainerRepository.recently_done_migration_step.first
        end
      end

      # Returns the first record of +relation+, or nil when it is empty.
      #
      # Using .limit(25)[0] instead of take here. Using a LIMIT 1 and 2 caused the query planner to
      # use an inefficient sequential scan instead of picking an index. LIMIT 25 works around
      # this issue.
      # See https://gitlab.com/gitlab-org/gitlab/-/merge_requests/87733 and
      # https://gitlab.com/gitlab-org/gitlab/-/merge_requests/90735 for details.
      def first_repository_in(relation)
        relation.limit(REPOSITORY_LOOKUP_LIMIT)[0] # rubocop:disable CodeReuse/ActiveRecord
      end

      def migration
        ::ContainerRegistry::Migration
      end

      def log_info(extras)
        logger.info(structured_payload(extras))
      end

      # Logs the repository's id, path and migration state (plus the skip
      # reason when its import was skipped), merged into +extras+.
      # Does nothing when +repository+ is nil.
      def log_repository_info(repository, extras = {})
        return unless repository

        repository_info = {
          container_repository_id: repository.id,
          container_repository_path: repository.path,
          container_repository_migration_state: repository.migration_state
        }

        if repository.import_skipped?
          repository_info[:container_repository_migration_skipped_reason] = repository.migration_skipped_reason
        end

        log_info(extras.merge(repository_info))
      end

      # Memoized on first call and deliberately never cleared, so the whole
      # loop in #perform shares one deadline.
      def loop_deadline
        strong_memoize(:loop_deadline) do
          MAX_LOOP_DURATION.from_now
        end
      end

      # used by ExclusiveLeaseGuard
      def lease_key
        'container_registry:migration:enqueuer_worker'
      end

      # used by ExclusiveLeaseGuard
      def lease_timeout
        DEFAULT_LEASE_TIMEOUT
      end
    end
  end
end