diff options
Diffstat (limited to 'app/workers')
46 files changed, 393 insertions, 627 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index b216c2bff28..31c590183d1 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -21,6 +21,24 @@ :weight: 1 :idempotent: true :tags: [] +- :name: authorized_project_update:authorized_project_update_project_recalculate + :worker_name: AuthorizedProjectUpdate::ProjectRecalculateWorker + :feature_category: :authentication_and_authorization + :has_external_dependencies: + :urgency: :high + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] +- :name: authorized_project_update:authorized_project_update_user_refresh_from_replica + :worker_name: AuthorizedProjectUpdate::UserRefreshFromReplicaWorker + :feature_category: :authentication_and_authorization + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: authorized_project_update:authorized_project_update_user_refresh_over_user_range :worker_name: AuthorizedProjectUpdate::UserRefreshOverUserRangeWorker :feature_category: :authentication_and_authorization @@ -144,16 +162,6 @@ :weight: 1 :idempotent: :tags: [] -- :name: cronjob:analytics_instance_statistics_count_job_trigger - :worker_name: Analytics::InstanceStatistics::CountJobTriggerWorker - :feature_category: :devops_reports - :has_external_dependencies: - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: true - :tags: - - :exclude_from_kubernetes - :name: cronjob:analytics_usage_trends_count_job_trigger :worker_name: Analytics::UsageTrends::CountJobTriggerWorker :feature_category: :devops_reports @@ -423,15 +431,6 @@ :weight: 1 :idempotent: :tags: [] -- :name: cronjob:prune_web_hook_logs - :worker_name: PruneWebHookLogsWorker - :feature_category: :integrations - :has_external_dependencies: - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: - :tags: [] - :name: cronjob:releases_manage_evidence :worker_name: Releases::ManageEvidenceWorker :feature_category: :release_evidence @@ -477,7 +476,7 @@ :urgency: :low :resource_boundary: :unknown :weight: 1 - :idempotent: + :idempotent: true :tags: [] - :name: cronjob:repository_archive_cache :worker_name: RepositoryArchiveCacheWorker @@ -647,15 +646,6 @@ :idempotent: :tags: - :exclude_from_kubernetes -- :name: deployment:deployments_execute_hooks - :worker_name: Deployments::ExecuteHooksWorker - :feature_category: :continuous_delivery - :has_external_dependencies: - :urgency: :low - :resource_boundary: :cpu - :weight: 3 - :idempotent: - :tags: [] - :name: deployment:deployments_finished :worker_name: Deployments::FinishedWorker :feature_category: :continuous_delivery @@ -827,15 +817,6 @@ :weight: 1 :idempotent: :tags: [] -- :name: gcp_cluster:clusters_cleanup_app - :worker_name: Clusters::Cleanup::AppWorker - :feature_category: :kubernetes_management - :has_external_dependencies: true - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: - :tags: [] - :name: gcp_cluster:clusters_cleanup_project_namespace :worker_name: Clusters::Cleanup::ProjectNamespaceWorker :feature_category: :kubernetes_management @@ -1088,15 +1069,6 @@ :weight: 2 :idempotent: :tags: [] -- :name: incident_management:incident_management_process_alert - :worker_name: IncidentManagement::ProcessAlertWorker - :feature_category: :incident_management - :has_external_dependencies: - :urgency: :low - :resource_boundary: :unknown - :weight: 2 - :idempotent: - :tags: [] - :name: incident_management:incident_management_process_alert_worker_v2 :worker_name: IncidentManagement::ProcessAlertWorkerV2 :feature_category: :incident_management @@ -1106,15 +1078,6 @@ :weight: 2 :idempotent: true :tags: [] -- :name: incident_management:incident_management_process_prometheus_alert - :worker_name: IncidentManagement::ProcessPrometheusAlertWorker - :feature_category: :incident_management - :has_external_dependencies: - :urgency: :low - :resource_boundary: :cpu - :weight: 2 - :idempotent: - :tags: [] - :name: jira_connect:jira_connect_sync_branch :worker_name: JiraConnect::SyncBranchWorker :feature_category: :integrations @@ -1317,6 +1280,15 @@ :weight: 1 :idempotent: :tags: [] +- :name: package_repositories:packages_debian_generate_distribution + :worker_name: Packages::Debian::GenerateDistributionWorker + :feature_category: :package_registry + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: package_repositories:packages_debian_process_changes :worker_name: Packages::Debian::ProcessChangesWorker :feature_category: :package_registry @@ -1401,8 +1373,7 @@ :resource_boundary: :unknown :weight: 1 :idempotent: true - :tags: - - :exclude_from_kubernetes + :tags: [] - :name: pipeline_background:ci_pipeline_artifacts_create_quality_report :worker_name: Ci::PipelineArtifacts::CreateQualityReportWorker :feature_category: :code_testing @@ -1457,7 +1428,7 @@ :urgency: :high :resource_boundary: :cpu :weight: 3 - :idempotent: true + :idempotent: :tags: [] - :name: pipeline_creation:create_pipeline :worker_name: CreatePipelineWorker @@ -1564,7 +1535,7 @@ :worker_name: PipelineHooksWorker :feature_category: :continuous_integration :has_external_dependencies: - :urgency: :high + :urgency: :low :resource_boundary: :cpu :weight: 2 :idempotent: @@ -1639,15 +1610,6 @@ :urgency: :high :resource_boundary: :unknown :weight: 5 - :idempotent: - :tags: [] -- :name: pipeline_processing:pipeline_update - :worker_name: PipelineUpdateWorker - :feature_category: :continuous_integration - :has_external_dependencies: - :urgency: :high - :resource_boundary: :unknown - :weight: 5 :idempotent: true :tags: [] - :name: pipeline_processing:stage_update @@ -1777,16 +1739,6 @@ :weight: 1 :idempotent: true :tags: [] -- :name: analytics_instance_statistics_counter_job - :worker_name: Analytics::InstanceStatistics::CounterJobWorker - :feature_category: :devops_reports - :has_external_dependencies: - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: true - :tags: - - :exclude_from_kubernetes - :name: analytics_usage_trends_counter_job :worker_name: Analytics::UsageTrends::CounterJobWorker :feature_category: :devops_reports @@ -2116,15 +2068,6 @@ :idempotent: true :tags: - :exclude_from_kubernetes -- :name: git_garbage_collect - :worker_name: GitGarbageCollectWorker - :feature_category: :gitaly - :has_external_dependencies: - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: - :tags: [] - :name: github_import_advance_stage :worker_name: Gitlab::GithubImport::AdvanceStageWorker :feature_category: :importers @@ -2292,15 +2235,6 @@ :weight: 1 :idempotent: true :tags: [] -- :name: merge_requests_assignees_change - :worker_name: MergeRequests::AssigneesChangeWorker - :feature_category: :source_code_management - :has_external_dependencies: - :urgency: :high - :resource_boundary: :unknown - :weight: 1 - :idempotent: true - :tags: [] - :name: merge_requests_delete_source_branch :worker_name: MergeRequests::DeleteSourceBranchWorker :feature_category: :source_code_management @@ -2570,15 +2504,6 @@ :weight: 1 :idempotent: :tags: [] -- :name: project_schedule_bulk_repository_shard_moves - :worker_name: ProjectScheduleBulkRepositoryShardMovesWorker - :feature_category: :gitaly - :has_external_dependencies: - :urgency: :throttled - :resource_boundary: :unknown - :weight: 1 - :idempotent: true - :tags: [] - :name: project_service :worker_name: ProjectServiceWorker :feature_category: :integrations @@ -2588,15 +2513,6 @@ :weight: 1 :idempotent: :tags: [] -- :name: project_update_repository_storage - :worker_name: ProjectUpdateRepositoryStorageWorker - :feature_category: :gitaly - :has_external_dependencies: - :urgency: :throttled - :resource_boundary: :unknown - :weight: 1 - :idempotent: true - :tags: [] - :name: projects_git_garbage_collect :worker_name: Projects::GitGarbageCollectWorker :feature_category: :gitaly @@ -2811,24 +2727,6 @@ :weight: 1 :idempotent: :tags: [] -- :name: snippet_schedule_bulk_repository_shard_moves - :worker_name: SnippetScheduleBulkRepositoryShardMovesWorker - :feature_category: :gitaly - :has_external_dependencies: - :urgency: :throttled - :resource_boundary: :unknown - :weight: 1 - :idempotent: true - :tags: [] -- :name: snippet_update_repository_storage - :worker_name: SnippetUpdateRepositoryStorageWorker - :feature_category: :gitaly - :has_external_dependencies: - :urgency: :throttled - :resource_boundary: :unknown - :weight: 1 - :idempotent: true - :tags: [] - :name: snippets_schedule_bulk_repository_shard_moves :worker_name: Snippets::ScheduleBulkRepositoryShardMovesWorker :feature_category: :gitaly @@ -2901,16 +2799,6 @@ :weight: 1 :idempotent: :tags: [] -- :name: users_update_open_issue_count - :worker_name: Users::UpdateOpenIssueCountWorker - :feature_category: :users - :has_external_dependencies: - :urgency: :low - :resource_boundary: :unknown - :weight: 1 - :idempotent: true - :tags: - - :exclude_from_kubernetes - :name: web_hook :worker_name: WebHookWorker :feature_category: :integrations @@ -2930,6 +2818,15 @@ :idempotent: true :tags: - :exclude_from_kubernetes +- :name: web_hooks_log_execution + :worker_name: WebHooks::LogExecutionWorker + :feature_category: :integrations + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: wikis_git_garbage_collect :worker_name: Wikis::GitGarbageCollectWorker :feature_category: :gitaly diff --git a/app/workers/analytics/instance_statistics/count_job_trigger_worker.rb b/app/workers/analytics/instance_statistics/count_job_trigger_worker.rb deleted file mode 100644 index 083c01b166d..00000000000 --- a/app/workers/analytics/instance_statistics/count_job_trigger_worker.rb +++ /dev/null @@ -1,24 +0,0 @@ -# frozen_string_literal: true - -module Analytics - module InstanceStatistics - # This worker will be removed in 14.0 - class CountJobTriggerWorker - include ApplicationWorker - - sidekiq_options retry: 3 - include CronjobQueue # rubocop:disable Scalability/CronWorkerContext - - feature_category :devops_reports - tags :exclude_from_kubernetes - urgency :low - - idempotent! - - def perform - # Delegate to the new worker - Analytics::UsageTrends::CountJobTriggerWorker.new.perform - end - end - end -end diff --git a/app/workers/analytics/instance_statistics/counter_job_worker.rb b/app/workers/analytics/instance_statistics/counter_job_worker.rb deleted file mode 100644 index a4dda45ff72..00000000000 --- a/app/workers/analytics/instance_statistics/counter_job_worker.rb +++ /dev/null @@ -1,23 +0,0 @@ -# frozen_string_literal: true - -module Analytics - module InstanceStatistics - # This worker will be removed in 14.0 - class CounterJobWorker - include ApplicationWorker - - sidekiq_options retry: 3 - - feature_category :devops_reports - urgency :low - tags :exclude_from_kubernetes - - idempotent! - - def perform(*args) - # Delegate to the new worker - Analytics::UsageTrends::CounterJobWorker.new.perform(*args) - end - end - end -end diff --git a/app/workers/authorized_project_update/project_recalculate_worker.rb b/app/workers/authorized_project_update/project_recalculate_worker.rb new file mode 100644 index 00000000000..3f0672992ef --- /dev/null +++ b/app/workers/authorized_project_update/project_recalculate_worker.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +module AuthorizedProjectUpdate + class ProjectRecalculateWorker + include ApplicationWorker + include Gitlab::ExclusiveLeaseHelpers + + feature_category :authentication_and_authorization + urgency :high + queue_namespace :authorized_project_update + + deduplicate :until_executing, including_scheduled: true + idempotent! + + def perform(project_id) + project = Project.find_by_id(project_id) + return unless project + + in_lock(lock_key(project), ttl: 10.seconds) do + AuthorizedProjectUpdate::ProjectRecalculateService.new(project).execute + end + end + + private + + def lock_key(project) + "#{self.class.name.underscore}/#{project.root_namespace.id}" + end + end +end diff --git a/app/workers/authorized_project_update/user_refresh_from_replica_worker.rb b/app/workers/authorized_project_update/user_refresh_from_replica_worker.rb new file mode 100644 index 00000000000..5ca9de63fd7 --- /dev/null +++ b/app/workers/authorized_project_update/user_refresh_from_replica_worker.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +module AuthorizedProjectUpdate + class UserRefreshFromReplicaWorker < ::AuthorizedProjectsWorker + feature_category :authentication_and_authorization + urgency :low + queue_namespace :authorized_project_update + deduplicate :until_executing, including_scheduled: true + + idempotent! + + # This worker will start reading data from the replica database soon + # Issue: https://gitlab.com/gitlab-org/gitlab/-/issues/333219 + end +end diff --git a/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb b/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb index 2e4e2dd3232..ab4d9c13422 100644 --- a/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb +++ b/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb @@ -2,10 +2,9 @@ module AuthorizedProjectUpdate class UserRefreshOverUserRangeWorker # rubocop:disable Scalability/IdempotentWorker - # When the feature flag named `periodic_project_authorization_update_via_replica` is enabled, - # this worker checks if a specific user requires an update to their project_authorizations records. + # This worker checks if users requires an update to their project_authorizations records. # This check is done via the data read from the database replica (and not from the primary). - # If this check returns true, a completely new Sidekiq job is enqueued for this specific user + # If this check returns true, a completely new Sidekiq job is enqueued for a specific user # so as to update its project_authorizations records. # There is a possibility that the data in the replica is lagging behind the primary @@ -24,25 +23,16 @@ module AuthorizedProjectUpdate # `data_consistency :delayed` and not `idempotent!` # See https://gitlab.com/gitlab-org/gitlab/-/issues/325291 deduplicate :until_executing, including_scheduled: true - data_consistency :delayed, feature_flag: :delayed_consistency_for_user_refresh_over_range_worker + data_consistency :delayed def perform(start_user_id, end_user_id) - if Feature.enabled?(:periodic_project_authorization_update_via_replica) - User.where(id: start_user_id..end_user_id).find_each do |user| # rubocop: disable CodeReuse/ActiveRecord - enqueue_project_authorizations_refresh(user) if project_authorizations_needs_refresh?(user) - end - else - use_primary_database - AuthorizedProjectUpdate::RecalculateForUserRangeService.new(start_user_id, end_user_id).execute + User.where(id: start_user_id..end_user_id).find_each do |user| # rubocop: disable CodeReuse/ActiveRecord + enqueue_project_authorizations_refresh(user) if project_authorizations_needs_refresh?(user) end end private - def use_primary_database - # no-op in CE, overriden in EE - end - def project_authorizations_needs_refresh?(user) AuthorizedProjectUpdate::FindRecordsDueForRefreshService.new(user).needs_refresh? end @@ -54,5 +44,3 @@ module AuthorizedProjectUpdate end end end - -AuthorizedProjectUpdate::UserRefreshOverUserRangeWorker.prepend_mod_with('AuthorizedProjectUpdate::UserRefreshOverUserRangeWorker') diff --git a/app/workers/build_hooks_worker.rb b/app/workers/build_hooks_worker.rb index be79d6b2afb..a0d1d9dca45 100644 --- a/app/workers/build_hooks_worker.rb +++ b/app/workers/build_hooks_worker.rb @@ -9,17 +9,7 @@ class BuildHooksWorker # rubocop:disable Scalability/IdempotentWorker queue_namespace :pipeline_hooks feature_category :continuous_integration urgency :high - data_consistency :delayed, feature_flag: :load_balancing_for_build_hooks_worker - - DATA_CONSISTENCY_DELAY = 3 - - def self.perform_async(*args) - if Feature.enabled?(:delayed_perform_for_build_hooks_worker, default_enabled: :yaml) - perform_in(DATA_CONSISTENCY_DELAY.seconds, *args) - else - super - end - end + data_consistency :delayed # rubocop: disable CodeReuse/ActiveRecord def perform(build_id) diff --git a/app/workers/build_queue_worker.rb b/app/workers/build_queue_worker.rb index e9bb2d88a81..aa3c03f773e 100644 --- a/app/workers/build_queue_worker.rb +++ b/app/workers/build_queue_worker.rb @@ -10,11 +10,12 @@ class BuildQueueWorker # rubocop:disable Scalability/IdempotentWorker feature_category :continuous_integration urgency :high worker_resource_boundary :cpu + data_consistency :sticky, feature_flag: :load_balancing_for_build_queue_worker # rubocop: disable CodeReuse/ActiveRecord def perform(build_id) Ci::Build.find_by(id: build_id).try do |build| - Ci::UpdateBuildQueueService.new.execute(build) + Ci::UpdateBuildQueueService.new.tick(build) end end # rubocop: enable CodeReuse/ActiveRecord diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb index 8ad31c68374..25a86ead76e 100644 --- a/app/workers/bulk_import_worker.rb +++ b/app/workers/bulk_import_worker.rb @@ -15,7 +15,8 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker @bulk_import = BulkImport.find_by_id(bulk_import_id) return unless @bulk_import - return if @bulk_import.finished? + return if @bulk_import.finished? || @bulk_import.failed? + return @bulk_import.fail_op! if all_entities_failed? return @bulk_import.finish! if all_entities_processed? && @bulk_import.started? return re_enqueue if max_batch_size_exceeded? # Do not start more jobs if max allowed are already running @@ -55,6 +56,10 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker entities.all? { |entity| entity.finished? || entity.failed? } end + def all_entities_failed? + entities.all? { |entity| entity.failed? } + end + def max_batch_size_exceeded? started_entities.count >= DEFAULT_BATCH_SIZE end diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb index cccc24d3bdc..24e75ad0f85 100644 --- a/app/workers/bulk_imports/export_request_worker.rb +++ b/app/workers/bulk_imports/export_request_worker.rb @@ -24,7 +24,7 @@ module BulkImports end def http_client(configuration) - @client ||= Clients::Http.new( + @client ||= Clients::HTTP.new( uri: configuration.url, token: configuration.access_token ) diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index 256301bf097..d3297017714 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -4,6 +4,8 @@ module BulkImports class PipelineWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker + NDJSON_PIPELINE_PERFORM_DELAY = 1.minute + feature_category :importers tags :exclude_from_kubernetes @@ -40,6 +42,15 @@ module BulkImports private def run(pipeline_tracker) + if ndjson_pipeline?(pipeline_tracker) + status = ExportStatus.new(pipeline_tracker, pipeline_tracker.pipeline_class.relation) + + raise(Pipeline::ExpiredError, 'Pipeline timeout') if job_timeout?(pipeline_tracker) + raise(Pipeline::FailedError, status.error) if status.failed? + + return reenqueue(pipeline_tracker) if status.started? + end + pipeline_tracker.update!(status_event: 'start', jid: jid) context = ::BulkImports::Pipeline::Context.new(pipeline_tracker) @@ -48,7 +59,7 @@ module BulkImports pipeline_tracker.finish! rescue StandardError => e - pipeline_tracker.fail_op! + pipeline_tracker.update!(status_event: 'fail_op', jid: jid) logger.error( worker: self.class.name, @@ -67,5 +78,17 @@ module BulkImports def logger @logger ||= Gitlab::Import::Logger.build end + + def ndjson_pipeline?(pipeline_tracker) + pipeline_tracker.pipeline_class.ndjson_pipeline? + end + + def job_timeout?(pipeline_tracker) + (Time.zone.now - pipeline_tracker.entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT + end + + def reenqueue(pipeline_tracker) + self.class.perform_in(NDJSON_PIPELINE_PERFORM_DELAY, pipeline_tracker.id, pipeline_tracker.stage, pipeline_tracker.entity.id) + end end end diff --git a/app/workers/ci/initial_pipeline_process_worker.rb b/app/workers/ci/initial_pipeline_process_worker.rb index 4dace43298d..ca41a7fb577 100644 --- a/app/workers/ci/initial_pipeline_process_worker.rb +++ b/app/workers/ci/initial_pipeline_process_worker.rb @@ -15,7 +15,7 @@ module Ci def perform(pipeline_id) Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline| - Ci::ProcessPipelineService + Ci::PipelineCreation::StartPipelineService .new(pipeline) .execute end diff --git a/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb b/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb index dd7bfff4eb1..ec0cb69d0c7 100644 --- a/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb +++ b/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb @@ -9,7 +9,6 @@ module Ci include PipelineBackgroundQueue feature_category :code_testing - tags :exclude_from_kubernetes idempotent! diff --git a/app/workers/clusters/cleanup/app_worker.rb b/app/workers/clusters/cleanup/app_worker.rb deleted file mode 100644 index 1d01cec174b..00000000000 --- a/app/workers/clusters/cleanup/app_worker.rb +++ /dev/null @@ -1,19 +0,0 @@ -# frozen_string_literal: true - -module Clusters - module Cleanup - class AppWorker # rubocop:disable Scalability/IdempotentWorker - include ClusterCleanupMethods - - def perform(cluster_id, execution_count = 0) - Clusters::Cluster.with_persisted_applications.find_by_id(cluster_id).try do |cluster| - break unless cluster.cleanup_uninstalling_applications? - - break exceeded_execution_limit(cluster) if exceeded_execution_limit?(execution_count) - - ::Clusters::Cleanup::AppService.new(cluster, execution_count).execute - end - end - end - end -end diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb index 843be4896a3..3cba1eb31c5 100644 --- a/app/workers/concerns/application_worker.rb +++ b/app/workers/concerns/application_worker.rb @@ -13,6 +13,7 @@ module ApplicationWorker include Gitlab::SidekiqVersioning::Worker LOGGING_EXTRA_KEY = 'extra' + DEFAULT_DELAY_INTERVAL = 1 included do set_queue @@ -51,6 +52,16 @@ module ApplicationWorker subclass.after_set_class_attribute { subclass.set_queue } end + def perform_async(*args) + # Worker execution for workers with data_consistency set to :delayed or :sticky + # will be delayed to give replication enough time to complete + if utilizes_load_balancing_capabilities? + perform_in(delay_interval, *args) + else + super + end + end + def set_queue queue_name = ::Gitlab::SidekiqConfig::WorkerRouter.global.route(self) sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue @@ -111,5 +122,11 @@ module ApplicationWorker Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule) end end + + protected + + def delay_interval + DEFAULT_DELAY_INTERVAL.seconds + end end end diff --git a/app/workers/concerns/security_scans_queue.rb b/app/workers/concerns/security_scans_queue.rb index f731317bb37..27e97169926 100644 --- a/app/workers/concerns/security_scans_queue.rb +++ b/app/workers/concerns/security_scans_queue.rb @@ -8,6 +8,6 @@ module SecurityScansQueue included do queue_namespace :security_scans - feature_category :static_application_security_testing + feature_category :vulnerability_management end end diff --git a/app/workers/concerns/worker_attributes.rb b/app/workers/concerns/worker_attributes.rb index 6dee9402691..096be808787 100644 --- a/app/workers/concerns/worker_attributes.rb +++ b/app/workers/concerns/worker_attributes.rb @@ -71,6 +71,20 @@ module WorkerAttributes class_attributes[:urgency] || :low end + # Allows configuring worker's data_consistency. + # + # Worker can utilize Sidekiq readonly database replicas capabilities by setting data_consistency attribute. + # Workers with data_consistency set to :delayed or :sticky, calling #perform_async + # will be delayed in order to give replication process enough time to complete. + # + # - *data_consistency* values: + # - 'always' - The job is required to use the primary database (default). + # - 'sticky' - The uses a replica as long as possible. It switches to primary either on write or long replication lag. + # - 'delayed' - The job would switch to primary only on write. It would use replica always. + # If there's a long replication lag the job will be delayed, and only if the replica is not up to date on the next retry, + # it will switch to the primary. + # - *feature_flag* - allows you to toggle a job's `data_consistency, which permits you to safely toggle load balancing capabilities for a specific job. + # If disabled, job will default to `:always`, which means that the job will always use the primary. def data_consistency(data_consistency, feature_flag: nil) raise ArgumentError, "Invalid data consistency: #{data_consistency}" unless VALID_DATA_CONSISTENCIES.include?(data_consistency) raise ArgumentError, 'Data consistency is already set' if class_attributes[:data_consistency] @@ -85,11 +99,16 @@ module WorkerAttributes # Since the deduplication should always take into account the latest binary replication pointer into account, # not the first one, the deduplication will not work with sticky or delayed. # Follow up issue to improve this: https://gitlab.com/gitlab-org/gitlab/-/issues/325291 - if idempotent? && get_data_consistency != :always + if idempotent? && utilizes_load_balancing_capabilities? raise ArgumentError, "Class can't be marked as idempotent if data_consistency is not set to :always" end end + # If data_consistency is not set to :always, worker will try to utilize load balancing capabilities and use the replica + def utilizes_load_balancing_capabilities? + get_data_consistency != :always + end + def get_data_consistency class_attributes[:data_consistency] || :always end @@ -167,6 +186,12 @@ module WorkerAttributes class_attributes[:deduplication_options] || {} end + def deduplication_enabled? + return true unless get_deduplication_options[:feature_flag] + + Feature.enabled?(get_deduplication_options[:feature_flag], default_enabled: :yaml) + end + def big_payload! set_class_attribute(:big_payload, true) end diff --git a/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb b/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb index 40cc233307a..3027d46b8b1 100644 --- a/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb +++ b/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb @@ -65,19 +65,9 @@ module ContainerExpirationPolicies def container_repository strong_memoize(:container_repository) do ContainerRepository.transaction do - # rubocop: disable CodeReuse/ActiveRecord # We need a lock to prevent two workers from picking up the same row - container_repository = if loopless_enabled? - next_container_repository - else - ContainerRepository.waiting_for_cleanup - .order(:expiration_policy_cleanup_status, :expiration_policy_started_at) - .limit(1) - .lock('FOR UPDATE SKIP LOCKED') - .first - end - - # rubocop: enable CodeReuse/ActiveRecord + container_repository = next_container_repository + container_repository&.tap(&:cleanup_ongoing!) end end @@ -102,28 +92,20 @@ module ContainerExpirationPolicies def cleanup_scheduled_count strong_memoize(:cleanup_scheduled_count) do - if loopless_enabled? - limit = max_running_jobs + 1 - ContainerExpirationPolicy.with_container_repositories - .runnable_schedules - .limit(limit) - .count - else - ContainerRepository.cleanup_scheduled.count - end + limit = max_running_jobs + 1 + ContainerExpirationPolicy.with_container_repositories + .runnable_schedules + .limit(limit) + .count end end def cleanup_unfinished_count strong_memoize(:cleanup_unfinished_count) do - if loopless_enabled? - limit = max_running_jobs + 1 - ContainerRepository.with_unfinished_cleanup - .limit(limit) - .count - else - ContainerRepository.cleanup_unfinished.count - end + limit = max_running_jobs + 1 + ContainerRepository.with_unfinished_cleanup + .limit(limit) + .count end end @@ -132,21 +114,13 @@ module ContainerExpirationPolicies now = Time.zone.now - if loopless_enabled? - policy.next_run_at < now || (now + max_cleanup_execution_time.seconds < policy.next_run_at) - else - now + max_cleanup_execution_time.seconds < policy.next_run_at - end + policy.next_run_at < now || (now + max_cleanup_execution_time.seconds < policy.next_run_at) end def throttling_enabled? Feature.enabled?(:container_registry_expiration_policies_throttling) end - def loopless_enabled? - Feature.enabled?(:container_registry_expiration_policies_loopless) - end - def max_cleanup_execution_time ::Gitlab::CurrentSettings.container_registry_delete_tags_service_timeout end diff --git a/app/workers/container_expiration_policy_worker.rb b/app/workers/container_expiration_policy_worker.rb index dec13485d13..b15d1bf90bd 100644 --- a/app/workers/container_expiration_policy_worker.rb +++ b/app/workers/container_expiration_policy_worker.rb @@ -14,11 +14,18 @@ class ContainerExpirationPolicyWorker # rubocop:disable Scalability/IdempotentWo BATCH_SIZE = 1000 def perform + process_stale_ongoing_cleanups throttling_enabled? ? perform_throttled : perform_unthrottled end private + def process_stale_ongoing_cleanups + threshold = delete_tags_service_timeout.seconds + 30.minutes + ContainerRepository.with_stale_ongoing_cleanup(threshold.ago) + .update_all(expiration_policy_cleanup_status: :cleanup_unfinished) + end + def perform_unthrottled with_runnable_policy(preloaded: true) do |policy| with_context(project: policy.project, @@ -31,18 +38,6 @@ class ContainerExpirationPolicyWorker # rubocop:disable Scalability/IdempotentWo def perform_throttled try_obtain_lease do - unless loopless_enabled? - with_runnable_policy do |policy| - ContainerExpirationPolicy.transaction do - policy.schedule_next_run! - ContainerRepository.for_project_id(policy.id) - .each_batch do |relation| - relation.update_all(expiration_policy_cleanup_status: :cleanup_scheduled) - end - end - end - end - ContainerExpirationPolicies::CleanupContainerRepositoryWorker.perform_with_capacity end end @@ -79,11 +74,11 @@ class ContainerExpirationPolicyWorker # rubocop:disable Scalability/IdempotentWo Feature.enabled?(:container_registry_expiration_policies_throttling) end - def loopless_enabled? - Feature.enabled?(:container_registry_expiration_policies_loopless) - end - def lease_timeout 5.hours end + + def delete_tags_service_timeout + ::Gitlab::CurrentSettings.current_application_settings.container_registry_delete_tags_service_timeout || 0 + end end diff --git a/app/workers/deployments/execute_hooks_worker.rb b/app/workers/deployments/execute_hooks_worker.rb deleted file mode 100644 index 3046aa28e20..00000000000 --- a/app/workers/deployments/execute_hooks_worker.rb +++ /dev/null @@ -1,20 +0,0 @@ -# frozen_string_literal: true - -module Deployments - # TODO: remove in https://gitlab.com/gitlab-org/gitlab/-/issues/329360 - class ExecuteHooksWorker # rubocop:disable Scalability/IdempotentWorker - include ApplicationWorker - - sidekiq_options retry: 3 - - queue_namespace :deployment - feature_category :continuous_delivery - worker_resource_boundary :cpu - - def perform(deployment_id) - if (deploy = Deployment.find_by_id(deployment_id)) - deploy.execute_hooks(Time.current) - end - end - end -end diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb index 3c48c4ba3cd..9702fac39ba 100644 --- a/app/workers/expire_pipeline_cache_worker.rb +++ b/app/workers/expire_pipeline_cache_worker.rb @@ -1,5 +1,6 @@ # frozen_string_literal: true +# rubocop: disable Scalability/IdempotentWorker class ExpirePipelineCacheWorker include ApplicationWorker @@ -9,8 +10,12 @@ class ExpirePipelineCacheWorker queue_namespace :pipeline_cache urgency :high worker_resource_boundary :cpu + data_consistency :delayed, feature_flag: :load_balancing_for_expire_pipeline_cache_worker - idempotent! + # This worker _should_ be idempotent, but due to us moving this to data_consistency :delayed + # and an ongoing incompatibility between the two switches, we need to disable this. + # Uncomment once https://gitlab.com/gitlab-org/gitlab/-/issues/325291 is resolved + # idempotent! # rubocop: disable CodeReuse/ActiveRecord def perform(pipeline_id) @@ -21,3 +26,4 @@ class ExpirePipelineCacheWorker end # rubocop: enable CodeReuse/ActiveRecord end +# rubocop:enable Scalability/IdempotentWorker diff --git a/app/workers/git_garbage_collect_worker.rb b/app/workers/git_garbage_collect_worker.rb deleted file mode 100644 index a2aab23db7b..00000000000 --- a/app/workers/git_garbage_collect_worker.rb +++ /dev/null @@ -1,19 +0,0 @@ -# frozen_string_literal: true - -# According to our docs, we can only remove workers on major releases -# https://docs.gitlab.com/ee/development/sidekiq_style_guide.html#removing-workers. -# -# We need to still maintain this until 14.0 but with the current functionality. -# -# In https://gitlab.com/gitlab-org/gitlab/-/issues/299290 we track that removal. -class GitGarbageCollectWorker # rubocop:disable Scalability/IdempotentWorker - include ApplicationWorker - - sidekiq_options retry: false - feature_category :gitaly - loggable_arguments 1, 2, 3 - - def perform(project_id, task = :gc, lease_key = nil, lease_uuid = nil) - ::Projects::GitGarbageCollectWorker.new.perform(project_id, task, lease_key, lease_uuid) - end -end diff --git a/app/workers/incident_management/process_alert_worker.rb b/app/workers/incident_management/process_alert_worker.rb deleted file mode 100644 index 3b90e296ad4..00000000000 --- a/app/workers/incident_management/process_alert_worker.rb +++ /dev/null @@ -1,56 +0,0 @@ -# frozen_string_literal: true - -module IncidentManagement - class ProcessAlertWorker # rubocop:disable Scalability/IdempotentWorker - include ApplicationWorker - - sidekiq_options retry: 3 - - queue_namespace :incident_management - feature_category :incident_management - - # `project_id` and `alert_payload` are deprecated and can be removed - # starting from 14.0 release - # https://gitlab.com/gitlab-org/gitlab/-/issues/224500 - # - # This worker is not scheduled anymore since - # https://gitlab.com/gitlab-org/gitlab/-/merge_requests/60285 - # and will be removed completely via - # https://gitlab.com/gitlab-org/gitlab/-/issues/224500 - # in 14.0. - def perform(_project_id = nil, _alert_payload = nil, alert_id = nil) - return unless alert_id - - alert = find_alert(alert_id) - return unless alert - - result = create_issue_for(alert) - return if result.success? - - log_warning(alert, result) - end - - private - - def find_alert(alert_id) - AlertManagement::Alert.find_by_id(alert_id) - end - - def create_issue_for(alert) - AlertManagement::CreateAlertIssueService - .new(alert, User.alert_bot) - .execute - end - - def log_warning(alert, result) - issue_id = result.payload[:issue]&.id - - Gitlab::AppLogger.warn( - message: 'Cannot process an Incident', - issue_id: issue_id, - alert_id: alert.id, - errors: result.message - ) - end - end -end diff --git a/app/workers/incident_management/process_prometheus_alert_worker.rb b/app/workers/incident_management/process_prometheus_alert_worker.rb deleted file mode 100644 index 7b5c6fd9001..00000000000 --- a/app/workers/incident_management/process_prometheus_alert_worker.rb +++ /dev/null @@ -1,23 +0,0 @@ -# frozen_string_literal: true - -module IncidentManagement - class ProcessPrometheusAlertWorker # rubocop:disable Scalability/IdempotentWorker - include ApplicationWorker - - sidekiq_options retry: 3 - - queue_namespace :incident_management - feature_category :incident_management - worker_resource_boundary :cpu - - def perform(project_id, alert_hash) - # no-op - # - # This worker is not scheduled anymore since - # https://gitlab.com/gitlab-org/gitlab/-/merge_requests/35943 - # and will be removed completely via - # https://gitlab.com/gitlab-org/gitlab/-/issues/227146 - # in 14.0. - end - end -end diff --git a/app/workers/issue_placement_worker.rb b/app/workers/issue_placement_worker.rb index dba791c3f05..8166dda135e 100644 --- a/app/workers/issue_placement_worker.rb +++ b/app/workers/issue_placement_worker.rb @@ -41,7 +41,7 @@ class IssuePlacementWorker IssuePlacementWorker.perform_async(nil, leftover.project_id) if leftover.present? rescue RelativePositioning::NoSpaceLeft => e Gitlab::ErrorTracking.log_exception(e, issue_id: issue_id, project_id: project_id) - IssueRebalancingWorker.perform_async(nil, project_id.presence || issue.project_id) + IssueRebalancingWorker.perform_async(nil, *root_namespace_id_to_rebalance(issue, project_id)) end def find_issue(issue_id, project_id) @@ -53,4 +53,11 @@ class IssuePlacementWorker project.issues.take end # rubocop: enable CodeReuse/ActiveRecord + + private + + def root_namespace_id_to_rebalance(issue, project_id) + project_id = project_id.presence || issue.project_id + Project.find(project_id)&.self_or_root_group_ids + end end diff --git a/app/workers/issue_rebalancing_worker.rb b/app/workers/issue_rebalancing_worker.rb index 9eac451f107..66ef7dd3152 100644 --- a/app/workers/issue_rebalancing_worker.rb +++ b/app/workers/issue_rebalancing_worker.rb @@ -9,21 +9,44 @@ class IssueRebalancingWorker urgency :low feature_category :issue_tracking tags :exclude_from_kubernetes + deduplicate :until_executed, including_scheduled: true - def perform(ignore = nil, project_id = nil) - return if project_id.nil? + def perform(ignore = nil, project_id = nil, root_namespace_id = nil) + # we need to have exactly one of the project_id and root_namespace_id params be non-nil + raise ArgumentError, "Expected only one of the params project_id: #{project_id} and root_namespace_id: #{root_namespace_id}" if project_id && root_namespace_id + return if project_id.nil? && root_namespace_id.nil? - project = Project.find(project_id) + # pull the projects collection to be rebalanced either the project if namespace is not a group(i.e. user namesapce) + # or the root namespace, this also makes the worker backward compatible with previous version where a project_id was + # passed as the param + projects_to_rebalance = projects_collection(project_id, root_namespace_id) - # Temporary disable reabalancing for performance reasons + # something might have happened with the namespace between scheduling the worker and actually running it, + # maybe it was removed. + if projects_to_rebalance.blank? + Gitlab::ErrorTracking.log_exception( + ArgumentError.new("Projects to be rebalanced not found for arguments: project_id #{project_id}, root_namespace_id: #{root_namespace_id}"), + { project_id: project_id, root_namespace_id: root_namespace_id }) + + return + end + + # Temporary disable rebalancing for performance reasons # For more information check https://gitlab.com/gitlab-com/gl-infra/production/-/issues/4321 - return if project.root_namespace&.issue_repositioning_disabled? + return if projects_to_rebalance.take&.root_namespace&.issue_repositioning_disabled? # rubocop:disable CodeReuse/ActiveRecord + + IssueRebalancingService.new(projects_to_rebalance).execute + rescue IssueRebalancingService::TooManyIssues => e + Gitlab::ErrorTracking.log_exception(e, root_namespace_id: root_namespace_id, project_id: project_id) + end + + private - # All issues are equivalent as far as we are concerned - issue = project.issues.take # rubocop: disable CodeReuse/ActiveRecord + def projects_collection(project_id, root_namespace_id) + # we can have either project_id(older version) or project_id if project is part of a user namespace and not a group + # or root_namespace_id(newer version) never both. + return Project.id_in([project_id]) if project_id - IssueRebalancingService.new(issue).execute - rescue ActiveRecord::RecordNotFound, IssueRebalancingService::TooManyIssues => e - Gitlab::ErrorTracking.log_exception(e, project_id: project_id) + Namespace.find_by_id(root_namespace_id)&.all_projects end end diff --git a/app/workers/jira_connect/sync_branch_worker.rb b/app/workers/jira_connect/sync_branch_worker.rb index b8211286d1c..4e8566d86c9 100644 --- a/app/workers/jira_connect/sync_branch_worker.rb +++ b/app/workers/jira_connect/sync_branch_worker.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true module JiraConnect - class SyncBranchWorker # rubocop:disable Scalability/IdempotentWorker + class SyncBranchWorker include ApplicationWorker sidekiq_options retry: 3 diff --git a/app/workers/jira_connect/sync_merge_request_worker.rb b/app/workers/jira_connect/sync_merge_request_worker.rb index 6b3a6ae84ad..bf31df2271f 100644 --- a/app/workers/jira_connect/sync_merge_request_worker.rb +++ b/app/workers/jira_connect/sync_merge_request_worker.rb @@ -1,7 +1,7 @@ # frozen_string_literal: true module JiraConnect - class SyncMergeRequestWorker # rubocop:disable Scalability/IdempotentWorker + class SyncMergeRequestWorker include ApplicationWorker sidekiq_options retry: 3 diff --git a/app/workers/merge_requests/assignees_change_worker.rb b/app/workers/merge_requests/assignees_change_worker.rb deleted file mode 100644 index fe39f20151f..00000000000 --- a/app/workers/merge_requests/assignees_change_worker.rb +++ /dev/null @@ -1,28 +0,0 @@ -# frozen_string_literal: true - -class MergeRequests::AssigneesChangeWorker - include ApplicationWorker - - sidekiq_options retry: 3 - - feature_category :source_code_management - urgency :high - deduplicate :until_executed - idempotent! - - def perform(merge_request_id, user_id, old_assignee_ids) - merge_request = MergeRequest.find(merge_request_id) - current_user = User.find(user_id) - - # if a user was added and then removed, or removed and then added - # while waiting for this job to run, assume that nothing happened. - users = User.id_in(old_assignee_ids - merge_request.assignee_ids) - - return if users.blank? - - ::MergeRequests::HandleAssigneesChangeService - .new(project: merge_request.target_project, current_user: current_user) - .execute(merge_request, users, execute_hooks: true) - rescue ActiveRecord::RecordNotFound - end -end diff --git a/app/workers/packages/debian/generate_distribution_worker.rb b/app/workers/packages/debian/generate_distribution_worker.rb new file mode 100644 index 00000000000..68fdd80ffb1 --- /dev/null +++ b/app/workers/packages/debian/generate_distribution_worker.rb @@ -0,0 +1,44 @@ +# frozen_string_literal: true + +module Packages + module Debian + class GenerateDistributionWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + include Gitlab::Utils::StrongMemoize + + # The worker is idempotent, by reusing component files with the same file_sha256. + # + # See GenerateDistributionService#find_or_create_component_file + deduplicate :until_executed + idempotent! + + queue_namespace :package_repositories + feature_category :package_registry + + loggable_arguments 0 + + def perform(container_type, distribution_id) + @container_type = container_type + @distribution_id = distribution_id + + return unless distribution + + ::Packages::Debian::GenerateDistributionService.new(distribution).execute + end + + private + + def container_class + return ::Packages::Debian::GroupDistribution if @container_type == :group + + ::Packages::Debian::ProjectDistribution + end + + def distribution + strong_memoize(:distribution) do + container_class.find_by_id(@distribution_id) + end + end + end + end +end diff --git a/app/workers/pipeline_hooks_worker.rb b/app/workers/pipeline_hooks_worker.rb index fbb672f52e3..97e6adbbf18 100644 --- a/app/workers/pipeline_hooks_worker.rb +++ b/app/workers/pipeline_hooks_worker.rb @@ -7,8 +7,8 @@ class PipelineHooksWorker # rubocop:disable Scalability/IdempotentWorker include PipelineQueue queue_namespace :pipeline_hooks - urgency :high worker_resource_boundary :cpu + data_consistency :delayed, feature_flag: :load_balancing_for_pipeline_hooks_worker # rubocop: disable CodeReuse/ActiveRecord def perform(pipeline_id) diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb index dc14789fe73..a35b32c35f2 100644 --- a/app/workers/pipeline_process_worker.rb +++ b/app/workers/pipeline_process_worker.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -class PipelineProcessWorker # rubocop:disable Scalability/IdempotentWorker +class PipelineProcessWorker include ApplicationWorker sidekiq_options retry: 3 @@ -10,12 +10,12 @@ class PipelineProcessWorker # rubocop:disable Scalability/IdempotentWorker feature_category :continuous_integration urgency :high loggable_arguments 1 - data_consistency :delayed, feature_flag: :load_balancing_for_pipeline_process_worker + + idempotent! + deduplicate :until_executing, feature_flag: :ci_idempotent_pipeline_process_worker # rubocop: disable CodeReuse/ActiveRecord - # `_build_ids` is deprecated and will be removed in 14.0 - # See: https://gitlab.com/gitlab-org/gitlab/-/issues/232806 - def perform(pipeline_id, _build_ids = nil) + def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline| Ci::ProcessPipelineService .new(pipeline) diff --git a/app/workers/pipeline_update_worker.rb b/app/workers/pipeline_update_worker.rb deleted file mode 100644 index e8feb4f2db2..00000000000 --- a/app/workers/pipeline_update_worker.rb +++ /dev/null @@ -1,19 +0,0 @@ -# frozen_string_literal: true - -# This worker is deprecated and will be removed in 14.0 -# See: https://gitlab.com/gitlab-org/gitlab/-/issues/232806 -class PipelineUpdateWorker - include ApplicationWorker - - sidekiq_options retry: 3 - include PipelineQueue - - queue_namespace :pipeline_processing - urgency :high - - idempotent! - - def perform(_pipeline_id) - # no-op - end -end diff --git a/app/workers/project_schedule_bulk_repository_shard_moves_worker.rb b/app/workers/project_schedule_bulk_repository_shard_moves_worker.rb deleted file mode 100644 index 23d1594e4d9..00000000000 --- a/app/workers/project_schedule_bulk_repository_shard_moves_worker.rb +++ /dev/null @@ -1,15 +0,0 @@ -# frozen_string_literal: true - -# This is a compatibility class to avoid calling a non-existent -# class from sidekiq during deployment. -# -# This class was moved to a namespace in https://gitlab.com/gitlab-org/gitlab/-/issues/299853. -# we cannot remove this class entirely because there can be jobs -# referencing it. -# -# We can get rid of this class in 14.0 -# https://gitlab.com/gitlab-org/gitlab/-/issues/322393 -class ProjectScheduleBulkRepositoryShardMovesWorker < Projects::ScheduleBulkRepositoryShardMovesWorker - idempotent! - urgency :throttled -end diff --git a/app/workers/project_update_repository_storage_worker.rb b/app/workers/project_update_repository_storage_worker.rb deleted file mode 100644 index 0d68c0e16f8..00000000000 --- a/app/workers/project_update_repository_storage_worker.rb +++ /dev/null @@ -1,15 +0,0 @@ -# frozen_string_literal: true - -# This is a compatibility class to avoid calling a non-existent -# class from sidekiq during deployment. -# -# This class was moved to a namespace in https://gitlab.com/gitlab-org/gitlab/-/issues/299853. -# we cannot remove this class entirely because there can be jobs -# referencing it. -# -# We can get rid of this class in 14.0 -# https://gitlab.com/gitlab-org/gitlab/-/issues/322393 -class ProjectUpdateRepositoryStorageWorker < Projects::UpdateRepositoryStorageWorker - idempotent! - urgency :throttled -end diff --git a/app/workers/propagate_integration_worker.rb b/app/workers/propagate_integration_worker.rb index 5e694529bc0..0f8229bdf09 100644 --- a/app/workers/propagate_integration_worker.rb +++ b/app/workers/propagate_integration_worker.rb @@ -9,9 +9,7 @@ class PropagateIntegrationWorker idempotent! loggable_arguments 1 - # TODO: Keep overwrite parameter for backwards compatibility. Remove after >= 14.0 - # https://gitlab.com/gitlab-org/gitlab/-/issues/255382 - def perform(integration_id, overwrite = nil) + def perform(integration_id) Admin::PropagateIntegrationService.propagate(Integration.find(integration_id)) end end diff --git a/app/workers/prune_web_hook_logs_worker.rb b/app/workers/prune_web_hook_logs_worker.rb deleted file mode 100644 index abfaabbf01d..00000000000 --- a/app/workers/prune_web_hook_logs_worker.rb +++ /dev/null @@ -1,24 +0,0 @@ -# frozen_string_literal: true - -# Worker that deletes a fixed number of outdated rows from the "web_hook_logs" -# table. -class PruneWebHookLogsWorker # rubocop:disable Scalability/IdempotentWorker - include ApplicationWorker - - sidekiq_options retry: 3 - # rubocop:disable Scalability/CronWorkerContext - # This worker does not perform work scoped to a context - include CronjobQueue - # rubocop:enable Scalability/CronWorkerContext - - feature_category :integrations - - # The maximum number of rows to remove in a single job. - DELETE_LIMIT = 50_000 - - def perform - cutoff_date = 90.days.ago.beginning_of_day - - WebHookLog.created_before(cutoff_date).delete_with_limit(DELETE_LIMIT) - end -end diff --git a/app/workers/remove_unreferenced_lfs_objects_worker.rb b/app/workers/remove_unreferenced_lfs_objects_worker.rb index b42883549ca..ca4b70a0485 100644 --- a/app/workers/remove_unreferenced_lfs_objects_worker.rb +++ b/app/workers/remove_unreferenced_lfs_objects_worker.rb @@ -1,6 +1,6 @@ # frozen_string_literal: true -class RemoveUnreferencedLfsObjectsWorker # rubocop:disable Scalability/IdempotentWorker +class RemoveUnreferencedLfsObjectsWorker include ApplicationWorker sidekiq_options retry: 3 @@ -10,8 +10,16 @@ class RemoveUnreferencedLfsObjectsWorker # rubocop:disable Scalability/Idempoten # rubocop:enable Scalability/CronWorkerContext feature_category :git_lfs + deduplicate :until_executed + idempotent! def perform - LfsObject.destroy_unreferenced + number_of_removed_files = 0 + + LfsObject.unreferenced_in_batches do |lfs_objects_without_projects| + number_of_removed_files += lfs_objects_without_projects.destroy_all.count # rubocop: disable Cop/DestroyAll + end + + number_of_removed_files end end diff --git a/app/workers/snippet_schedule_bulk_repository_shard_moves_worker.rb b/app/workers/snippet_schedule_bulk_repository_shard_moves_worker.rb deleted file mode 100644 index 94a6b22538b..00000000000 --- a/app/workers/snippet_schedule_bulk_repository_shard_moves_worker.rb +++ /dev/null @@ -1,16 +0,0 @@ -# frozen_string_literal: true - -# This is a compatibility class to avoid calling a non-existent -# class from sidekiq during deployment. -# -# This class was moved to a namespace in https://gitlab.com/gitlab-org/gitlab/-/issues/299853. -# we cannot remove this class entirely because there can be jobs -# referencing it. -# -# We can get rid of this class in 14.0 -# https://gitlab.com/gitlab-org/gitlab/-/issues/322393 -class SnippetScheduleBulkRepositoryShardMovesWorker < Snippets::ScheduleBulkRepositoryShardMovesWorker - idempotent! - feature_category :gitaly - urgency :throttled -end diff --git a/app/workers/snippet_update_repository_storage_worker.rb b/app/workers/snippet_update_repository_storage_worker.rb deleted file mode 100644 index befae6db4f4..00000000000 --- a/app/workers/snippet_update_repository_storage_worker.rb +++ /dev/null @@ -1,15 +0,0 @@ -# frozen_string_literal: true - -# This is a compatibility class to avoid calling a non-existent -# class from sidekiq during deployment. -# -# This class was moved to a namespace in https://gitlab.com/gitlab-org/gitlab/-/issues/299853. -# we cannot remove this class entirely because there can be jobs -# referencing it. -# -# We can get rid of this class in 14.0 -# https://gitlab.com/gitlab-org/gitlab/-/issues/322393 -class SnippetUpdateRepositoryStorageWorker < Snippets::UpdateRepositoryStorageWorker # rubocop:disable Scalability/IdempotentWorker - idempotent! - urgency :throttled -end diff --git a/app/workers/ssh_keys/expired_notification_worker.rb b/app/workers/ssh_keys/expired_notification_worker.rb index 9d5143fe655..b67849942b0 100644 --- a/app/workers/ssh_keys/expired_notification_worker.rb +++ b/app/workers/ssh_keys/expired_notification_worker.rb @@ -11,20 +11,37 @@ module SshKeys tags :exclude_from_kubernetes idempotent! + BATCH_SIZE = 500 + + # rubocop: disable CodeReuse/ActiveRecord def perform - return unless ::Feature.enabled?(:ssh_key_expiration_email_notification, default_enabled: :yaml) + order = Gitlab::Pagination::Keyset::Order.build([ + Gitlab::Pagination::Keyset::ColumnOrderDefinition.new( + attribute_name: 'expires_at_utc', + order_expression: Arel.sql("date(expires_at AT TIME ZONE 'UTC')").asc, + nullable: :not_nullable, + distinct: false, + add_to_projections: true + ), + Gitlab::Pagination::Keyset::ColumnOrderDefinition.new( + attribute_name: 'id', + order_expression: Key.arel_table[:id].asc + ) + ]) - # rubocop:disable CodeReuse/ActiveRecord - User.with_ssh_key_expired_today.find_each(batch_size: 10_000) do |user| - with_context(user: user) do - Gitlab::AppLogger.info "#{self.class}: Notifying User #{user.id} about expired ssh key(s)" + scope = Key.expired_and_not_notified.order(order) - keys = user.expired_today_and_unnotified_keys + iterator = Gitlab::Pagination::Keyset::Iterator.new(scope: scope, use_union_optimization: true) + iterator.each_batch(of: BATCH_SIZE) do |relation| + users = User.where(id: relation.map(&:user_id)) # Keyset pagination will load the rows - Keys::ExpiryNotificationService.new(user, { keys: keys, expiring_soon: false }).execute + users.each do |user| + with_context(user: user) do + Keys::ExpiryNotificationService.new(user, { keys: user.expired_and_unnotified_keys, expiring_soon: false }).execute + end end - # rubocop:enable CodeReuse/ActiveRecord end end + # rubocop: enable CodeReuse/ActiveRecord end end diff --git a/app/workers/ssh_keys/expiring_soon_notification_worker.rb b/app/workers/ssh_keys/expiring_soon_notification_worker.rb index 1ec655b5cf5..d87e31c36a5 100644 --- a/app/workers/ssh_keys/expiring_soon_notification_worker.rb +++ b/app/workers/ssh_keys/expiring_soon_notification_worker.rb @@ -12,8 +12,6 @@ module SshKeys idempotent! def perform - return unless ::Feature.enabled?(:ssh_key_expiration_email_notification, default_enabled: :yaml) - # rubocop:disable CodeReuse/ActiveRecord User.with_ssh_key_expiring_soon.find_each(batch_size: 10_000) do |user| with_context(user: user) do diff --git a/app/workers/stuck_ci_jobs_worker.rb b/app/workers/stuck_ci_jobs_worker.rb index 6b9f90ce1fc..b3b3d6e7554 100644 --- a/app/workers/stuck_ci_jobs_worker.rb +++ b/app/workers/stuck_ci_jobs_worker.rb @@ -15,22 +15,46 @@ class StuckCiJobsWorker # rubocop:disable Scalability/IdempotentWorker BUILD_PENDING_OUTDATED_TIMEOUT = 1.day BUILD_SCHEDULED_OUTDATED_TIMEOUT = 1.hour BUILD_PENDING_STUCK_TIMEOUT = 1.hour + BUILD_LOOKBACK = 5.days def perform return unless try_obtain_lease Gitlab::AppLogger.info "#{self.class}: Cleaning stuck builds" - drop :running, BUILD_RUNNING_OUTDATED_TIMEOUT, 'ci_builds.updated_at < ?', :stuck_or_timeout_failure - drop :pending, BUILD_PENDING_OUTDATED_TIMEOUT, 'ci_builds.updated_at < ?', :stuck_or_timeout_failure - drop :scheduled, BUILD_SCHEDULED_OUTDATED_TIMEOUT, 'scheduled_at IS NOT NULL AND scheduled_at < ?', :stale_schedule - drop_stuck :pending, BUILD_PENDING_STUCK_TIMEOUT, 'ci_builds.updated_at < ?', :stuck_or_timeout_failure + drop(running_timed_out_builds, failure_reason: :stuck_or_timeout_failure) + + drop( + Ci::Build.pending.updated_before(lookback: BUILD_LOOKBACK.ago, timeout: BUILD_PENDING_OUTDATED_TIMEOUT.ago), + failure_reason: :stuck_or_timeout_failure + ) + + drop(scheduled_timed_out_builds, failure_reason: :stale_schedule) + + drop_stuck( + Ci::Build.pending.updated_before(lookback: BUILD_LOOKBACK.ago, timeout: BUILD_PENDING_STUCK_TIMEOUT.ago), + failure_reason: :stuck_or_timeout_failure + ) remove_lease end private + def scheduled_timed_out_builds + Ci::Build.where(status: :scheduled).where( # rubocop: disable CodeReuse/ActiveRecord + 'ci_builds.scheduled_at IS NOT NULL AND ci_builds.scheduled_at < ?', + BUILD_SCHEDULED_OUTDATED_TIMEOUT.ago + ) + end + + def running_timed_out_builds + Ci::Build.running.where( # rubocop: disable CodeReuse/ActiveRecord + 'ci_builds.updated_at < ?', + BUILD_RUNNING_OUTDATED_TIMEOUT.ago + ) + end + def try_obtain_lease @uuid = Gitlab::ExclusiveLease.new(EXCLUSIVE_LEASE_KEY, timeout: 30.minutes).try_obtain end @@ -39,28 +63,27 @@ class StuckCiJobsWorker # rubocop:disable Scalability/IdempotentWorker Gitlab::ExclusiveLease.cancel(EXCLUSIVE_LEASE_KEY, @uuid) end - def drop(status, timeout, condition, reason) - search(status, timeout, condition) do |build| - drop_build :outdated, build, status, timeout, reason + def drop(builds, failure_reason:) + fetch(builds) do |build| + drop_build :outdated, build, failure_reason end end - def drop_stuck(status, timeout, condition, reason) - search(status, timeout, condition) do |build| + def drop_stuck(builds, failure_reason:) + fetch(builds) do |build| break unless build.stuck? - drop_build :stuck, build, status, timeout, reason + drop_build :stuck, build, failure_reason end end # rubocop: disable CodeReuse/ActiveRecord - def search(status, timeout, condition) + def fetch(builds) loop do - jobs = Ci::Build.where(status: status) - .where(condition, timeout.ago) - .includes(:tags, :runner, project: [:namespace, :route]) + jobs = builds.includes(:tags, :runner, project: [:namespace, :route]) .limit(100) .to_a + break if jobs.empty? jobs.each do |job| @@ -70,8 +93,8 @@ class StuckCiJobsWorker # rubocop:disable Scalability/IdempotentWorker end # rubocop: enable CodeReuse/ActiveRecord - def drop_build(type, build, status, timeout, reason) - Gitlab::AppLogger.info "#{self.class}: Dropping #{type} build #{build.id} for runner #{build.runner_id} (status: #{status}, timeout: #{timeout}, reason: #{reason})" + def drop_build(type, build, reason) + Gitlab::AppLogger.info "#{self.class}: Dropping #{type} build #{build.id} for runner #{build.runner_id} (status: #{build.status}, failure_reason: #{reason})" Gitlab::OptimisticLocking.retry_lock(build, 3, name: 'stuck_ci_jobs_worker_drop_build') do |b| b.drop(reason) end diff --git a/app/workers/users/update_open_issue_count_worker.rb b/app/workers/users/update_open_issue_count_worker.rb deleted file mode 100644 index d9e313d53df..00000000000 --- a/app/workers/users/update_open_issue_count_worker.rb +++ /dev/null @@ -1,26 +0,0 @@ -# frozen_string_literal: true - -module Users - class UpdateOpenIssueCountWorker - include ApplicationWorker - - feature_category :users - tags :exclude_from_kubernetes - idempotent! - - def perform(target_user_ids) - target_user_ids = Array.wrap(target_user_ids) - - raise ArgumentError, 'No target user ID provided' if target_user_ids.empty? - - target_users = User.id_in(target_user_ids) - raise ArgumentError, 'No valid target user ID provided' if target_users.empty? - - target_users.each do |user| - Users::UpdateAssignedOpenIssueCountService.new(target_user: user).execute - end - rescue StandardError => exception - Gitlab::ErrorTracking.track_and_raise_for_dev_exception(exception) - end - end -end diff --git a/app/workers/web_hook_worker.rb b/app/workers/web_hook_worker.rb index dffab61dd0e..3480f49d640 100644 --- a/app/workers/web_hook_worker.rb +++ b/app/workers/web_hook_worker.rb @@ -8,6 +8,7 @@ class WebHookWorker feature_category :integrations worker_has_external_dependencies! loggable_arguments 2 + data_consistency :delayed, feature_flag: :load_balancing_for_web_hook_worker sidekiq_options retry: 4, dead: false @@ -15,7 +16,7 @@ class WebHookWorker hook = WebHook.find(hook_id) data = data.with_indifferent_access - WebHookService.new(hook, data, hook_name).execute + WebHookService.new(hook, data, hook_name, jid).execute end end # rubocop:enable Scalability/IdempotentWorker diff --git a/app/workers/web_hooks/log_execution_worker.rb b/app/workers/web_hooks/log_execution_worker.rb new file mode 100644 index 00000000000..58059370200 --- /dev/null +++ b/app/workers/web_hooks/log_execution_worker.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +module WebHooks + class LogExecutionWorker + include ApplicationWorker + + idempotent! + feature_category :integrations + urgency :low + + # This worker accepts an extra argument. This enables us to + # treat this worker as idempotent. Currently this is set to + # the Job ID (jid) of the parent worker. + def perform(hook_id, log_data, response_category, _unique_by) + hook = WebHook.find_by_id(hook_id) + + return unless hook # hook has been deleted before we could run. + + ::WebHooks::LogExecutionService + .new(hook: hook, log_data: log_data, response_category: response_category.to_sym) + .execute + end + end +end |