summaryrefslogtreecommitdiff
path: root/app/workers
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/all_queues.yml183
-rw-r--r--app/workers/analytics/instance_statistics/count_job_trigger_worker.rb24
-rw-r--r--app/workers/analytics/instance_statistics/counter_job_worker.rb23
-rw-r--r--app/workers/authorized_project_update/project_recalculate_worker.rb30
-rw-r--r--app/workers/authorized_project_update/user_refresh_from_replica_worker.rb15
-rw-r--r--app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb22
-rw-r--r--app/workers/build_hooks_worker.rb12
-rw-r--r--app/workers/build_queue_worker.rb3
-rw-r--r--app/workers/bulk_import_worker.rb7
-rw-r--r--app/workers/bulk_imports/export_request_worker.rb2
-rw-r--r--app/workers/bulk_imports/pipeline_worker.rb25
-rw-r--r--app/workers/ci/initial_pipeline_process_worker.rb2
-rw-r--r--app/workers/ci/pipeline_artifacts/coverage_report_worker.rb1
-rw-r--r--app/workers/clusters/cleanup/app_worker.rb19
-rw-r--r--app/workers/concerns/application_worker.rb17
-rw-r--r--app/workers/concerns/security_scans_queue.rb2
-rw-r--r--app/workers/concerns/worker_attributes.rb27
-rw-r--r--app/workers/container_expiration_policies/cleanup_container_repository_worker.rb50
-rw-r--r--app/workers/container_expiration_policy_worker.rb27
-rw-r--r--app/workers/deployments/execute_hooks_worker.rb20
-rw-r--r--app/workers/expire_pipeline_cache_worker.rb8
-rw-r--r--app/workers/git_garbage_collect_worker.rb19
-rw-r--r--app/workers/incident_management/process_alert_worker.rb56
-rw-r--r--app/workers/incident_management/process_prometheus_alert_worker.rb23
-rw-r--r--app/workers/issue_placement_worker.rb9
-rw-r--r--app/workers/issue_rebalancing_worker.rb43
-rw-r--r--app/workers/jira_connect/sync_branch_worker.rb2
-rw-r--r--app/workers/jira_connect/sync_merge_request_worker.rb2
-rw-r--r--app/workers/merge_requests/assignees_change_worker.rb28
-rw-r--r--app/workers/packages/debian/generate_distribution_worker.rb44
-rw-r--r--app/workers/pipeline_hooks_worker.rb2
-rw-r--r--app/workers/pipeline_process_worker.rb10
-rw-r--r--app/workers/pipeline_update_worker.rb19
-rw-r--r--app/workers/project_schedule_bulk_repository_shard_moves_worker.rb15
-rw-r--r--app/workers/project_update_repository_storage_worker.rb15
-rw-r--r--app/workers/propagate_integration_worker.rb4
-rw-r--r--app/workers/prune_web_hook_logs_worker.rb24
-rw-r--r--app/workers/remove_unreferenced_lfs_objects_worker.rb12
-rw-r--r--app/workers/snippet_schedule_bulk_repository_shard_moves_worker.rb16
-rw-r--r--app/workers/snippet_update_repository_storage_worker.rb15
-rw-r--r--app/workers/ssh_keys/expired_notification_worker.rb33
-rw-r--r--app/workers/ssh_keys/expiring_soon_notification_worker.rb2
-rw-r--r--app/workers/stuck_ci_jobs_worker.rb55
-rw-r--r--app/workers/users/update_open_issue_count_worker.rb26
-rw-r--r--app/workers/web_hook_worker.rb3
-rw-r--r--app/workers/web_hooks/log_execution_worker.rb24
46 files changed, 393 insertions, 627 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index b216c2bff28..31c590183d1 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -21,6 +21,24 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: authorized_project_update:authorized_project_update_project_recalculate
+ :worker_name: AuthorizedProjectUpdate::ProjectRecalculateWorker
+ :feature_category: :authentication_and_authorization
+ :has_external_dependencies:
+ :urgency: :high
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
+- :name: authorized_project_update:authorized_project_update_user_refresh_from_replica
+ :worker_name: AuthorizedProjectUpdate::UserRefreshFromReplicaWorker
+ :feature_category: :authentication_and_authorization
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: authorized_project_update:authorized_project_update_user_refresh_over_user_range
:worker_name: AuthorizedProjectUpdate::UserRefreshOverUserRangeWorker
:feature_category: :authentication_and_authorization
@@ -144,16 +162,6 @@
:weight: 1
:idempotent:
:tags: []
-- :name: cronjob:analytics_instance_statistics_count_job_trigger
- :worker_name: Analytics::InstanceStatistics::CountJobTriggerWorker
- :feature_category: :devops_reports
- :has_external_dependencies:
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: true
- :tags:
- - :exclude_from_kubernetes
- :name: cronjob:analytics_usage_trends_count_job_trigger
:worker_name: Analytics::UsageTrends::CountJobTriggerWorker
:feature_category: :devops_reports
@@ -423,15 +431,6 @@
:weight: 1
:idempotent:
:tags: []
-- :name: cronjob:prune_web_hook_logs
- :worker_name: PruneWebHookLogsWorker
- :feature_category: :integrations
- :has_external_dependencies:
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent:
- :tags: []
- :name: cronjob:releases_manage_evidence
:worker_name: Releases::ManageEvidenceWorker
:feature_category: :release_evidence
@@ -477,7 +476,7 @@
:urgency: :low
:resource_boundary: :unknown
:weight: 1
- :idempotent:
+ :idempotent: true
:tags: []
- :name: cronjob:repository_archive_cache
:worker_name: RepositoryArchiveCacheWorker
@@ -647,15 +646,6 @@
:idempotent:
:tags:
- :exclude_from_kubernetes
-- :name: deployment:deployments_execute_hooks
- :worker_name: Deployments::ExecuteHooksWorker
- :feature_category: :continuous_delivery
- :has_external_dependencies:
- :urgency: :low
- :resource_boundary: :cpu
- :weight: 3
- :idempotent:
- :tags: []
- :name: deployment:deployments_finished
:worker_name: Deployments::FinishedWorker
:feature_category: :continuous_delivery
@@ -827,15 +817,6 @@
:weight: 1
:idempotent:
:tags: []
-- :name: gcp_cluster:clusters_cleanup_app
- :worker_name: Clusters::Cleanup::AppWorker
- :feature_category: :kubernetes_management
- :has_external_dependencies: true
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent:
- :tags: []
- :name: gcp_cluster:clusters_cleanup_project_namespace
:worker_name: Clusters::Cleanup::ProjectNamespaceWorker
:feature_category: :kubernetes_management
@@ -1088,15 +1069,6 @@
:weight: 2
:idempotent:
:tags: []
-- :name: incident_management:incident_management_process_alert
- :worker_name: IncidentManagement::ProcessAlertWorker
- :feature_category: :incident_management
- :has_external_dependencies:
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 2
- :idempotent:
- :tags: []
- :name: incident_management:incident_management_process_alert_worker_v2
:worker_name: IncidentManagement::ProcessAlertWorkerV2
:feature_category: :incident_management
@@ -1106,15 +1078,6 @@
:weight: 2
:idempotent: true
:tags: []
-- :name: incident_management:incident_management_process_prometheus_alert
- :worker_name: IncidentManagement::ProcessPrometheusAlertWorker
- :feature_category: :incident_management
- :has_external_dependencies:
- :urgency: :low
- :resource_boundary: :cpu
- :weight: 2
- :idempotent:
- :tags: []
- :name: jira_connect:jira_connect_sync_branch
:worker_name: JiraConnect::SyncBranchWorker
:feature_category: :integrations
@@ -1317,6 +1280,15 @@
:weight: 1
:idempotent:
:tags: []
+- :name: package_repositories:packages_debian_generate_distribution
+ :worker_name: Packages::Debian::GenerateDistributionWorker
+ :feature_category: :package_registry
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: package_repositories:packages_debian_process_changes
:worker_name: Packages::Debian::ProcessChangesWorker
:feature_category: :package_registry
@@ -1401,8 +1373,7 @@
:resource_boundary: :unknown
:weight: 1
:idempotent: true
- :tags:
- - :exclude_from_kubernetes
+ :tags: []
- :name: pipeline_background:ci_pipeline_artifacts_create_quality_report
:worker_name: Ci::PipelineArtifacts::CreateQualityReportWorker
:feature_category: :code_testing
@@ -1457,7 +1428,7 @@
:urgency: :high
:resource_boundary: :cpu
:weight: 3
- :idempotent: true
+ :idempotent:
:tags: []
- :name: pipeline_creation:create_pipeline
:worker_name: CreatePipelineWorker
@@ -1564,7 +1535,7 @@
:worker_name: PipelineHooksWorker
:feature_category: :continuous_integration
:has_external_dependencies:
- :urgency: :high
+ :urgency: :low
:resource_boundary: :cpu
:weight: 2
:idempotent:
@@ -1639,15 +1610,6 @@
:urgency: :high
:resource_boundary: :unknown
:weight: 5
- :idempotent:
- :tags: []
-- :name: pipeline_processing:pipeline_update
- :worker_name: PipelineUpdateWorker
- :feature_category: :continuous_integration
- :has_external_dependencies:
- :urgency: :high
- :resource_boundary: :unknown
- :weight: 5
:idempotent: true
:tags: []
- :name: pipeline_processing:stage_update
@@ -1777,16 +1739,6 @@
:weight: 1
:idempotent: true
:tags: []
-- :name: analytics_instance_statistics_counter_job
- :worker_name: Analytics::InstanceStatistics::CounterJobWorker
- :feature_category: :devops_reports
- :has_external_dependencies:
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: true
- :tags:
- - :exclude_from_kubernetes
- :name: analytics_usage_trends_counter_job
:worker_name: Analytics::UsageTrends::CounterJobWorker
:feature_category: :devops_reports
@@ -2116,15 +2068,6 @@
:idempotent: true
:tags:
- :exclude_from_kubernetes
-- :name: git_garbage_collect
- :worker_name: GitGarbageCollectWorker
- :feature_category: :gitaly
- :has_external_dependencies:
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent:
- :tags: []
- :name: github_import_advance_stage
:worker_name: Gitlab::GithubImport::AdvanceStageWorker
:feature_category: :importers
@@ -2292,15 +2235,6 @@
:weight: 1
:idempotent: true
:tags: []
-- :name: merge_requests_assignees_change
- :worker_name: MergeRequests::AssigneesChangeWorker
- :feature_category: :source_code_management
- :has_external_dependencies:
- :urgency: :high
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: true
- :tags: []
- :name: merge_requests_delete_source_branch
:worker_name: MergeRequests::DeleteSourceBranchWorker
:feature_category: :source_code_management
@@ -2570,15 +2504,6 @@
:weight: 1
:idempotent:
:tags: []
-- :name: project_schedule_bulk_repository_shard_moves
- :worker_name: ProjectScheduleBulkRepositoryShardMovesWorker
- :feature_category: :gitaly
- :has_external_dependencies:
- :urgency: :throttled
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: true
- :tags: []
- :name: project_service
:worker_name: ProjectServiceWorker
:feature_category: :integrations
@@ -2588,15 +2513,6 @@
:weight: 1
:idempotent:
:tags: []
-- :name: project_update_repository_storage
- :worker_name: ProjectUpdateRepositoryStorageWorker
- :feature_category: :gitaly
- :has_external_dependencies:
- :urgency: :throttled
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: true
- :tags: []
- :name: projects_git_garbage_collect
:worker_name: Projects::GitGarbageCollectWorker
:feature_category: :gitaly
@@ -2811,24 +2727,6 @@
:weight: 1
:idempotent:
:tags: []
-- :name: snippet_schedule_bulk_repository_shard_moves
- :worker_name: SnippetScheduleBulkRepositoryShardMovesWorker
- :feature_category: :gitaly
- :has_external_dependencies:
- :urgency: :throttled
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: true
- :tags: []
-- :name: snippet_update_repository_storage
- :worker_name: SnippetUpdateRepositoryStorageWorker
- :feature_category: :gitaly
- :has_external_dependencies:
- :urgency: :throttled
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: true
- :tags: []
- :name: snippets_schedule_bulk_repository_shard_moves
:worker_name: Snippets::ScheduleBulkRepositoryShardMovesWorker
:feature_category: :gitaly
@@ -2901,16 +2799,6 @@
:weight: 1
:idempotent:
:tags: []
-- :name: users_update_open_issue_count
- :worker_name: Users::UpdateOpenIssueCountWorker
- :feature_category: :users
- :has_external_dependencies:
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent: true
- :tags:
- - :exclude_from_kubernetes
- :name: web_hook
:worker_name: WebHookWorker
:feature_category: :integrations
@@ -2930,6 +2818,15 @@
:idempotent: true
:tags:
- :exclude_from_kubernetes
+- :name: web_hooks_log_execution
+ :worker_name: WebHooks::LogExecutionWorker
+ :feature_category: :integrations
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: wikis_git_garbage_collect
:worker_name: Wikis::GitGarbageCollectWorker
:feature_category: :gitaly
diff --git a/app/workers/analytics/instance_statistics/count_job_trigger_worker.rb b/app/workers/analytics/instance_statistics/count_job_trigger_worker.rb
deleted file mode 100644
index 083c01b166d..00000000000
--- a/app/workers/analytics/instance_statistics/count_job_trigger_worker.rb
+++ /dev/null
@@ -1,24 +0,0 @@
-# frozen_string_literal: true
-
-module Analytics
- module InstanceStatistics
- # This worker will be removed in 14.0
- class CountJobTriggerWorker
- include ApplicationWorker
-
- sidekiq_options retry: 3
- include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
-
- feature_category :devops_reports
- tags :exclude_from_kubernetes
- urgency :low
-
- idempotent!
-
- def perform
- # Delegate to the new worker
- Analytics::UsageTrends::CountJobTriggerWorker.new.perform
- end
- end
- end
-end
diff --git a/app/workers/analytics/instance_statistics/counter_job_worker.rb b/app/workers/analytics/instance_statistics/counter_job_worker.rb
deleted file mode 100644
index a4dda45ff72..00000000000
--- a/app/workers/analytics/instance_statistics/counter_job_worker.rb
+++ /dev/null
@@ -1,23 +0,0 @@
-# frozen_string_literal: true
-
-module Analytics
- module InstanceStatistics
- # This worker will be removed in 14.0
- class CounterJobWorker
- include ApplicationWorker
-
- sidekiq_options retry: 3
-
- feature_category :devops_reports
- urgency :low
- tags :exclude_from_kubernetes
-
- idempotent!
-
- def perform(*args)
- # Delegate to the new worker
- Analytics::UsageTrends::CounterJobWorker.new.perform(*args)
- end
- end
- end
-end
diff --git a/app/workers/authorized_project_update/project_recalculate_worker.rb b/app/workers/authorized_project_update/project_recalculate_worker.rb
new file mode 100644
index 00000000000..3f0672992ef
--- /dev/null
+++ b/app/workers/authorized_project_update/project_recalculate_worker.rb
@@ -0,0 +1,30 @@
+# frozen_string_literal: true
+
+module AuthorizedProjectUpdate
+ class ProjectRecalculateWorker
+ include ApplicationWorker
+ include Gitlab::ExclusiveLeaseHelpers
+
+ feature_category :authentication_and_authorization
+ urgency :high
+ queue_namespace :authorized_project_update
+
+ deduplicate :until_executing, including_scheduled: true
+ idempotent!
+
+ def perform(project_id)
+ project = Project.find_by_id(project_id)
+ return unless project
+
+ in_lock(lock_key(project), ttl: 10.seconds) do
+ AuthorizedProjectUpdate::ProjectRecalculateService.new(project).execute
+ end
+ end
+
+ private
+
+ def lock_key(project)
+ "#{self.class.name.underscore}/#{project.root_namespace.id}"
+ end
+ end
+end
diff --git a/app/workers/authorized_project_update/user_refresh_from_replica_worker.rb b/app/workers/authorized_project_update/user_refresh_from_replica_worker.rb
new file mode 100644
index 00000000000..5ca9de63fd7
--- /dev/null
+++ b/app/workers/authorized_project_update/user_refresh_from_replica_worker.rb
@@ -0,0 +1,15 @@
+# frozen_string_literal: true
+
+module AuthorizedProjectUpdate
+ class UserRefreshFromReplicaWorker < ::AuthorizedProjectsWorker
+ feature_category :authentication_and_authorization
+ urgency :low
+ queue_namespace :authorized_project_update
+ deduplicate :until_executing, including_scheduled: true
+
+ idempotent!
+
+ # This worker will start reading data from the replica database soon
+ # Issue: https://gitlab.com/gitlab-org/gitlab/-/issues/333219
+ end
+end
diff --git a/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb b/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb
index 2e4e2dd3232..ab4d9c13422 100644
--- a/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb
+++ b/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb
@@ -2,10 +2,9 @@
module AuthorizedProjectUpdate
class UserRefreshOverUserRangeWorker # rubocop:disable Scalability/IdempotentWorker
- # When the feature flag named `periodic_project_authorization_update_via_replica` is enabled,
- # this worker checks if a specific user requires an update to their project_authorizations records.
+ # This worker checks if users requires an update to their project_authorizations records.
# This check is done via the data read from the database replica (and not from the primary).
- # If this check returns true, a completely new Sidekiq job is enqueued for this specific user
+ # If this check returns true, a completely new Sidekiq job is enqueued for a specific user
# so as to update its project_authorizations records.
# There is a possibility that the data in the replica is lagging behind the primary
@@ -24,25 +23,16 @@ module AuthorizedProjectUpdate
# `data_consistency :delayed` and not `idempotent!`
# See https://gitlab.com/gitlab-org/gitlab/-/issues/325291
deduplicate :until_executing, including_scheduled: true
- data_consistency :delayed, feature_flag: :delayed_consistency_for_user_refresh_over_range_worker
+ data_consistency :delayed
def perform(start_user_id, end_user_id)
- if Feature.enabled?(:periodic_project_authorization_update_via_replica)
- User.where(id: start_user_id..end_user_id).find_each do |user| # rubocop: disable CodeReuse/ActiveRecord
- enqueue_project_authorizations_refresh(user) if project_authorizations_needs_refresh?(user)
- end
- else
- use_primary_database
- AuthorizedProjectUpdate::RecalculateForUserRangeService.new(start_user_id, end_user_id).execute
+ User.where(id: start_user_id..end_user_id).find_each do |user| # rubocop: disable CodeReuse/ActiveRecord
+ enqueue_project_authorizations_refresh(user) if project_authorizations_needs_refresh?(user)
end
end
private
- def use_primary_database
- # no-op in CE, overriden in EE
- end
-
def project_authorizations_needs_refresh?(user)
AuthorizedProjectUpdate::FindRecordsDueForRefreshService.new(user).needs_refresh?
end
@@ -54,5 +44,3 @@ module AuthorizedProjectUpdate
end
end
end
-
-AuthorizedProjectUpdate::UserRefreshOverUserRangeWorker.prepend_mod_with('AuthorizedProjectUpdate::UserRefreshOverUserRangeWorker')
diff --git a/app/workers/build_hooks_worker.rb b/app/workers/build_hooks_worker.rb
index be79d6b2afb..a0d1d9dca45 100644
--- a/app/workers/build_hooks_worker.rb
+++ b/app/workers/build_hooks_worker.rb
@@ -9,17 +9,7 @@ class BuildHooksWorker # rubocop:disable Scalability/IdempotentWorker
queue_namespace :pipeline_hooks
feature_category :continuous_integration
urgency :high
- data_consistency :delayed, feature_flag: :load_balancing_for_build_hooks_worker
-
- DATA_CONSISTENCY_DELAY = 3
-
- def self.perform_async(*args)
- if Feature.enabled?(:delayed_perform_for_build_hooks_worker, default_enabled: :yaml)
- perform_in(DATA_CONSISTENCY_DELAY.seconds, *args)
- else
- super
- end
- end
+ data_consistency :delayed
# rubocop: disable CodeReuse/ActiveRecord
def perform(build_id)
diff --git a/app/workers/build_queue_worker.rb b/app/workers/build_queue_worker.rb
index e9bb2d88a81..aa3c03f773e 100644
--- a/app/workers/build_queue_worker.rb
+++ b/app/workers/build_queue_worker.rb
@@ -10,11 +10,12 @@ class BuildQueueWorker # rubocop:disable Scalability/IdempotentWorker
feature_category :continuous_integration
urgency :high
worker_resource_boundary :cpu
+ data_consistency :sticky, feature_flag: :load_balancing_for_build_queue_worker
# rubocop: disable CodeReuse/ActiveRecord
def perform(build_id)
Ci::Build.find_by(id: build_id).try do |build|
- Ci::UpdateBuildQueueService.new.execute(build)
+ Ci::UpdateBuildQueueService.new.tick(build)
end
end
# rubocop: enable CodeReuse/ActiveRecord
diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb
index 8ad31c68374..25a86ead76e 100644
--- a/app/workers/bulk_import_worker.rb
+++ b/app/workers/bulk_import_worker.rb
@@ -15,7 +15,8 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
@bulk_import = BulkImport.find_by_id(bulk_import_id)
return unless @bulk_import
- return if @bulk_import.finished?
+ return if @bulk_import.finished? || @bulk_import.failed?
+ return @bulk_import.fail_op! if all_entities_failed?
return @bulk_import.finish! if all_entities_processed? && @bulk_import.started?
return re_enqueue if max_batch_size_exceeded? # Do not start more jobs if max allowed are already running
@@ -55,6 +56,10 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
entities.all? { |entity| entity.finished? || entity.failed? }
end
+ def all_entities_failed?
+ entities.all? { |entity| entity.failed? }
+ end
+
def max_batch_size_exceeded?
started_entities.count >= DEFAULT_BATCH_SIZE
end
diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb
index cccc24d3bdc..24e75ad0f85 100644
--- a/app/workers/bulk_imports/export_request_worker.rb
+++ b/app/workers/bulk_imports/export_request_worker.rb
@@ -24,7 +24,7 @@ module BulkImports
end
def http_client(configuration)
- @client ||= Clients::Http.new(
+ @client ||= Clients::HTTP.new(
uri: configuration.url,
token: configuration.access_token
)
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index 256301bf097..d3297017714 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -4,6 +4,8 @@ module BulkImports
class PipelineWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
+ NDJSON_PIPELINE_PERFORM_DELAY = 1.minute
+
feature_category :importers
tags :exclude_from_kubernetes
@@ -40,6 +42,15 @@ module BulkImports
private
def run(pipeline_tracker)
+ if ndjson_pipeline?(pipeline_tracker)
+ status = ExportStatus.new(pipeline_tracker, pipeline_tracker.pipeline_class.relation)
+
+ raise(Pipeline::ExpiredError, 'Pipeline timeout') if job_timeout?(pipeline_tracker)
+ raise(Pipeline::FailedError, status.error) if status.failed?
+
+ return reenqueue(pipeline_tracker) if status.started?
+ end
+
pipeline_tracker.update!(status_event: 'start', jid: jid)
context = ::BulkImports::Pipeline::Context.new(pipeline_tracker)
@@ -48,7 +59,7 @@ module BulkImports
pipeline_tracker.finish!
rescue StandardError => e
- pipeline_tracker.fail_op!
+ pipeline_tracker.update!(status_event: 'fail_op', jid: jid)
logger.error(
worker: self.class.name,
@@ -67,5 +78,17 @@ module BulkImports
def logger
@logger ||= Gitlab::Import::Logger.build
end
+
+ def ndjson_pipeline?(pipeline_tracker)
+ pipeline_tracker.pipeline_class.ndjson_pipeline?
+ end
+
+ def job_timeout?(pipeline_tracker)
+ (Time.zone.now - pipeline_tracker.entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT
+ end
+
+ def reenqueue(pipeline_tracker)
+ self.class.perform_in(NDJSON_PIPELINE_PERFORM_DELAY, pipeline_tracker.id, pipeline_tracker.stage, pipeline_tracker.entity.id)
+ end
end
end
diff --git a/app/workers/ci/initial_pipeline_process_worker.rb b/app/workers/ci/initial_pipeline_process_worker.rb
index 4dace43298d..ca41a7fb577 100644
--- a/app/workers/ci/initial_pipeline_process_worker.rb
+++ b/app/workers/ci/initial_pipeline_process_worker.rb
@@ -15,7 +15,7 @@ module Ci
def perform(pipeline_id)
Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline|
- Ci::ProcessPipelineService
+ Ci::PipelineCreation::StartPipelineService
.new(pipeline)
.execute
end
diff --git a/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb b/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb
index dd7bfff4eb1..ec0cb69d0c7 100644
--- a/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb
+++ b/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb
@@ -9,7 +9,6 @@ module Ci
include PipelineBackgroundQueue
feature_category :code_testing
- tags :exclude_from_kubernetes
idempotent!
diff --git a/app/workers/clusters/cleanup/app_worker.rb b/app/workers/clusters/cleanup/app_worker.rb
deleted file mode 100644
index 1d01cec174b..00000000000
--- a/app/workers/clusters/cleanup/app_worker.rb
+++ /dev/null
@@ -1,19 +0,0 @@
-# frozen_string_literal: true
-
-module Clusters
- module Cleanup
- class AppWorker # rubocop:disable Scalability/IdempotentWorker
- include ClusterCleanupMethods
-
- def perform(cluster_id, execution_count = 0)
- Clusters::Cluster.with_persisted_applications.find_by_id(cluster_id).try do |cluster|
- break unless cluster.cleanup_uninstalling_applications?
-
- break exceeded_execution_limit(cluster) if exceeded_execution_limit?(execution_count)
-
- ::Clusters::Cleanup::AppService.new(cluster, execution_count).execute
- end
- end
- end
- end
-end
diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb
index 843be4896a3..3cba1eb31c5 100644
--- a/app/workers/concerns/application_worker.rb
+++ b/app/workers/concerns/application_worker.rb
@@ -13,6 +13,7 @@ module ApplicationWorker
include Gitlab::SidekiqVersioning::Worker
LOGGING_EXTRA_KEY = 'extra'
+ DEFAULT_DELAY_INTERVAL = 1
included do
set_queue
@@ -51,6 +52,16 @@ module ApplicationWorker
subclass.after_set_class_attribute { subclass.set_queue }
end
+ def perform_async(*args)
+ # Worker execution for workers with data_consistency set to :delayed or :sticky
+ # will be delayed to give replication enough time to complete
+ if utilizes_load_balancing_capabilities?
+ perform_in(delay_interval, *args)
+ else
+ super
+ end
+ end
+
def set_queue
queue_name = ::Gitlab::SidekiqConfig::WorkerRouter.global.route(self)
sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue
@@ -111,5 +122,11 @@ module ApplicationWorker
Sidekiq::Client.push_bulk('class' => self, 'args' => args_list, 'at' => schedule)
end
end
+
+ protected
+
+ def delay_interval
+ DEFAULT_DELAY_INTERVAL.seconds
+ end
end
end
diff --git a/app/workers/concerns/security_scans_queue.rb b/app/workers/concerns/security_scans_queue.rb
index f731317bb37..27e97169926 100644
--- a/app/workers/concerns/security_scans_queue.rb
+++ b/app/workers/concerns/security_scans_queue.rb
@@ -8,6 +8,6 @@ module SecurityScansQueue
included do
queue_namespace :security_scans
- feature_category :static_application_security_testing
+ feature_category :vulnerability_management
end
end
diff --git a/app/workers/concerns/worker_attributes.rb b/app/workers/concerns/worker_attributes.rb
index 6dee9402691..096be808787 100644
--- a/app/workers/concerns/worker_attributes.rb
+++ b/app/workers/concerns/worker_attributes.rb
@@ -71,6 +71,20 @@ module WorkerAttributes
class_attributes[:urgency] || :low
end
+ # Allows configuring worker's data_consistency.
+ #
+ # Worker can utilize Sidekiq readonly database replicas capabilities by setting data_consistency attribute.
+ # Workers with data_consistency set to :delayed or :sticky, calling #perform_async
+ # will be delayed in order to give replication process enough time to complete.
+ #
+ # - *data_consistency* values:
+ # - 'always' - The job is required to use the primary database (default).
+ # - 'sticky' - The uses a replica as long as possible. It switches to primary either on write or long replication lag.
+ # - 'delayed' - The job would switch to primary only on write. It would use replica always.
+ # If there's a long replication lag the job will be delayed, and only if the replica is not up to date on the next retry,
+ # it will switch to the primary.
+ # - *feature_flag* - allows you to toggle a job's `data_consistency, which permits you to safely toggle load balancing capabilities for a specific job.
+ # If disabled, job will default to `:always`, which means that the job will always use the primary.
def data_consistency(data_consistency, feature_flag: nil)
raise ArgumentError, "Invalid data consistency: #{data_consistency}" unless VALID_DATA_CONSISTENCIES.include?(data_consistency)
raise ArgumentError, 'Data consistency is already set' if class_attributes[:data_consistency]
@@ -85,11 +99,16 @@ module WorkerAttributes
# Since the deduplication should always take into account the latest binary replication pointer into account,
# not the first one, the deduplication will not work with sticky or delayed.
# Follow up issue to improve this: https://gitlab.com/gitlab-org/gitlab/-/issues/325291
- if idempotent? && get_data_consistency != :always
+ if idempotent? && utilizes_load_balancing_capabilities?
raise ArgumentError, "Class can't be marked as idempotent if data_consistency is not set to :always"
end
end
+ # If data_consistency is not set to :always, worker will try to utilize load balancing capabilities and use the replica
+ def utilizes_load_balancing_capabilities?
+ get_data_consistency != :always
+ end
+
def get_data_consistency
class_attributes[:data_consistency] || :always
end
@@ -167,6 +186,12 @@ module WorkerAttributes
class_attributes[:deduplication_options] || {}
end
+ def deduplication_enabled?
+ return true unless get_deduplication_options[:feature_flag]
+
+ Feature.enabled?(get_deduplication_options[:feature_flag], default_enabled: :yaml)
+ end
+
def big_payload!
set_class_attribute(:big_payload, true)
end
diff --git a/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb b/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb
index 40cc233307a..3027d46b8b1 100644
--- a/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb
+++ b/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb
@@ -65,19 +65,9 @@ module ContainerExpirationPolicies
def container_repository
strong_memoize(:container_repository) do
ContainerRepository.transaction do
- # rubocop: disable CodeReuse/ActiveRecord
# We need a lock to prevent two workers from picking up the same row
- container_repository = if loopless_enabled?
- next_container_repository
- else
- ContainerRepository.waiting_for_cleanup
- .order(:expiration_policy_cleanup_status, :expiration_policy_started_at)
- .limit(1)
- .lock('FOR UPDATE SKIP LOCKED')
- .first
- end
-
- # rubocop: enable CodeReuse/ActiveRecord
+ container_repository = next_container_repository
+
container_repository&.tap(&:cleanup_ongoing!)
end
end
@@ -102,28 +92,20 @@ module ContainerExpirationPolicies
def cleanup_scheduled_count
strong_memoize(:cleanup_scheduled_count) do
- if loopless_enabled?
- limit = max_running_jobs + 1
- ContainerExpirationPolicy.with_container_repositories
- .runnable_schedules
- .limit(limit)
- .count
- else
- ContainerRepository.cleanup_scheduled.count
- end
+ limit = max_running_jobs + 1
+ ContainerExpirationPolicy.with_container_repositories
+ .runnable_schedules
+ .limit(limit)
+ .count
end
end
def cleanup_unfinished_count
strong_memoize(:cleanup_unfinished_count) do
- if loopless_enabled?
- limit = max_running_jobs + 1
- ContainerRepository.with_unfinished_cleanup
- .limit(limit)
- .count
- else
- ContainerRepository.cleanup_unfinished.count
- end
+ limit = max_running_jobs + 1
+ ContainerRepository.with_unfinished_cleanup
+ .limit(limit)
+ .count
end
end
@@ -132,21 +114,13 @@ module ContainerExpirationPolicies
now = Time.zone.now
- if loopless_enabled?
- policy.next_run_at < now || (now + max_cleanup_execution_time.seconds < policy.next_run_at)
- else
- now + max_cleanup_execution_time.seconds < policy.next_run_at
- end
+ policy.next_run_at < now || (now + max_cleanup_execution_time.seconds < policy.next_run_at)
end
def throttling_enabled?
Feature.enabled?(:container_registry_expiration_policies_throttling)
end
- def loopless_enabled?
- Feature.enabled?(:container_registry_expiration_policies_loopless)
- end
-
def max_cleanup_execution_time
::Gitlab::CurrentSettings.container_registry_delete_tags_service_timeout
end
diff --git a/app/workers/container_expiration_policy_worker.rb b/app/workers/container_expiration_policy_worker.rb
index dec13485d13..b15d1bf90bd 100644
--- a/app/workers/container_expiration_policy_worker.rb
+++ b/app/workers/container_expiration_policy_worker.rb
@@ -14,11 +14,18 @@ class ContainerExpirationPolicyWorker # rubocop:disable Scalability/IdempotentWo
BATCH_SIZE = 1000
def perform
+ process_stale_ongoing_cleanups
throttling_enabled? ? perform_throttled : perform_unthrottled
end
private
+ def process_stale_ongoing_cleanups
+ threshold = delete_tags_service_timeout.seconds + 30.minutes
+ ContainerRepository.with_stale_ongoing_cleanup(threshold.ago)
+ .update_all(expiration_policy_cleanup_status: :cleanup_unfinished)
+ end
+
def perform_unthrottled
with_runnable_policy(preloaded: true) do |policy|
with_context(project: policy.project,
@@ -31,18 +38,6 @@ class ContainerExpirationPolicyWorker # rubocop:disable Scalability/IdempotentWo
def perform_throttled
try_obtain_lease do
- unless loopless_enabled?
- with_runnable_policy do |policy|
- ContainerExpirationPolicy.transaction do
- policy.schedule_next_run!
- ContainerRepository.for_project_id(policy.id)
- .each_batch do |relation|
- relation.update_all(expiration_policy_cleanup_status: :cleanup_scheduled)
- end
- end
- end
- end
-
ContainerExpirationPolicies::CleanupContainerRepositoryWorker.perform_with_capacity
end
end
@@ -79,11 +74,11 @@ class ContainerExpirationPolicyWorker # rubocop:disable Scalability/IdempotentWo
Feature.enabled?(:container_registry_expiration_policies_throttling)
end
- def loopless_enabled?
- Feature.enabled?(:container_registry_expiration_policies_loopless)
- end
-
def lease_timeout
5.hours
end
+
+ def delete_tags_service_timeout
+ ::Gitlab::CurrentSettings.current_application_settings.container_registry_delete_tags_service_timeout || 0
+ end
end
diff --git a/app/workers/deployments/execute_hooks_worker.rb b/app/workers/deployments/execute_hooks_worker.rb
deleted file mode 100644
index 3046aa28e20..00000000000
--- a/app/workers/deployments/execute_hooks_worker.rb
+++ /dev/null
@@ -1,20 +0,0 @@
-# frozen_string_literal: true
-
-module Deployments
- # TODO: remove in https://gitlab.com/gitlab-org/gitlab/-/issues/329360
- class ExecuteHooksWorker # rubocop:disable Scalability/IdempotentWorker
- include ApplicationWorker
-
- sidekiq_options retry: 3
-
- queue_namespace :deployment
- feature_category :continuous_delivery
- worker_resource_boundary :cpu
-
- def perform(deployment_id)
- if (deploy = Deployment.find_by_id(deployment_id))
- deploy.execute_hooks(Time.current)
- end
- end
- end
-end
diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb
index 3c48c4ba3cd..9702fac39ba 100644
--- a/app/workers/expire_pipeline_cache_worker.rb
+++ b/app/workers/expire_pipeline_cache_worker.rb
@@ -1,5 +1,6 @@
# frozen_string_literal: true
+# rubocop: disable Scalability/IdempotentWorker
class ExpirePipelineCacheWorker
include ApplicationWorker
@@ -9,8 +10,12 @@ class ExpirePipelineCacheWorker
queue_namespace :pipeline_cache
urgency :high
worker_resource_boundary :cpu
+ data_consistency :delayed, feature_flag: :load_balancing_for_expire_pipeline_cache_worker
- idempotent!
+ # This worker _should_ be idempotent, but due to us moving this to data_consistency :delayed
+ # and an ongoing incompatibility between the two switches, we need to disable this.
+ # Uncomment once https://gitlab.com/gitlab-org/gitlab/-/issues/325291 is resolved
+ # idempotent!
# rubocop: disable CodeReuse/ActiveRecord
def perform(pipeline_id)
@@ -21,3 +26,4 @@ class ExpirePipelineCacheWorker
end
# rubocop: enable CodeReuse/ActiveRecord
end
+# rubocop:enable Scalability/IdempotentWorker
diff --git a/app/workers/git_garbage_collect_worker.rb b/app/workers/git_garbage_collect_worker.rb
deleted file mode 100644
index a2aab23db7b..00000000000
--- a/app/workers/git_garbage_collect_worker.rb
+++ /dev/null
@@ -1,19 +0,0 @@
-# frozen_string_literal: true
-
-# According to our docs, we can only remove workers on major releases
-# https://docs.gitlab.com/ee/development/sidekiq_style_guide.html#removing-workers.
-#
-# We need to still maintain this until 14.0 but with the current functionality.
-#
-# In https://gitlab.com/gitlab-org/gitlab/-/issues/299290 we track that removal.
-class GitGarbageCollectWorker # rubocop:disable Scalability/IdempotentWorker
- include ApplicationWorker
-
- sidekiq_options retry: false
- feature_category :gitaly
- loggable_arguments 1, 2, 3
-
- def perform(project_id, task = :gc, lease_key = nil, lease_uuid = nil)
- ::Projects::GitGarbageCollectWorker.new.perform(project_id, task, lease_key, lease_uuid)
- end
-end
diff --git a/app/workers/incident_management/process_alert_worker.rb b/app/workers/incident_management/process_alert_worker.rb
deleted file mode 100644
index 3b90e296ad4..00000000000
--- a/app/workers/incident_management/process_alert_worker.rb
+++ /dev/null
@@ -1,56 +0,0 @@
-# frozen_string_literal: true
-
-module IncidentManagement
- class ProcessAlertWorker # rubocop:disable Scalability/IdempotentWorker
- include ApplicationWorker
-
- sidekiq_options retry: 3
-
- queue_namespace :incident_management
- feature_category :incident_management
-
- # `project_id` and `alert_payload` are deprecated and can be removed
- # starting from 14.0 release
- # https://gitlab.com/gitlab-org/gitlab/-/issues/224500
- #
- # This worker is not scheduled anymore since
- # https://gitlab.com/gitlab-org/gitlab/-/merge_requests/60285
- # and will be removed completely via
- # https://gitlab.com/gitlab-org/gitlab/-/issues/224500
- # in 14.0.
- def perform(_project_id = nil, _alert_payload = nil, alert_id = nil)
- return unless alert_id
-
- alert = find_alert(alert_id)
- return unless alert
-
- result = create_issue_for(alert)
- return if result.success?
-
- log_warning(alert, result)
- end
-
- private
-
- def find_alert(alert_id)
- AlertManagement::Alert.find_by_id(alert_id)
- end
-
- def create_issue_for(alert)
- AlertManagement::CreateAlertIssueService
- .new(alert, User.alert_bot)
- .execute
- end
-
- def log_warning(alert, result)
- issue_id = result.payload[:issue]&.id
-
- Gitlab::AppLogger.warn(
- message: 'Cannot process an Incident',
- issue_id: issue_id,
- alert_id: alert.id,
- errors: result.message
- )
- end
- end
-end
diff --git a/app/workers/incident_management/process_prometheus_alert_worker.rb b/app/workers/incident_management/process_prometheus_alert_worker.rb
deleted file mode 100644
index 7b5c6fd9001..00000000000
--- a/app/workers/incident_management/process_prometheus_alert_worker.rb
+++ /dev/null
@@ -1,23 +0,0 @@
-# frozen_string_literal: true
-
-module IncidentManagement
- class ProcessPrometheusAlertWorker # rubocop:disable Scalability/IdempotentWorker
- include ApplicationWorker
-
- sidekiq_options retry: 3
-
- queue_namespace :incident_management
- feature_category :incident_management
- worker_resource_boundary :cpu
-
- def perform(project_id, alert_hash)
- # no-op
- #
- # This worker is not scheduled anymore since
- # https://gitlab.com/gitlab-org/gitlab/-/merge_requests/35943
- # and will be removed completely via
- # https://gitlab.com/gitlab-org/gitlab/-/issues/227146
- # in 14.0.
- end
- end
-end
diff --git a/app/workers/issue_placement_worker.rb b/app/workers/issue_placement_worker.rb
index dba791c3f05..8166dda135e 100644
--- a/app/workers/issue_placement_worker.rb
+++ b/app/workers/issue_placement_worker.rb
@@ -41,7 +41,7 @@ class IssuePlacementWorker
IssuePlacementWorker.perform_async(nil, leftover.project_id) if leftover.present?
rescue RelativePositioning::NoSpaceLeft => e
Gitlab::ErrorTracking.log_exception(e, issue_id: issue_id, project_id: project_id)
- IssueRebalancingWorker.perform_async(nil, project_id.presence || issue.project_id)
+ IssueRebalancingWorker.perform_async(nil, *root_namespace_id_to_rebalance(issue, project_id))
end
def find_issue(issue_id, project_id)
@@ -53,4 +53,11 @@ class IssuePlacementWorker
project.issues.take
end
# rubocop: enable CodeReuse/ActiveRecord
+
+ private
+
+ def root_namespace_id_to_rebalance(issue, project_id)
+ project_id = project_id.presence || issue.project_id
+ Project.find(project_id)&.self_or_root_group_ids
+ end
end
diff --git a/app/workers/issue_rebalancing_worker.rb b/app/workers/issue_rebalancing_worker.rb
index 9eac451f107..66ef7dd3152 100644
--- a/app/workers/issue_rebalancing_worker.rb
+++ b/app/workers/issue_rebalancing_worker.rb
@@ -9,21 +9,44 @@ class IssueRebalancingWorker
urgency :low
feature_category :issue_tracking
tags :exclude_from_kubernetes
+ deduplicate :until_executed, including_scheduled: true
- def perform(ignore = nil, project_id = nil)
- return if project_id.nil?
+ def perform(ignore = nil, project_id = nil, root_namespace_id = nil)
+ # we need to have exactly one of the project_id and root_namespace_id params be non-nil
+ raise ArgumentError, "Expected only one of the params project_id: #{project_id} and root_namespace_id: #{root_namespace_id}" if project_id && root_namespace_id
+ return if project_id.nil? && root_namespace_id.nil?
- project = Project.find(project_id)
+ # pull the projects collection to be rebalanced either the project if namespace is not a group(i.e. user namesapce)
+ # or the root namespace, this also makes the worker backward compatible with previous version where a project_id was
+ # passed as the param
+ projects_to_rebalance = projects_collection(project_id, root_namespace_id)
- # Temporary disable reabalancing for performance reasons
+ # something might have happened with the namespace between scheduling the worker and actually running it,
+ # maybe it was removed.
+ if projects_to_rebalance.blank?
+ Gitlab::ErrorTracking.log_exception(
+ ArgumentError.new("Projects to be rebalanced not found for arguments: project_id #{project_id}, root_namespace_id: #{root_namespace_id}"),
+ { project_id: project_id, root_namespace_id: root_namespace_id })
+
+ return
+ end
+
+ # Temporary disable rebalancing for performance reasons
# For more information check https://gitlab.com/gitlab-com/gl-infra/production/-/issues/4321
- return if project.root_namespace&.issue_repositioning_disabled?
+ return if projects_to_rebalance.take&.root_namespace&.issue_repositioning_disabled? # rubocop:disable CodeReuse/ActiveRecord
+
+ IssueRebalancingService.new(projects_to_rebalance).execute
+ rescue IssueRebalancingService::TooManyIssues => e
+ Gitlab::ErrorTracking.log_exception(e, root_namespace_id: root_namespace_id, project_id: project_id)
+ end
+
+ private
- # All issues are equivalent as far as we are concerned
- issue = project.issues.take # rubocop: disable CodeReuse/ActiveRecord
+ def projects_collection(project_id, root_namespace_id)
+ # we can have either project_id(older version) or project_id if project is part of a user namespace and not a group
+ # or root_namespace_id(newer version) never both.
+ return Project.id_in([project_id]) if project_id
- IssueRebalancingService.new(issue).execute
- rescue ActiveRecord::RecordNotFound, IssueRebalancingService::TooManyIssues => e
- Gitlab::ErrorTracking.log_exception(e, project_id: project_id)
+ Namespace.find_by_id(root_namespace_id)&.all_projects
end
end
diff --git a/app/workers/jira_connect/sync_branch_worker.rb b/app/workers/jira_connect/sync_branch_worker.rb
index b8211286d1c..4e8566d86c9 100644
--- a/app/workers/jira_connect/sync_branch_worker.rb
+++ b/app/workers/jira_connect/sync_branch_worker.rb
@@ -1,7 +1,7 @@
# frozen_string_literal: true
module JiraConnect
- class SyncBranchWorker # rubocop:disable Scalability/IdempotentWorker
+ class SyncBranchWorker
include ApplicationWorker
sidekiq_options retry: 3
diff --git a/app/workers/jira_connect/sync_merge_request_worker.rb b/app/workers/jira_connect/sync_merge_request_worker.rb
index 6b3a6ae84ad..bf31df2271f 100644
--- a/app/workers/jira_connect/sync_merge_request_worker.rb
+++ b/app/workers/jira_connect/sync_merge_request_worker.rb
@@ -1,7 +1,7 @@
# frozen_string_literal: true
module JiraConnect
- class SyncMergeRequestWorker # rubocop:disable Scalability/IdempotentWorker
+ class SyncMergeRequestWorker
include ApplicationWorker
sidekiq_options retry: 3
diff --git a/app/workers/merge_requests/assignees_change_worker.rb b/app/workers/merge_requests/assignees_change_worker.rb
deleted file mode 100644
index fe39f20151f..00000000000
--- a/app/workers/merge_requests/assignees_change_worker.rb
+++ /dev/null
@@ -1,28 +0,0 @@
-# frozen_string_literal: true
-
-class MergeRequests::AssigneesChangeWorker
- include ApplicationWorker
-
- sidekiq_options retry: 3
-
- feature_category :source_code_management
- urgency :high
- deduplicate :until_executed
- idempotent!
-
- def perform(merge_request_id, user_id, old_assignee_ids)
- merge_request = MergeRequest.find(merge_request_id)
- current_user = User.find(user_id)
-
- # if a user was added and then removed, or removed and then added
- # while waiting for this job to run, assume that nothing happened.
- users = User.id_in(old_assignee_ids - merge_request.assignee_ids)
-
- return if users.blank?
-
- ::MergeRequests::HandleAssigneesChangeService
- .new(project: merge_request.target_project, current_user: current_user)
- .execute(merge_request, users, execute_hooks: true)
- rescue ActiveRecord::RecordNotFound
- end
-end
diff --git a/app/workers/packages/debian/generate_distribution_worker.rb b/app/workers/packages/debian/generate_distribution_worker.rb
new file mode 100644
index 00000000000..68fdd80ffb1
--- /dev/null
+++ b/app/workers/packages/debian/generate_distribution_worker.rb
@@ -0,0 +1,44 @@
+# frozen_string_literal: true
+
+module Packages
+ module Debian
+ class GenerateDistributionWorker # rubocop:disable Scalability/IdempotentWorker
+ include ApplicationWorker
+ include Gitlab::Utils::StrongMemoize
+
+ # The worker is idempotent, by reusing component files with the same file_sha256.
+ #
+ # See GenerateDistributionService#find_or_create_component_file
+ deduplicate :until_executed
+ idempotent!
+
+ queue_namespace :package_repositories
+ feature_category :package_registry
+
+ loggable_arguments 0
+
+ def perform(container_type, distribution_id)
+ @container_type = container_type
+ @distribution_id = distribution_id
+
+ return unless distribution
+
+ ::Packages::Debian::GenerateDistributionService.new(distribution).execute
+ end
+
+ private
+
+ def container_class
+ return ::Packages::Debian::GroupDistribution if @container_type == :group
+
+ ::Packages::Debian::ProjectDistribution
+ end
+
+ def distribution
+ strong_memoize(:distribution) do
+ container_class.find_by_id(@distribution_id)
+ end
+ end
+ end
+ end
+end
diff --git a/app/workers/pipeline_hooks_worker.rb b/app/workers/pipeline_hooks_worker.rb
index fbb672f52e3..97e6adbbf18 100644
--- a/app/workers/pipeline_hooks_worker.rb
+++ b/app/workers/pipeline_hooks_worker.rb
@@ -7,8 +7,8 @@ class PipelineHooksWorker # rubocop:disable Scalability/IdempotentWorker
include PipelineQueue
queue_namespace :pipeline_hooks
- urgency :high
worker_resource_boundary :cpu
+ data_consistency :delayed, feature_flag: :load_balancing_for_pipeline_hooks_worker
# rubocop: disable CodeReuse/ActiveRecord
def perform(pipeline_id)
diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb
index dc14789fe73..a35b32c35f2 100644
--- a/app/workers/pipeline_process_worker.rb
+++ b/app/workers/pipeline_process_worker.rb
@@ -1,6 +1,6 @@
# frozen_string_literal: true
-class PipelineProcessWorker # rubocop:disable Scalability/IdempotentWorker
+class PipelineProcessWorker
include ApplicationWorker
sidekiq_options retry: 3
@@ -10,12 +10,12 @@ class PipelineProcessWorker # rubocop:disable Scalability/IdempotentWorker
feature_category :continuous_integration
urgency :high
loggable_arguments 1
- data_consistency :delayed, feature_flag: :load_balancing_for_pipeline_process_worker
+
+ idempotent!
+ deduplicate :until_executing, feature_flag: :ci_idempotent_pipeline_process_worker
# rubocop: disable CodeReuse/ActiveRecord
- # `_build_ids` is deprecated and will be removed in 14.0
- # See: https://gitlab.com/gitlab-org/gitlab/-/issues/232806
- def perform(pipeline_id, _build_ids = nil)
+ def perform(pipeline_id)
Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline|
Ci::ProcessPipelineService
.new(pipeline)
diff --git a/app/workers/pipeline_update_worker.rb b/app/workers/pipeline_update_worker.rb
deleted file mode 100644
index e8feb4f2db2..00000000000
--- a/app/workers/pipeline_update_worker.rb
+++ /dev/null
@@ -1,19 +0,0 @@
-# frozen_string_literal: true
-
-# This worker is deprecated and will be removed in 14.0
-# See: https://gitlab.com/gitlab-org/gitlab/-/issues/232806
-class PipelineUpdateWorker
- include ApplicationWorker
-
- sidekiq_options retry: 3
- include PipelineQueue
-
- queue_namespace :pipeline_processing
- urgency :high
-
- idempotent!
-
- def perform(_pipeline_id)
- # no-op
- end
-end
diff --git a/app/workers/project_schedule_bulk_repository_shard_moves_worker.rb b/app/workers/project_schedule_bulk_repository_shard_moves_worker.rb
deleted file mode 100644
index 23d1594e4d9..00000000000
--- a/app/workers/project_schedule_bulk_repository_shard_moves_worker.rb
+++ /dev/null
@@ -1,15 +0,0 @@
-# frozen_string_literal: true
-
-# This is a compatibility class to avoid calling a non-existent
-# class from sidekiq during deployment.
-#
-# This class was moved to a namespace in https://gitlab.com/gitlab-org/gitlab/-/issues/299853.
-# we cannot remove this class entirely because there can be jobs
-# referencing it.
-#
-# We can get rid of this class in 14.0
-# https://gitlab.com/gitlab-org/gitlab/-/issues/322393
-class ProjectScheduleBulkRepositoryShardMovesWorker < Projects::ScheduleBulkRepositoryShardMovesWorker
- idempotent!
- urgency :throttled
-end
diff --git a/app/workers/project_update_repository_storage_worker.rb b/app/workers/project_update_repository_storage_worker.rb
deleted file mode 100644
index 0d68c0e16f8..00000000000
--- a/app/workers/project_update_repository_storage_worker.rb
+++ /dev/null
@@ -1,15 +0,0 @@
-# frozen_string_literal: true
-
-# This is a compatibility class to avoid calling a non-existent
-# class from sidekiq during deployment.
-#
-# This class was moved to a namespace in https://gitlab.com/gitlab-org/gitlab/-/issues/299853.
-# we cannot remove this class entirely because there can be jobs
-# referencing it.
-#
-# We can get rid of this class in 14.0
-# https://gitlab.com/gitlab-org/gitlab/-/issues/322393
-class ProjectUpdateRepositoryStorageWorker < Projects::UpdateRepositoryStorageWorker
- idempotent!
- urgency :throttled
-end
diff --git a/app/workers/propagate_integration_worker.rb b/app/workers/propagate_integration_worker.rb
index 5e694529bc0..0f8229bdf09 100644
--- a/app/workers/propagate_integration_worker.rb
+++ b/app/workers/propagate_integration_worker.rb
@@ -9,9 +9,7 @@ class PropagateIntegrationWorker
idempotent!
loggable_arguments 1
- # TODO: Keep overwrite parameter for backwards compatibility. Remove after >= 14.0
- # https://gitlab.com/gitlab-org/gitlab/-/issues/255382
- def perform(integration_id, overwrite = nil)
+ def perform(integration_id)
Admin::PropagateIntegrationService.propagate(Integration.find(integration_id))
end
end
diff --git a/app/workers/prune_web_hook_logs_worker.rb b/app/workers/prune_web_hook_logs_worker.rb
deleted file mode 100644
index abfaabbf01d..00000000000
--- a/app/workers/prune_web_hook_logs_worker.rb
+++ /dev/null
@@ -1,24 +0,0 @@
-# frozen_string_literal: true
-
-# Worker that deletes a fixed number of outdated rows from the "web_hook_logs"
-# table.
-class PruneWebHookLogsWorker # rubocop:disable Scalability/IdempotentWorker
- include ApplicationWorker
-
- sidekiq_options retry: 3
- # rubocop:disable Scalability/CronWorkerContext
- # This worker does not perform work scoped to a context
- include CronjobQueue
- # rubocop:enable Scalability/CronWorkerContext
-
- feature_category :integrations
-
- # The maximum number of rows to remove in a single job.
- DELETE_LIMIT = 50_000
-
- def perform
- cutoff_date = 90.days.ago.beginning_of_day
-
- WebHookLog.created_before(cutoff_date).delete_with_limit(DELETE_LIMIT)
- end
-end
diff --git a/app/workers/remove_unreferenced_lfs_objects_worker.rb b/app/workers/remove_unreferenced_lfs_objects_worker.rb
index b42883549ca..ca4b70a0485 100644
--- a/app/workers/remove_unreferenced_lfs_objects_worker.rb
+++ b/app/workers/remove_unreferenced_lfs_objects_worker.rb
@@ -1,6 +1,6 @@
# frozen_string_literal: true
-class RemoveUnreferencedLfsObjectsWorker # rubocop:disable Scalability/IdempotentWorker
+class RemoveUnreferencedLfsObjectsWorker
include ApplicationWorker
sidekiq_options retry: 3
@@ -10,8 +10,16 @@ class RemoveUnreferencedLfsObjectsWorker # rubocop:disable Scalability/Idempoten
# rubocop:enable Scalability/CronWorkerContext
feature_category :git_lfs
+ deduplicate :until_executed
+ idempotent!
def perform
- LfsObject.destroy_unreferenced
+ number_of_removed_files = 0
+
+ LfsObject.unreferenced_in_batches do |lfs_objects_without_projects|
+ number_of_removed_files += lfs_objects_without_projects.destroy_all.count # rubocop: disable Cop/DestroyAll
+ end
+
+ number_of_removed_files
end
end
diff --git a/app/workers/snippet_schedule_bulk_repository_shard_moves_worker.rb b/app/workers/snippet_schedule_bulk_repository_shard_moves_worker.rb
deleted file mode 100644
index 94a6b22538b..00000000000
--- a/app/workers/snippet_schedule_bulk_repository_shard_moves_worker.rb
+++ /dev/null
@@ -1,16 +0,0 @@
-# frozen_string_literal: true
-
-# This is a compatibility class to avoid calling a non-existent
-# class from sidekiq during deployment.
-#
-# This class was moved to a namespace in https://gitlab.com/gitlab-org/gitlab/-/issues/299853.
-# we cannot remove this class entirely because there can be jobs
-# referencing it.
-#
-# We can get rid of this class in 14.0
-# https://gitlab.com/gitlab-org/gitlab/-/issues/322393
-class SnippetScheduleBulkRepositoryShardMovesWorker < Snippets::ScheduleBulkRepositoryShardMovesWorker
- idempotent!
- feature_category :gitaly
- urgency :throttled
-end
diff --git a/app/workers/snippet_update_repository_storage_worker.rb b/app/workers/snippet_update_repository_storage_worker.rb
deleted file mode 100644
index befae6db4f4..00000000000
--- a/app/workers/snippet_update_repository_storage_worker.rb
+++ /dev/null
@@ -1,15 +0,0 @@
-# frozen_string_literal: true
-
-# This is a compatibility class to avoid calling a non-existent
-# class from sidekiq during deployment.
-#
-# This class was moved to a namespace in https://gitlab.com/gitlab-org/gitlab/-/issues/299853.
-# we cannot remove this class entirely because there can be jobs
-# referencing it.
-#
-# We can get rid of this class in 14.0
-# https://gitlab.com/gitlab-org/gitlab/-/issues/322393
-class SnippetUpdateRepositoryStorageWorker < Snippets::UpdateRepositoryStorageWorker # rubocop:disable Scalability/IdempotentWorker
- idempotent!
- urgency :throttled
-end
diff --git a/app/workers/ssh_keys/expired_notification_worker.rb b/app/workers/ssh_keys/expired_notification_worker.rb
index 9d5143fe655..b67849942b0 100644
--- a/app/workers/ssh_keys/expired_notification_worker.rb
+++ b/app/workers/ssh_keys/expired_notification_worker.rb
@@ -11,20 +11,37 @@ module SshKeys
tags :exclude_from_kubernetes
idempotent!
+ BATCH_SIZE = 500
+
+ # rubocop: disable CodeReuse/ActiveRecord
def perform
- return unless ::Feature.enabled?(:ssh_key_expiration_email_notification, default_enabled: :yaml)
+ order = Gitlab::Pagination::Keyset::Order.build([
+ Gitlab::Pagination::Keyset::ColumnOrderDefinition.new(
+ attribute_name: 'expires_at_utc',
+ order_expression: Arel.sql("date(expires_at AT TIME ZONE 'UTC')").asc,
+ nullable: :not_nullable,
+ distinct: false,
+ add_to_projections: true
+ ),
+ Gitlab::Pagination::Keyset::ColumnOrderDefinition.new(
+ attribute_name: 'id',
+ order_expression: Key.arel_table[:id].asc
+ )
+ ])
- # rubocop:disable CodeReuse/ActiveRecord
- User.with_ssh_key_expired_today.find_each(batch_size: 10_000) do |user|
- with_context(user: user) do
- Gitlab::AppLogger.info "#{self.class}: Notifying User #{user.id} about expired ssh key(s)"
+ scope = Key.expired_and_not_notified.order(order)
- keys = user.expired_today_and_unnotified_keys
+ iterator = Gitlab::Pagination::Keyset::Iterator.new(scope: scope, use_union_optimization: true)
+ iterator.each_batch(of: BATCH_SIZE) do |relation|
+ users = User.where(id: relation.map(&:user_id)) # Keyset pagination will load the rows
- Keys::ExpiryNotificationService.new(user, { keys: keys, expiring_soon: false }).execute
+ users.each do |user|
+ with_context(user: user) do
+ Keys::ExpiryNotificationService.new(user, { keys: user.expired_and_unnotified_keys, expiring_soon: false }).execute
+ end
end
- # rubocop:enable CodeReuse/ActiveRecord
end
end
+ # rubocop: enable CodeReuse/ActiveRecord
end
end
diff --git a/app/workers/ssh_keys/expiring_soon_notification_worker.rb b/app/workers/ssh_keys/expiring_soon_notification_worker.rb
index 1ec655b5cf5..d87e31c36a5 100644
--- a/app/workers/ssh_keys/expiring_soon_notification_worker.rb
+++ b/app/workers/ssh_keys/expiring_soon_notification_worker.rb
@@ -12,8 +12,6 @@ module SshKeys
idempotent!
def perform
- return unless ::Feature.enabled?(:ssh_key_expiration_email_notification, default_enabled: :yaml)
-
# rubocop:disable CodeReuse/ActiveRecord
User.with_ssh_key_expiring_soon.find_each(batch_size: 10_000) do |user|
with_context(user: user) do
diff --git a/app/workers/stuck_ci_jobs_worker.rb b/app/workers/stuck_ci_jobs_worker.rb
index 6b9f90ce1fc..b3b3d6e7554 100644
--- a/app/workers/stuck_ci_jobs_worker.rb
+++ b/app/workers/stuck_ci_jobs_worker.rb
@@ -15,22 +15,46 @@ class StuckCiJobsWorker # rubocop:disable Scalability/IdempotentWorker
BUILD_PENDING_OUTDATED_TIMEOUT = 1.day
BUILD_SCHEDULED_OUTDATED_TIMEOUT = 1.hour
BUILD_PENDING_STUCK_TIMEOUT = 1.hour
+ BUILD_LOOKBACK = 5.days
def perform
return unless try_obtain_lease
Gitlab::AppLogger.info "#{self.class}: Cleaning stuck builds"
- drop :running, BUILD_RUNNING_OUTDATED_TIMEOUT, 'ci_builds.updated_at < ?', :stuck_or_timeout_failure
- drop :pending, BUILD_PENDING_OUTDATED_TIMEOUT, 'ci_builds.updated_at < ?', :stuck_or_timeout_failure
- drop :scheduled, BUILD_SCHEDULED_OUTDATED_TIMEOUT, 'scheduled_at IS NOT NULL AND scheduled_at < ?', :stale_schedule
- drop_stuck :pending, BUILD_PENDING_STUCK_TIMEOUT, 'ci_builds.updated_at < ?', :stuck_or_timeout_failure
+ drop(running_timed_out_builds, failure_reason: :stuck_or_timeout_failure)
+
+ drop(
+ Ci::Build.pending.updated_before(lookback: BUILD_LOOKBACK.ago, timeout: BUILD_PENDING_OUTDATED_TIMEOUT.ago),
+ failure_reason: :stuck_or_timeout_failure
+ )
+
+ drop(scheduled_timed_out_builds, failure_reason: :stale_schedule)
+
+ drop_stuck(
+ Ci::Build.pending.updated_before(lookback: BUILD_LOOKBACK.ago, timeout: BUILD_PENDING_STUCK_TIMEOUT.ago),
+ failure_reason: :stuck_or_timeout_failure
+ )
remove_lease
end
private
+ def scheduled_timed_out_builds
+ Ci::Build.where(status: :scheduled).where( # rubocop: disable CodeReuse/ActiveRecord
+ 'ci_builds.scheduled_at IS NOT NULL AND ci_builds.scheduled_at < ?',
+ BUILD_SCHEDULED_OUTDATED_TIMEOUT.ago
+ )
+ end
+
+ def running_timed_out_builds
+ Ci::Build.running.where( # rubocop: disable CodeReuse/ActiveRecord
+ 'ci_builds.updated_at < ?',
+ BUILD_RUNNING_OUTDATED_TIMEOUT.ago
+ )
+ end
+
def try_obtain_lease
@uuid = Gitlab::ExclusiveLease.new(EXCLUSIVE_LEASE_KEY, timeout: 30.minutes).try_obtain
end
@@ -39,28 +63,27 @@ class StuckCiJobsWorker # rubocop:disable Scalability/IdempotentWorker
Gitlab::ExclusiveLease.cancel(EXCLUSIVE_LEASE_KEY, @uuid)
end
- def drop(status, timeout, condition, reason)
- search(status, timeout, condition) do |build|
- drop_build :outdated, build, status, timeout, reason
+ def drop(builds, failure_reason:)
+ fetch(builds) do |build|
+ drop_build :outdated, build, failure_reason
end
end
- def drop_stuck(status, timeout, condition, reason)
- search(status, timeout, condition) do |build|
+ def drop_stuck(builds, failure_reason:)
+ fetch(builds) do |build|
break unless build.stuck?
- drop_build :stuck, build, status, timeout, reason
+ drop_build :stuck, build, failure_reason
end
end
# rubocop: disable CodeReuse/ActiveRecord
- def search(status, timeout, condition)
+ def fetch(builds)
loop do
- jobs = Ci::Build.where(status: status)
- .where(condition, timeout.ago)
- .includes(:tags, :runner, project: [:namespace, :route])
+ jobs = builds.includes(:tags, :runner, project: [:namespace, :route])
.limit(100)
.to_a
+
break if jobs.empty?
jobs.each do |job|
@@ -70,8 +93,8 @@ class StuckCiJobsWorker # rubocop:disable Scalability/IdempotentWorker
end
# rubocop: enable CodeReuse/ActiveRecord
- def drop_build(type, build, status, timeout, reason)
- Gitlab::AppLogger.info "#{self.class}: Dropping #{type} build #{build.id} for runner #{build.runner_id} (status: #{status}, timeout: #{timeout}, reason: #{reason})"
+ def drop_build(type, build, reason)
+ Gitlab::AppLogger.info "#{self.class}: Dropping #{type} build #{build.id} for runner #{build.runner_id} (status: #{build.status}, failure_reason: #{reason})"
Gitlab::OptimisticLocking.retry_lock(build, 3, name: 'stuck_ci_jobs_worker_drop_build') do |b|
b.drop(reason)
end
diff --git a/app/workers/users/update_open_issue_count_worker.rb b/app/workers/users/update_open_issue_count_worker.rb
deleted file mode 100644
index d9e313d53df..00000000000
--- a/app/workers/users/update_open_issue_count_worker.rb
+++ /dev/null
@@ -1,26 +0,0 @@
-# frozen_string_literal: true
-
-module Users
- class UpdateOpenIssueCountWorker
- include ApplicationWorker
-
- feature_category :users
- tags :exclude_from_kubernetes
- idempotent!
-
- def perform(target_user_ids)
- target_user_ids = Array.wrap(target_user_ids)
-
- raise ArgumentError, 'No target user ID provided' if target_user_ids.empty?
-
- target_users = User.id_in(target_user_ids)
- raise ArgumentError, 'No valid target user ID provided' if target_users.empty?
-
- target_users.each do |user|
- Users::UpdateAssignedOpenIssueCountService.new(target_user: user).execute
- end
- rescue StandardError => exception
- Gitlab::ErrorTracking.track_and_raise_for_dev_exception(exception)
- end
- end
-end
diff --git a/app/workers/web_hook_worker.rb b/app/workers/web_hook_worker.rb
index dffab61dd0e..3480f49d640 100644
--- a/app/workers/web_hook_worker.rb
+++ b/app/workers/web_hook_worker.rb
@@ -8,6 +8,7 @@ class WebHookWorker
feature_category :integrations
worker_has_external_dependencies!
loggable_arguments 2
+ data_consistency :delayed, feature_flag: :load_balancing_for_web_hook_worker
sidekiq_options retry: 4, dead: false
@@ -15,7 +16,7 @@ class WebHookWorker
hook = WebHook.find(hook_id)
data = data.with_indifferent_access
- WebHookService.new(hook, data, hook_name).execute
+ WebHookService.new(hook, data, hook_name, jid).execute
end
end
# rubocop:enable Scalability/IdempotentWorker
diff --git a/app/workers/web_hooks/log_execution_worker.rb b/app/workers/web_hooks/log_execution_worker.rb
new file mode 100644
index 00000000000..58059370200
--- /dev/null
+++ b/app/workers/web_hooks/log_execution_worker.rb
@@ -0,0 +1,24 @@
+# frozen_string_literal: true
+
+module WebHooks
+ class LogExecutionWorker
+ include ApplicationWorker
+
+ idempotent!
+ feature_category :integrations
+ urgency :low
+
+ # This worker accepts an extra argument. This enables us to
+ # treat this worker as idempotent. Currently this is set to
+ # the Job ID (jid) of the parent worker.
+ def perform(hook_id, log_data, response_category, _unique_by)
+ hook = WebHook.find_by_id(hook_id)
+
+ return unless hook # hook has been deleted before we could run.
+
+ ::WebHooks::LogExecutionService
+ .new(hook: hook, log_data: log_data, response_category: response_category.to_sym)
+ .execute
+ end
+ end
+end