summaryrefslogtreecommitdiff
path: root/app/workers/container_registry/migration/guard_worker.rb
blob: 1111061a89b64071b67d5fed3cda542fd318d82a (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
# frozen_string_literal: true

module ContainerRegistry
  module Migration
    # Cron worker that inspects container repositories returned by
    # `ContainerRepository.with_stale_migration` and cleans them up:
    #
    # * repositories that are not actively importing (local state is not an
    #   importing state, or it disagrees with the registry) are aborted;
    # * repositories that are actively importing but have been running longer
    #   than the configured timeout are cancelled on the registry side.
    #
    # Only runs on GitLab.com.
    class GuardWorker
      include ApplicationWorker
      # This is a general worker with no context.
      # It is not scoped to a project, user or group.
      # We don't have a context.
      include CronjobQueue # rubocop:disable Scalability/CronWorkerContext

      data_consistency :always
      feature_category :container_registry
      urgency :low
      worker_resource_boundary :unknown
      deduplicate :until_executed, ttl: 5.minutes
      idempotent!

      def perform
        return unless Gitlab.com?

        # Materialize the records once: the amount of entries is bounded by
        # #max_capacity, and both the count logged below and the loop reuse the
        # same loaded array instead of issuing two SQL queries.
        repositories = ::ContainerRepository.with_stale_migration(step_before_timestamp)
                                            .limit(max_capacity)
                                            .to_a
        aborts_count = 0
        long_running_migration_ids = []

        log_extra_metadata_on_done(:stale_migrations_count, repositories.size)

        repositories.each do |repository|
          if actively_importing?(repository)
            # if a repository is actively importing but not yet long_running, do nothing
            if long_running_migration?(repository)
              long_running_migration_ids << repository.id
              cancel_long_running_migration(repository)
              aborts_count += 1
            end
          else
            repository.abort_import
            aborts_count += 1
          end
        end

        log_extra_metadata_on_done(:aborted_stale_migrations_count, aborts_count)

        if long_running_migration_ids.any?
          log_extra_metadata_on_done(:aborted_long_running_migration_ids, long_running_migration_ids)
        end
      end

      private

      # A repository is actively_importing if it has an importing migration state
      # and that state matches the state in the registry.
      # The local state check runs first so the registry API is only queried for
      # repositories that could possibly be importing.
      # TODO We can have an API call n+1 situation here. It can be solved when the
      # endpoint accepts multiple repository paths at once. This is issue
      # https://gitlab.com/gitlab-org/container-registry/-/issues/582
      def actively_importing?(repository)
        return false unless repository.importing? || repository.pre_importing?
        return false unless external_state_matches_migration_state?(repository)

        true
      end

      # Whether the current migration step of +repository+ started longer ago
      # than the applicable timeout (see #long_running_migration_timeout).
      def long_running_migration?(repository)
        started_at = migration_start_timestamp(repository)

        # NOTE(review): a missing start timestamp used to raise NoMethodError on
        # `nil.before?`, aborting the whole run and skipping every remaining
        # repository. An unknown start time is treated as infinitely old so the
        # migration gets cancelled instead.
        return true if started_at.nil?

        started_at.before?(long_running_migration_timeout(repository).ago)
      end

      # Timeout applied to the current migration step of +repository+.
      # With the :registry_migration_guard_thresholds feature flag enabled, it
      # comes from the migration settings (pre-import vs. import); otherwise the
      # fixed #long_running_migration_threshold is used.
      # Uses the same #pre_importing? predicate as #migration_start_timestamp so
      # the timeout and the timestamp always refer to the same step.
      def long_running_migration_timeout(repository)
        return long_running_migration_threshold unless Feature.enabled?(:registry_migration_guard_thresholds)

        if repository.pre_importing?
          migration.pre_import_timeout.seconds
        else
          migration.import_timeout.seconds
        end
      end

      # True when the import status reported by the registry agrees with the
      # local (pre_)importing state.
      def external_state_matches_migration_state?(repository)
        status = repository.external_import_status

        (status == 'pre_import_in_progress' && repository.pre_importing?) ||
          (status == 'import_in_progress' && repository.importing?)
      end

      # Start time of the current migration step: the pre-import start while
      # pre-importing, the import start otherwise. May be nil.
      def migration_start_timestamp(repository)
        if repository.pre_importing?
          repository.migration_pre_import_started_at
        else
          repository.migration_import_started_at
        end
      end

      # Cut-off passed to `with_stale_migration`: now minus the configured
      # maximum step duration.
      def step_before_timestamp
        migration.max_step_duration.seconds.ago
      end

      def max_capacity
        # doubling the actual capacity to prevent issues in case the capacity
        # is not properly applied
        migration.capacity * 2
      end

      def migration
        ::ContainerRegistry::Migration
      end

      # Fallback long-running threshold used when the
      # :registry_migration_guard_thresholds feature flag is disabled.
      def long_running_migration_threshold
        10.minutes
      end

      # Asks the registry to cancel the migration of +repository+ and then
      # updates the local state based on the response:
      # * :ok          -> skip the import if the repository reports it is
      #                   nearing/over its retry limit, otherwise abort it;
      # * :bad_request -> hand the registry-reported state to
      #                   #reconcile_import_status, passing an abort as the block
      #                   (presumably the fallback when reconciliation cannot
      #                   resolve the state — TODO confirm);
      # * anything else -> abort the import.
      def cancel_long_running_migration(repository)
        result = repository.migration_cancel

        case result[:status]
        when :ok
          if repository.nearing_or_exceeded_retry_limit?
            repository.skip_import(reason: :migration_canceled)
          else
            repository.abort_import
          end
        when :bad_request
          repository.reconcile_import_status(result[:state]) do
            repository.abort_import
          end
        else
          repository.abort_import
        end
      end
    end
  end
end