summaryrefslogtreecommitdiff
path: root/app/workers/container_registry/migration/enqueuer_worker.rb
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/container_registry/migration/enqueuer_worker.rb')
-rw-r--r--app/workers/container_registry/migration/enqueuer_worker.rb116
1 files changed, 116 insertions, 0 deletions
diff --git a/app/workers/container_registry/migration/enqueuer_worker.rb b/app/workers/container_registry/migration/enqueuer_worker.rb
new file mode 100644
index 00000000000..5feaba870e6
--- /dev/null
+++ b/app/workers/container_registry/migration/enqueuer_worker.rb
@@ -0,0 +1,116 @@
+# frozen_string_literal: true
+
+module ContainerRegistry
+ module Migration
+ class EnqueuerWorker
+ include ApplicationWorker
+ include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
+ include Gitlab::Utils::StrongMemoize
+
+ data_consistency :always
+ feature_category :container_registry
+ urgency :low
+ deduplicate :until_executing, including_scheduled: true
+ idempotent!
+
+ def perform
+ return unless migration.enabled?
+ return unless below_capacity?
+ return unless waiting_time_passed?
+
+ re_enqueue_if_capacity if handle_aborted_migration || handle_next_migration
+ rescue StandardError => e
+ Gitlab::ErrorTracking.log_exception(
+ e,
+ next_repository_id: next_repository&.id,
+ next_aborted_repository_id: next_aborted_repository&.id
+ )
+
+ next_repository&.abort_import
+ end
+
+ private
+
+ def handle_aborted_migration
+ return unless next_aborted_repository&.retry_aborted_migration
+
+ log_extra_metadata_on_done(:container_repository_id, next_aborted_repository.id)
+ log_extra_metadata_on_done(:import_type, 'retry')
+
+ true
+ end
+
+ def handle_next_migration
+ return unless next_repository
+ # We return true because the repository was successfully processed (migration_state is changed)
+ return true if tag_count_too_high?
+ return unless next_repository.start_pre_import
+
+ log_extra_metadata_on_done(:container_repository_id, next_repository.id)
+ log_extra_metadata_on_done(:import_type, 'next')
+
+ true
+ end
+
+ def tag_count_too_high?
+ return false unless next_repository.tags_count > migration.max_tags_count
+
+ next_repository.skip_import(reason: :too_many_tags)
+
+ true
+ end
+
+ def below_capacity?
+ current_capacity <= maximum_capacity
+ end
+
+ def waiting_time_passed?
+ delay = migration.enqueue_waiting_time
+ return true if delay == 0
+ return true unless last_step_completed_repository
+
+ last_step_completed_repository.last_import_step_done_at < Time.zone.now - delay
+ end
+
+ def current_capacity
+ strong_memoize(:current_capacity) do
+ ContainerRepository.with_migration_states(
+ %w[pre_importing pre_import_done importing]
+ ).count
+ end
+ end
+
+ def maximum_capacity
+ migration.capacity
+ end
+
+ def next_repository
+ strong_memoize(:next_repository) do
+ ContainerRepository.ready_for_import.take # rubocop:disable CodeReuse/ActiveRecord
+ end
+ end
+
+ def next_aborted_repository
+ strong_memoize(:next_aborted_repository) do
+ ContainerRepository.with_migration_state('import_aborted').take # rubocop:disable CodeReuse/ActiveRecord
+ end
+ end
+
+ def last_step_completed_repository
+ strong_memoize(:last_step_completed_repository) do
+ ContainerRepository.recently_done_migration_step.first
+ end
+ end
+
+ def migration
+ ::ContainerRegistry::Migration
+ end
+
+ def re_enqueue_if_capacity
+ return unless current_capacity < maximum_capacity
+
+ self.class.perform_async
+ end
+ end
+ end
+end