diff options
Diffstat (limited to 'app/workers/container_registry/migration/enqueuer_worker.rb')
-rw-r--r-- | app/workers/container_registry/migration/enqueuer_worker.rb | 107 |
1 files changed, 82 insertions, 25 deletions
diff --git a/app/workers/container_registry/migration/enqueuer_worker.rb b/app/workers/container_registry/migration/enqueuer_worker.rb index 5feaba870e6..8705deb0cb2 100644 --- a/app/workers/container_registry/migration/enqueuer_worker.rb +++ b/app/workers/container_registry/migration/enqueuer_worker.rb @@ -6,6 +6,9 @@ module ContainerRegistry include ApplicationWorker include CronjobQueue # rubocop:disable Scalability/CronWorkerContext include Gitlab::Utils::StrongMemoize + include ExclusiveLeaseGuard + + DEFAULT_LEASE_TIMEOUT = 30.minutes.to_i.freeze data_consistency :always feature_category :container_registry @@ -14,70 +17,103 @@ module ContainerRegistry idempotent! def perform - return unless migration.enabled? - return unless below_capacity? - return unless waiting_time_passed? + re_enqueue = false + try_obtain_lease do + break unless runnable? - re_enqueue_if_capacity if handle_aborted_migration || handle_next_migration - rescue StandardError => e - Gitlab::ErrorTracking.log_exception( - e, - next_repository_id: next_repository&.id, - next_aborted_repository_id: next_aborted_repository&.id - ) - - next_repository&.abort_import + re_enqueue = handle_aborted_migration || handle_next_migration + end + re_enqueue_if_capacity if re_enqueue end private def handle_aborted_migration - return unless next_aborted_repository&.retry_aborted_migration + return unless next_aborted_repository - log_extra_metadata_on_done(:container_repository_id, next_aborted_repository.id) log_extra_metadata_on_done(:import_type, 'retry') + log_repository(next_aborted_repository) + + next_aborted_repository.retry_aborted_migration + + true + rescue StandardError => e + Gitlab::ErrorTracking.log_exception(e, next_aborted_repository_id: next_aborted_repository&.id) true + ensure + log_repository_migration_state(next_aborted_repository) end def handle_next_migration return unless next_repository + + log_extra_metadata_on_done(:import_type, 'next') + log_repository(next_repository) + # We return true because the repository was successfully processed (migration_state is changed) return true if tag_count_too_high? return unless next_repository.start_pre_import - log_extra_metadata_on_done(:container_repository_id, next_repository.id) - log_extra_metadata_on_done(:import_type, 'next') - true + rescue StandardError => e + Gitlab::ErrorTracking.log_exception(e, next_repository_id: next_repository&.id) + next_repository&.abort_import + + false + ensure + log_repository_migration_state(next_repository) end def tag_count_too_high? return false unless next_repository.tags_count > migration.max_tags_count next_repository.skip_import(reason: :too_many_tags) + log_extra_metadata_on_done(:tags_count_too_high, true) + log_extra_metadata_on_done(:max_tags_count_setting, migration.max_tags_count) true end def below_capacity? - current_capacity <= maximum_capacity + current_capacity < maximum_capacity end def waiting_time_passed? delay = migration.enqueue_waiting_time return true if delay == 0 - return true unless last_step_completed_repository + return true unless last_step_completed_repository&.last_import_step_done_at last_step_completed_repository.last_import_step_done_at < Time.zone.now - delay end - def current_capacity - strong_memoize(:current_capacity) do - ContainerRepository.with_migration_states( - %w[pre_importing pre_import_done importing] - ).count + def runnable? + unless migration.enabled? + log_extra_metadata_on_done(:migration_enabled, false) + return false + end + + unless below_capacity? + log_extra_metadata_on_done(:max_capacity_setting, maximum_capacity) + log_extra_metadata_on_done(:below_capacity, false) + + return false + end + + unless waiting_time_passed? + log_extra_metadata_on_done(:waiting_time_passed, false) + log_extra_metadata_on_done(:current_waiting_time_setting, migration.enqueue_waiting_time) + + return false end + + true + end + + def current_capacity + ContainerRepository.with_migration_states( + %w[pre_importing pre_import_done importing] + ).count end def maximum_capacity @@ -107,10 +143,31 @@ module ContainerRegistry end def re_enqueue_if_capacity - return unless current_capacity < maximum_capacity + return unless below_capacity? self.class.perform_async end + + def log_repository(repository) + log_extra_metadata_on_done(:container_repository_id, repository&.id) + log_extra_metadata_on_done(:container_repository_path, repository&.path) + end + + def log_repository_migration_state(repository) + return unless repository + + log_extra_metadata_on_done(:container_repository_migration_state, repository.migration_state) + end + + # used by ExclusiveLeaseGuard + def lease_key + 'container_registry:migration:enqueuer_worker' + end + + # used by ExclusiveLeaseGuard + def lease_timeout + DEFAULT_LEASE_TIMEOUT + end end end end |