summaryrefslogtreecommitdiff
path: root/app/workers/container_registry/migration/enqueuer_worker.rb
blob: 5feaba870e6a3144a6714a275b0c8ed73c2cf011 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
# frozen_string_literal: true

module ContainerRegistry
  module Migration
    class EnqueuerWorker
      include ApplicationWorker
      include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
      include Gitlab::Utils::StrongMemoize

      data_consistency :always
      feature_category :container_registry
      urgency :low
      deduplicate :until_executing, including_scheduled: true
      idempotent!

      def perform
        return unless migration.enabled?
        return unless below_capacity?
        return unless waiting_time_passed?

        re_enqueue_if_capacity if handle_aborted_migration || handle_next_migration
      rescue StandardError => e
        Gitlab::ErrorTracking.log_exception(
          e,
          next_repository_id: next_repository&.id,
          next_aborted_repository_id: next_aborted_repository&.id
        )

        next_repository&.abort_import
      end

      private

      def handle_aborted_migration
        return unless next_aborted_repository&.retry_aborted_migration

        log_extra_metadata_on_done(:container_repository_id, next_aborted_repository.id)
        log_extra_metadata_on_done(:import_type, 'retry')

        true
      end

      def handle_next_migration
        return unless next_repository
        # We return true because the repository was successfully processed (migration_state is changed)
        return true if tag_count_too_high?
        return unless next_repository.start_pre_import

        log_extra_metadata_on_done(:container_repository_id, next_repository.id)
        log_extra_metadata_on_done(:import_type, 'next')

        true
      end

      def tag_count_too_high?
        return false unless next_repository.tags_count > migration.max_tags_count

        next_repository.skip_import(reason: :too_many_tags)

        true
      end

      def below_capacity?
        current_capacity <= maximum_capacity
      end

      def waiting_time_passed?
        delay = migration.enqueue_waiting_time
        return true if delay == 0
        return true unless last_step_completed_repository

        last_step_completed_repository.last_import_step_done_at < Time.zone.now - delay
      end

      def current_capacity
        strong_memoize(:current_capacity) do
          ContainerRepository.with_migration_states(
            %w[pre_importing pre_import_done importing]
          ).count
        end
      end

      def maximum_capacity
        migration.capacity
      end

      def next_repository
        strong_memoize(:next_repository) do
          ContainerRepository.ready_for_import.take # rubocop:disable CodeReuse/ActiveRecord
        end
      end

      def next_aborted_repository
        strong_memoize(:next_aborted_repository) do
          ContainerRepository.with_migration_state('import_aborted').take # rubocop:disable CodeReuse/ActiveRecord
        end
      end

      def last_step_completed_repository
        strong_memoize(:last_step_completed_repository) do
          ContainerRepository.recently_done_migration_step.first
        end
      end

      def migration
        ::ContainerRegistry::Migration
      end

      def re_enqueue_if_capacity
        return unless current_capacity < maximum_capacity

        self.class.perform_async
      end
    end
  end
end