diff options
Diffstat (limited to 'app/workers')
20 files changed, 463 insertions, 20 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 11bf797fb90..bdcb31b8d46 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -211,6 +211,14 @@ :weight: 1 :idempotent: :tags: [] +- :name: cronjob:member_invitation_reminder_emails + :feature_category: :subgroups + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: + :tags: [] - :name: cronjob:metrics_dashboard_schedule_annotations_prune :feature_category: :metrics :has_external_dependencies: @@ -723,6 +731,14 @@ :weight: 2 :idempotent: true :tags: [] +- :name: incident_management:incident_management_add_severity_system_note + :feature_category: :incident_management + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 2 + :idempotent: + :tags: [] - :name: incident_management:incident_management_pager_duty_process_incident :feature_category: :incident_management :has_external_dependencies: @@ -1324,6 +1340,14 @@ :weight: 1 :idempotent: :tags: [] +- :name: design_management_copy_design_collection + :feature_category: :design_management + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: design_management_new_version :feature_category: :design_management :has_external_dependencies: @@ -1532,6 +1556,14 @@ :weight: 1 :idempotent: true :tags: [] +- :name: metrics_dashboard_sync_dashboards + :feature_category: :metrics + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: migrate_external_diffs :feature_category: :source_code_management :has_external_dependencies: @@ -1708,6 +1740,30 @@ :weight: 1 :idempotent: true :tags: [] +- :name: propagate_integration_group + :feature_category: :integrations + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] +- :name: propagate_integration_inherit + :feature_category: :integrations + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] +- :name: propagate_integration_project + :feature_category: :integrations + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: propagate_service_template :feature_category: :integrations :has_external_dependencies: diff --git a/app/workers/analytics/instance_statistics/count_job_trigger_worker.rb b/app/workers/analytics/instance_statistics/count_job_trigger_worker.rb index a9976c6e5cb..01bddfea7de 100644 --- a/app/workers/analytics/instance_statistics/count_job_trigger_worker.rb +++ b/app/workers/analytics/instance_statistics/count_job_trigger_worker.rb @@ -17,10 +17,9 @@ module Analytics return if Feature.disabled?(:store_instance_statistics_measurements, default_enabled: true) recorded_at = Time.zone.now - measurement_identifiers = Analytics::InstanceStatistics::Measurement.identifiers worker_arguments = Gitlab::Analytics::InstanceStatistics::WorkersArgumentBuilder.new( - measurement_identifiers: measurement_identifiers.values, + measurement_identifiers: ::Analytics::InstanceStatistics::Measurement.measurement_identifier_values, recorded_at: recorded_at ).execute diff --git a/app/workers/authorized_project_update/periodic_recalculate_worker.rb b/app/workers/authorized_project_update/periodic_recalculate_worker.rb index 0d1ad67d7bb..78ffdbca4d6 100644 --- a/app/workers/authorized_project_update/periodic_recalculate_worker.rb +++ b/app/workers/authorized_project_update/periodic_recalculate_worker.rb @@ -12,9 +12,7 @@ module AuthorizedProjectUpdate idempotent! def perform - if ::Feature.enabled?(:periodic_project_authorization_recalculation, default_enabled: true) - AuthorizedProjectUpdate::PeriodicRecalculateService.new.execute - end + AuthorizedProjectUpdate::PeriodicRecalculateService.new.execute end end end diff --git a/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb b/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb index 336b1c5443e..9bd1ad2ed30 100644 --- a/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb +++ b/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb @@ -12,9 +12,7 @@ module AuthorizedProjectUpdate idempotent! def perform(start_user_id, end_user_id) - if ::Feature.enabled?(:periodic_project_authorization_recalculation, default_enabled: true) - AuthorizedProjectUpdate::RecalculateForUserRangeService.new(start_user_id, end_user_id).execute - end + AuthorizedProjectUpdate::RecalculateForUserRangeService.new(start_user_id, end_user_id).execute end end end diff --git a/app/workers/cleanup_container_repository_worker.rb b/app/workers/cleanup_container_repository_worker.rb index 4469ea8cff9..80cc296fff5 100644 --- a/app/workers/cleanup_container_repository_worker.rb +++ b/app/workers/cleanup_container_repository_worker.rb @@ -16,9 +16,17 @@ class CleanupContainerRepositoryWorker # rubocop:disable Scalability/IdempotentW return unless valid? - Projects::ContainerRepository::CleanupTagsService + if run_by_container_expiration_policy? + container_repository.start_expiration_policy! + end + + result = Projects::ContainerRepository::CleanupTagsService .new(project, current_user, params) .execute(container_repository) + + if run_by_container_expiration_policy? && result[:status] == :success + container_repository.reset_expiration_policy_started_at! + end end private @@ -30,7 +38,7 @@ class CleanupContainerRepositoryWorker # rubocop:disable Scalability/IdempotentW end def run_by_container_expiration_policy? - @params['container_expiration_policy'] && container_repository && project + @params['container_expiration_policy'] && container_repository.present? && project.present? end def project diff --git a/app/workers/concerns/limited_capacity/job_tracker.rb b/app/workers/concerns/limited_capacity/job_tracker.rb new file mode 100644 index 00000000000..96b6e1a2024 --- /dev/null +++ b/app/workers/concerns/limited_capacity/job_tracker.rb @@ -0,0 +1,74 @@ +# frozen_string_literal: true +module LimitedCapacity + class JobTracker # rubocop:disable Scalability/IdempotentWorker + include Gitlab::Utils::StrongMemoize + + def initialize(namespace) + @namespace = namespace + end + + def register(jid) + _added, @count = with_redis_pipeline do |redis| + register_job_keys(redis, jid) + get_job_count(redis) + end + end + + def remove(jid) + _removed, @count = with_redis_pipeline do |redis| + remove_job_keys(redis, jid) + get_job_count(redis) + end + end + + def clean_up + completed_jids = Gitlab::SidekiqStatus.completed_jids(running_jids) + return unless completed_jids.any? + + _removed, @count = with_redis_pipeline do |redis| + remove_job_keys(redis, completed_jids) + get_job_count(redis) + end + end + + def count + @count ||= with_redis { |redis| get_job_count(redis) } + end + + def running_jids + with_redis do |redis| + redis.smembers(counter_key) + end + end + + private + + attr_reader :namespace + + def counter_key + "worker:#{namespace.to_s.underscore}:running" + end + + def get_job_count(redis) + redis.scard(counter_key) + end + + def register_job_keys(redis, keys) + redis.sadd(counter_key, keys) + end + + def remove_job_keys(redis, keys) + redis.srem(counter_key, keys) + end + + def with_redis(&block) + Gitlab::Redis::Queues.with(&block) # rubocop: disable CodeReuse/ActiveRecord + end + + def with_redis_pipeline(&block) + with_redis do |redis| + redis.pipelined(&block) + end + end + end +end diff --git a/app/workers/concerns/limited_capacity/worker.rb b/app/workers/concerns/limited_capacity/worker.rb new file mode 100644 index 00000000000..c0d6bfff2f5 --- /dev/null +++ b/app/workers/concerns/limited_capacity/worker.rb @@ -0,0 +1,164 @@ +# frozen_string_literal: true + +# Usage: +# +# Worker that performs the tasks: +# +# class DummyWorker +# include ApplicationWorker +# include LimitedCapacity::Worker +# +# # For each job that raises any error, a worker instance will be disabled +# # until the next schedule-run. +# # If you wish to get around this, exceptions must by handled by the implementer. +# # +# def perform_work(*args) +# end +# +# def remaining_work_count(*args) +# 5 +# end +# +# def max_running_jobs +# 25 +# end +# end +# +# Cron worker to fill the pool of regular workers: +# +# class ScheduleDummyCronWorker +# include ApplicationWorker +# include CronjobQueue +# +# def perform(*args) +# DummyWorker.perform_with_capacity(*args) +# end +# end +# + +module LimitedCapacity + module Worker + extend ActiveSupport::Concern + include Gitlab::Utils::StrongMemoize + + included do + # Disable Sidekiq retries, log the error, and send the job to the dead queue. + # This is done to have only one source that produces jobs and because the slot + # would be occupied by a job that will be performed in the distant future. + # We let the cron worker enqueue new jobs, this could be seen as our retry and + # back off mechanism because the job might fail again if executed immediately. + sidekiq_options retry: 0 + deduplicate :none + end + + class_methods do + def perform_with_capacity(*args) + worker = self.new + worker.remove_failed_jobs + worker.report_prometheus_metrics(*args) + required_jobs_count = worker.required_jobs_count(*args) + + arguments = Array.new(required_jobs_count) { args } + self.bulk_perform_async(arguments) # rubocop:disable Scalability/BulkPerformWithContext + end + end + + def perform(*args) + return unless has_capacity? + + job_tracker.register(jid) + perform_work(*args) + rescue => exception + raise + ensure + job_tracker.remove(jid) + report_prometheus_metrics + re_enqueue(*args) unless exception + end + + def perform_work(*args) + raise NotImplementedError + end + + def remaining_work_count(*args) + raise NotImplementedError + end + + def max_running_jobs + raise NotImplementedError + end + + def has_capacity? + remaining_capacity > 0 + end + + def remaining_capacity + [ + max_running_jobs - running_jobs_count - self.class.queue_size, + 0 + ].max + end + + def has_work?(*args) + remaining_work_count(*args) > 0 + end + + def remove_failed_jobs + job_tracker.clean_up + end + + def report_prometheus_metrics(*args) + running_jobs_gauge.set(prometheus_labels, running_jobs_count) + remaining_work_gauge.set(prometheus_labels, remaining_work_count(*args)) + max_running_jobs_gauge.set(prometheus_labels, max_running_jobs) + end + + def required_jobs_count(*args) + [ + remaining_work_count(*args), + remaining_capacity + ].min + end + + private + + def running_jobs_count + job_tracker.count + end + + def job_tracker + strong_memoize(:job_tracker) do + JobTracker.new(self.class.name) + end + end + + def re_enqueue(*args) + return unless has_capacity? + return unless has_work?(*args) + + self.class.perform_async(*args) + end + + def running_jobs_gauge + strong_memoize(:running_jobs_gauge) do + Gitlab::Metrics.gauge(:limited_capacity_worker_running_jobs, 'Number of running jobs') + end + end + + def max_running_jobs_gauge + strong_memoize(:max_running_jobs_gauge) do + Gitlab::Metrics.gauge(:limited_capacity_worker_max_running_jobs, 'Maximum number of running jobs') + end + end + + def remaining_work_gauge + strong_memoize(:remaining_work_gauge) do + Gitlab::Metrics.gauge(:limited_capacity_worker_remaining_work_count, 'Number of jobs waiting to be enqueued') + end + end + + def prometheus_labels + { worker: self.class.name } + end + end +end diff --git a/app/workers/design_management/copy_design_collection_worker.rb b/app/workers/design_management/copy_design_collection_worker.rb new file mode 100644 index 00000000000..0a6e23fe9da --- /dev/null +++ b/app/workers/design_management/copy_design_collection_worker.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +module DesignManagement + class CopyDesignCollectionWorker + include ApplicationWorker + + feature_category :design_management + idempotent! + urgency :low + + def perform(user_id, issue_id, target_issue_id) + user = User.find(user_id) + issue = Issue.find(issue_id) + target_issue = Issue.find(target_issue_id) + + response = DesignManagement::CopyDesignCollection::CopyService.new( + target_issue.project, + user, + issue: issue, + target_issue: target_issue + ).execute + + Gitlab::AppLogger.warn(response.message) if response.error? + end + end +end diff --git a/app/workers/design_management/new_version_worker.rb b/app/workers/design_management/new_version_worker.rb index 3634dcbcebd..4fbf2067be4 100644 --- a/app/workers/design_management/new_version_worker.rb +++ b/app/workers/design_management/new_version_worker.rb @@ -9,10 +9,10 @@ module DesignManagement # `GenerateImageVersionsService` resizing designs worker_resource_boundary :memory - def perform(version_id) + def perform(version_id, skip_system_notes = false) version = DesignManagement::Version.find(version_id) - add_system_note(version) + add_system_note(version) unless skip_system_notes generate_image_versions(version) rescue ActiveRecord::RecordNotFound => e Sidekiq.logger.warn(e) diff --git a/app/workers/git_garbage_collect_worker.rb b/app/workers/git_garbage_collect_worker.rb index b0307571448..e66bad3962f 100644 --- a/app/workers/git_garbage_collect_worker.rb +++ b/app/workers/git_garbage_collect_worker.rb @@ -91,10 +91,12 @@ class GitGarbageCollectWorker # rubocop:disable Scalability/IdempotentWorker end def cleanup_orphan_lfs_file_references(project) - return unless Feature.enabled?(:cleanup_lfs_during_gc, project) return if Gitlab::Database.read_only? # GitGarbageCollectWorker may be run on a Geo secondary ::Gitlab::Cleanup::OrphanLfsFileReferences.new(project, dry_run: false, logger: logger).run! + rescue => err + Gitlab::GitLogger.warn(message: "Cleaning up orphan LFS objects files failed", error: err.message) + Gitlab::ErrorTracking.track_and_raise_for_dev_exception(err) end def flush_ref_caches(project) diff --git a/app/workers/group_import_worker.rb b/app/workers/group_import_worker.rb index 36d81468d55..494d9a3e46f 100644 --- a/app/workers/group_import_worker.rb +++ b/app/workers/group_import_worker.rb @@ -9,7 +9,7 @@ class GroupImportWorker # rubocop:disable Scalability/IdempotentWorker def perform(user_id, group_id) current_user = User.find(user_id) group = Group.find(group_id) - group_import_state = group.import_state || group.build_import_state + group_import_state = group.import_state group_import_state.jid = self.jid group_import_state.start! diff --git a/app/workers/incident_management/add_severity_system_note_worker.rb b/app/workers/incident_management/add_severity_system_note_worker.rb new file mode 100644 index 00000000000..9f132531562 --- /dev/null +++ b/app/workers/incident_management/add_severity_system_note_worker.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +module IncidentManagement + class AddSeveritySystemNoteWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + queue_namespace :incident_management + feature_category :incident_management + + def perform(incident_id, user_id) + return if incident_id.blank? || user_id.blank? + + incident = Issue.with_issue_type(:incident).find_by_id(incident_id) + return unless incident + + user = User.find_by_id(user_id) + return unless user + + SystemNoteService.change_incident_severity(incident, user) + end + end +end diff --git a/app/workers/issue_placement_worker.rb b/app/workers/issue_placement_worker.rb index a8d59e9125c..5b547ab0c8d 100644 --- a/app/workers/issue_placement_worker.rb +++ b/app/workers/issue_placement_worker.rb @@ -36,14 +36,14 @@ class IssuePlacementWorker Gitlab::ErrorTracking.log_exception(e, issue_id: issue_id, project_id: project_id) IssueRebalancingWorker.perform_async(nil, project_id.presence || issue.project_id) end - # rubocop: enable CodeReuse/ActiveRecord def find_issue(issue_id, project_id) - return Issue.id_in(issue_id).first if issue_id + return Issue.id_in(issue_id).take if issue_id - project = Project.id_in(project_id).first + project = Project.id_in(project_id).take return unless project - project.issues.first + project.issues.take end + # rubocop: enable CodeReuse/ActiveRecord end diff --git a/app/workers/issue_rebalancing_worker.rb b/app/workers/issue_rebalancing_worker.rb index 032ba5534e6..a9ad66198f3 100644 --- a/app/workers/issue_rebalancing_worker.rb +++ b/app/workers/issue_rebalancing_worker.rb @@ -11,7 +11,8 @@ class IssueRebalancingWorker return if project_id.nil? project = Project.find(project_id) - issue = project.issues.first # All issues are equivalent as far as we are concerned + # All issues are equivalent as far as we are concerned + issue = project.issues.take # rubocop: disable CodeReuse/ActiveRecord IssueRebalancingService.new(issue).execute rescue ActiveRecord::RecordNotFound, IssueRebalancingService::TooManyIssues => e diff --git a/app/workers/member_invitation_reminder_emails_worker.rb b/app/workers/member_invitation_reminder_emails_worker.rb new file mode 100644 index 00000000000..69d7f8ac8f6 --- /dev/null +++ b/app/workers/member_invitation_reminder_emails_worker.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +class MemberInvitationReminderEmailsWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + include CronjobQueue # rubocop:disable Scalability/CronWorkerContext + + feature_category :subgroups + urgency :low + + def perform + return unless Gitlab::Experimentation.enabled?(:invitation_reminders) + + # To keep this MR small, implementation will be done in another MR: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/42981/diffs?commit_id=8063606e0f83957b2dd38d660ee986f24dee6138 + end +end diff --git a/app/workers/metrics/dashboard/sync_dashboards_worker.rb b/app/workers/metrics/dashboard/sync_dashboards_worker.rb new file mode 100644 index 00000000000..7a124a33f9e --- /dev/null +++ b/app/workers/metrics/dashboard/sync_dashboards_worker.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +module Metrics + module Dashboard + class SyncDashboardsWorker + include ApplicationWorker + + feature_category :metrics + + idempotent! + + def perform(project_id) + project = Project.find(project_id) + dashboard_paths = ::Gitlab::Metrics::Dashboard::RepoDashboardFinder.list_dashboards(project) + + dashboard_paths.each do |dashboard_path| + ::Gitlab::Metrics::Dashboard::Importer.new(dashboard_path, project).execute! + end + end + end + end +end diff --git a/app/workers/propagate_integration_group_worker.rb b/app/workers/propagate_integration_group_worker.rb new file mode 100644 index 00000000000..e539c6d4719 --- /dev/null +++ b/app/workers/propagate_integration_group_worker.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +class PropagateIntegrationGroupWorker + include ApplicationWorker + + feature_category :integrations + idempotent! + + # rubocop: disable CodeReuse/ActiveRecord + def perform(integration_id, min_id, max_id) + integration = Service.find_by_id(integration_id) + return unless integration + + batch = Group.where(id: min_id..max_id).without_integration(integration) + + BulkCreateIntegrationService.new(integration, batch, 'group').execute + end + # rubocop: enable CodeReuse/ActiveRecord +end diff --git a/app/workers/propagate_integration_inherit_worker.rb b/app/workers/propagate_integration_inherit_worker.rb new file mode 100644 index 00000000000..ef3132202f6 --- /dev/null +++ b/app/workers/propagate_integration_inherit_worker.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +class PropagateIntegrationInheritWorker + include ApplicationWorker + + feature_category :integrations + idempotent! + + # rubocop: disable CodeReuse/ActiveRecord + def perform(integration_id, min_id, max_id) + integration = Service.find_by_id(integration_id) + return unless integration + + services = Service.where(id: min_id..max_id).by_type(integration.type).inherit_from_id(integration.id) + + BulkUpdateIntegrationService.new(integration, services).execute + end + # rubocop: enable CodeReuse/ActiveRecord +end diff --git a/app/workers/propagate_integration_project_worker.rb b/app/workers/propagate_integration_project_worker.rb new file mode 100644 index 00000000000..c1e286b24fc --- /dev/null +++ b/app/workers/propagate_integration_project_worker.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +class PropagateIntegrationProjectWorker + include ApplicationWorker + + feature_category :integrations + idempotent! + + # rubocop: disable CodeReuse/ActiveRecord + def perform(integration_id, min_id, max_id) + integration = Service.find_by_id(integration_id) + return unless integration + + batch = Project.where(id: min_id..max_id).without_integration(integration) + + BulkCreateIntegrationService.new(integration, batch, 'project').execute + end + # rubocop: enable CodeReuse/ActiveRecord +end diff --git a/app/workers/propagate_integration_worker.rb b/app/workers/propagate_integration_worker.rb index 68e38386372..bb954b12a25 100644 --- a/app/workers/propagate_integration_worker.rb +++ b/app/workers/propagate_integration_worker.rb @@ -7,7 +7,8 @@ class PropagateIntegrationWorker idempotent! loggable_arguments 1 - # Keep overwrite parameter for backwards compatibility. + # TODO: Keep overwrite parameter for backwards compatibility. Remove after >= 14.0 + # https://gitlab.com/gitlab-org/gitlab/-/issues/255382 def perform(integration_id, overwrite = nil) Admin::PropagateIntegrationService.propagate(Service.find(integration_id)) end |