summaryrefslogtreecommitdiff
path: root/app/workers/container_registry/migration/enqueuer_worker.rb
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/container_registry/migration/enqueuer_worker.rb')
-rw-r--r--app/workers/container_registry/migration/enqueuer_worker.rb107
1 files changed, 82 insertions, 25 deletions
diff --git a/app/workers/container_registry/migration/enqueuer_worker.rb b/app/workers/container_registry/migration/enqueuer_worker.rb
index 5feaba870e6..8705deb0cb2 100644
--- a/app/workers/container_registry/migration/enqueuer_worker.rb
+++ b/app/workers/container_registry/migration/enqueuer_worker.rb
@@ -6,6 +6,9 @@ module ContainerRegistry
include ApplicationWorker
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
include Gitlab::Utils::StrongMemoize
+ include ExclusiveLeaseGuard
+
+ DEFAULT_LEASE_TIMEOUT = 30.minutes.to_i.freeze
data_consistency :always
feature_category :container_registry
@@ -14,70 +17,103 @@ module ContainerRegistry
idempotent!
def perform
- return unless migration.enabled?
- return unless below_capacity?
- return unless waiting_time_passed?
+ re_enqueue = false
+ try_obtain_lease do
+ break unless runnable?
- re_enqueue_if_capacity if handle_aborted_migration || handle_next_migration
- rescue StandardError => e
- Gitlab::ErrorTracking.log_exception(
- e,
- next_repository_id: next_repository&.id,
- next_aborted_repository_id: next_aborted_repository&.id
- )
-
- next_repository&.abort_import
+ re_enqueue = handle_aborted_migration || handle_next_migration
+ end
+ re_enqueue_if_capacity if re_enqueue
end
private
def handle_aborted_migration
- return unless next_aborted_repository&.retry_aborted_migration
+ return unless next_aborted_repository
- log_extra_metadata_on_done(:container_repository_id, next_aborted_repository.id)
log_extra_metadata_on_done(:import_type, 'retry')
+ log_repository(next_aborted_repository)
+
+ next_aborted_repository.retry_aborted_migration
+
+ true
+ rescue StandardError => e
+ Gitlab::ErrorTracking.log_exception(e, next_aborted_repository_id: next_aborted_repository&.id)
true
+ ensure
+ log_repository_migration_state(next_aborted_repository)
end
def handle_next_migration
return unless next_repository
+
+ log_extra_metadata_on_done(:import_type, 'next')
+ log_repository(next_repository)
+
# We return true because the repository was successfully processed (migration_state is changed)
return true if tag_count_too_high?
return unless next_repository.start_pre_import
- log_extra_metadata_on_done(:container_repository_id, next_repository.id)
- log_extra_metadata_on_done(:import_type, 'next')
-
true
+ rescue StandardError => e
+ Gitlab::ErrorTracking.log_exception(e, next_repository_id: next_repository&.id)
+ next_repository&.abort_import
+
+ false
+ ensure
+ log_repository_migration_state(next_repository)
end
def tag_count_too_high?
return false unless next_repository.tags_count > migration.max_tags_count
next_repository.skip_import(reason: :too_many_tags)
+ log_extra_metadata_on_done(:tags_count_too_high, true)
+ log_extra_metadata_on_done(:max_tags_count_setting, migration.max_tags_count)
true
end
def below_capacity?
- current_capacity <= maximum_capacity
+ current_capacity < maximum_capacity
end
def waiting_time_passed?
delay = migration.enqueue_waiting_time
return true if delay == 0
- return true unless last_step_completed_repository
+ return true unless last_step_completed_repository&.last_import_step_done_at
last_step_completed_repository.last_import_step_done_at < Time.zone.now - delay
end
- def current_capacity
- strong_memoize(:current_capacity) do
- ContainerRepository.with_migration_states(
- %w[pre_importing pre_import_done importing]
- ).count
+ def runnable?
+ unless migration.enabled?
+ log_extra_metadata_on_done(:migration_enabled, false)
+ return false
+ end
+
+ unless below_capacity?
+ log_extra_metadata_on_done(:max_capacity_setting, maximum_capacity)
+ log_extra_metadata_on_done(:below_capacity, false)
+
+ return false
+ end
+
+ unless waiting_time_passed?
+ log_extra_metadata_on_done(:waiting_time_passed, false)
+ log_extra_metadata_on_done(:current_waiting_time_setting, migration.enqueue_waiting_time)
+
+ return false
end
+
+ true
+ end
+
+ def current_capacity
+ ContainerRepository.with_migration_states(
+ %w[pre_importing pre_import_done importing]
+ ).count
end
def maximum_capacity
@@ -107,10 +143,31 @@ module ContainerRegistry
end
def re_enqueue_if_capacity
- return unless current_capacity < maximum_capacity
+ return unless below_capacity?
self.class.perform_async
end
+
+ def log_repository(repository)
+ log_extra_metadata_on_done(:container_repository_id, repository&.id)
+ log_extra_metadata_on_done(:container_repository_path, repository&.path)
+ end
+
+ def log_repository_migration_state(repository)
+ return unless repository
+
+ log_extra_metadata_on_done(:container_repository_migration_state, repository.migration_state)
+ end
+
+ # used by ExclusiveLeaseGuard
+ def lease_key
+ 'container_registry:migration:enqueuer_worker'
+ end
+
+ # used by ExclusiveLeaseGuard
+ def lease_timeout
+ DEFAULT_LEASE_TIMEOUT
+ end
end
end
end