summaryrefslogtreecommitdiff
path: root/app/workers
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2021-04-20 23:50:22 +0000
committerGitLab Bot <gitlab-bot@gitlab.com>2021-04-20 23:50:22 +0000
commit9dc93a4519d9d5d7be48ff274127136236a3adb3 (patch)
tree70467ae3692a0e35e5ea56bcb803eb512a10bedb /app/workers
parent4b0f34b6d759d6299322b3a54453e930c6121ff0 (diff)
downloadgitlab-ce-9dc93a4519d9d5d7be48ff274127136236a3adb3.tar.gz
Add latest changes from gitlab-org/gitlab@13-11-stable-eev13.11.0-rc43
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/all_queues.yml128
-rw-r--r--app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb39
-rw-r--r--app/workers/build_finished_worker.rb4
-rw-r--r--app/workers/build_hooks_worker.rb1
-rw-r--r--app/workers/bulk_import_worker.rb13
-rw-r--r--app/workers/bulk_imports/entity_worker.rb49
-rw-r--r--app/workers/bulk_imports/pipeline_worker.rb70
-rw-r--r--app/workers/chaos/kill_worker.rb4
-rw-r--r--app/workers/ci/drop_pipeline_worker.rb16
-rw-r--r--app/workers/ci/initial_pipeline_process_worker.rb22
-rw-r--r--app/workers/ci/merge_requests/add_todo_when_build_fails_worker.rb21
-rw-r--r--app/workers/ci/pipeline_artifacts/expire_artifacts_worker.rb2
-rw-r--r--app/workers/concerns/application_worker.rb2
-rw-r--r--app/workers/concerns/cronjob_queue.rb2
-rw-r--r--app/workers/concerns/each_shard_worker.rb8
-rw-r--r--app/workers/concerns/reactive_cacheable_worker.rb8
-rw-r--r--app/workers/concerns/worker_attributes.rb33
-rw-r--r--app/workers/container_expiration_policy_worker.rb22
-rw-r--r--app/workers/database/batched_background_migration_worker.rb57
-rw-r--r--app/workers/delete_stored_files_worker.rb4
-rw-r--r--app/workers/emails_on_push_worker.rb8
-rw-r--r--app/workers/expire_build_artifacts_worker.rb2
-rw-r--r--app/workers/expire_job_cache_worker.rb2
-rw-r--r--app/workers/expire_pipeline_cache_worker.rb2
-rw-r--r--app/workers/irker_worker.rb3
-rw-r--r--app/workers/merge_requests/assignees_change_worker.rb26
-rw-r--r--app/workers/merge_requests/create_pipeline_worker.rb28
-rw-r--r--app/workers/merge_requests/handle_assignees_change_worker.rb22
-rw-r--r--app/workers/merge_requests/resolve_todos_worker.rb18
-rw-r--r--app/workers/namespaces/in_product_marketing_emails_worker.rb21
-rw-r--r--app/workers/new_issue_worker.rb5
-rw-r--r--app/workers/object_storage/migrate_uploads_worker.rb3
-rw-r--r--app/workers/packages/go/sync_packages_worker.rb33
-rw-r--r--app/workers/packages/rubygems/extraction_worker.rb27
-rw-r--r--app/workers/pages_update_configuration_worker.rb2
-rw-r--r--app/workers/projects/post_creation_worker.rb34
-rw-r--r--app/workers/remove_expired_members_worker.rb21
-rw-r--r--app/workers/ssh_keys/expired_notification_worker.rb25
-rw-r--r--app/workers/ssh_keys/expiring_soon_notification_worker.rb25
-rw-r--r--app/workers/todos_destroyer/destroyed_issuable_worker.rb14
-rw-r--r--app/workers/update_highest_role_worker.rb2
41 files changed, 763 insertions, 65 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index ff26aa7a4be..fa6ea54e342 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -25,7 +25,7 @@
:urgency: :low
:resource_boundary: :unknown
:weight: 1
- :idempotent: true
+ :idempotent:
:tags: []
- :name: authorized_project_update:authorized_project_update_user_refresh_with_low_urgency
:feature_category: :authentication_and_authorization
@@ -187,6 +187,14 @@
:weight: 1
:idempotent:
:tags: []
+- :name: cronjob:database_batched_background_migration
+ :feature_category: :database
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: cronjob:environments_auto_stop_cron
:feature_category: :continuous_delivery
:has_external_dependencies:
@@ -435,6 +443,22 @@
:weight: 1
:idempotent:
:tags: []
+- :name: cronjob:ssh_keys_expired_notification
+ :feature_category: :compliance_management
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
+- :name: cronjob:ssh_keys_expiring_soon_notification
+ :feature_category: :compliance_management
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: cronjob:stuck_ci_jobs
:feature_category: :continuous_integration
:has_external_dependencies:
@@ -1083,6 +1107,14 @@
:weight: 1
:idempotent:
:tags: []
+- :name: package_repositories:packages_go_sync_packages
+ :feature_category: :package_registry
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: package_repositories:packages_maven_metadata_sync
:feature_category: :package_registry
:has_external_dependencies:
@@ -1099,6 +1131,14 @@
:weight: 1
:idempotent:
:tags: []
+- :name: package_repositories:packages_rubygems_extraction
+ :feature_category: :package_registry
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: pipeline_background:archive_trace
:feature_category: :continuous_integration
:has_external_dependencies:
@@ -1187,6 +1227,14 @@
:weight: 4
:idempotent:
:tags: []
+- :name: pipeline_creation:merge_requests_create_pipeline
+ :feature_category: :continuous_integration
+ :has_external_dependencies:
+ :urgency: :high
+ :resource_boundary: :cpu
+ :weight: 4
+ :idempotent: true
+ :tags: []
- :name: pipeline_creation:run_pipeline_schedule
:feature_category: :continuous_integration
:has_external_dependencies:
@@ -1203,6 +1251,22 @@
:weight: 3
:idempotent:
:tags: []
+- :name: pipeline_default:ci_drop_pipeline
+ :feature_category: :continuous_integration
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 3
+ :idempotent: true
+ :tags: []
+- :name: pipeline_default:ci_merge_requests_add_todo_when_build_fails
+ :feature_category: :continuous_integration
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 3
+ :idempotent: true
+ :tags: []
- :name: pipeline_default:ci_pipeline_bridge_status
:feature_category: :continuous_integration
:has_external_dependencies:
@@ -1283,6 +1347,14 @@
:weight: 5
:idempotent:
:tags: []
+- :name: pipeline_processing:ci_initial_pipeline_process
+ :feature_category: :continuous_integration
+ :has_external_dependencies:
+ :urgency: :high
+ :resource_boundary: :unknown
+ :weight: 5
+ :idempotent: true
+ :tags: []
- :name: pipeline_processing:ci_resource_groups_assign_resource_from_resource_group
:feature_category: :continuous_delivery
:has_external_dependencies:
@@ -1355,6 +1427,14 @@
:weight: 1
:idempotent:
:tags: []
+- :name: todos_destroyer:todos_destroyer_destroyed_issuable
+ :feature_category: :issue_tracking
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: todos_destroyer:todos_destroyer_entity_leave
:feature_category: :issue_tracking
:has_external_dependencies:
@@ -1475,6 +1555,14 @@
:weight: 1
:idempotent:
:tags: []
+- :name: bulk_imports_pipeline
+ :feature_category: :importers
+ :has_external_dependencies: true
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent:
+ :tags: []
- :name: chat_notification
:feature_category: :chatops
:has_external_dependencies: true
@@ -1781,9 +1869,9 @@
:idempotent: true
:tags: []
- :name: mailers
- :feature_category:
+ :feature_category: :issue_tracking
:has_external_dependencies:
- :urgency:
+ :urgency: low
:resource_boundary:
:weight: 2
:idempotent:
@@ -1812,6 +1900,14 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: merge_requests_assignees_change
+ :feature_category: :source_code_management
+ :has_external_dependencies:
+ :urgency: :high
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: merge_requests_delete_source_branch
:feature_category: :source_code_management
:has_external_dependencies:
@@ -1820,6 +1916,22 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: merge_requests_handle_assignees_change
+ :feature_category: :code_review
+ :has_external_dependencies:
+ :urgency: :high
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
+- :name: merge_requests_resolve_todos
+ :feature_category: :code_review
+ :has_external_dependencies:
+ :urgency: :high
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: metrics_dashboard_prune_old_annotations
:feature_category: :metrics
:has_external_dependencies:
@@ -2056,6 +2168,14 @@
:weight: 1
:idempotent:
:tags: []
+- :name: projects_post_creation
+ :feature_category: :source_code_management
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: projects_schedule_bulk_repository_shard_moves
:feature_category: :gitaly
:has_external_dependencies:
@@ -2273,7 +2393,7 @@
:idempotent:
:tags: []
- :name: update_highest_role
- :feature_category: :authentication_and_authorization
+ :feature_category: :utilization
:has_external_dependencies:
:urgency: :high
:resource_boundary: :unknown
diff --git a/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb b/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb
index 9bd1ad2ed30..6635c322ab8 100644
--- a/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb
+++ b/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb
@@ -1,18 +1,49 @@
# frozen_string_literal: true
module AuthorizedProjectUpdate
- class UserRefreshOverUserRangeWorker
+ class UserRefreshOverUserRangeWorker # rubocop:disable Scalability/IdempotentWorker
+ # When the feature flag named `periodic_project_authorization_update_via_replica` is enabled,
+ # this worker checks if a specific user requires an update to their project_authorizations records.
+ # This check is done via the data read from the database replica (and not from the primary).
+ # If this check returns true, a completely new Sidekiq job is enqueued for this specific user
+ # so as to update its project_authorizations records.
+
+ # There is a possibility that the data in the replica is lagging behind the primary
+ # and hence it becomes very important that we check if an update is indeed required for this user
+ # once again via the primary database, which is the reason why we enqueue a completely new Sidekiq job
+ # via `UserRefreshWithLowUrgencyWorker` for this user.
+
include ApplicationWorker
feature_category :authentication_and_authorization
urgency :low
queue_namespace :authorized_project_update
+ # This job will not be deduplicated since it is marked with
+ # `data_consistency :delayed` and not `idempotent!`
+ # See https://gitlab.com/gitlab-org/gitlab/-/issues/325291
deduplicate :until_executing, including_scheduled: true
-
- idempotent!
+ data_consistency :delayed, feature_flag: :periodic_project_authorization_update_via_replica
def perform(start_user_id, end_user_id)
- AuthorizedProjectUpdate::RecalculateForUserRangeService.new(start_user_id, end_user_id).execute
+ if Feature.enabled?(:periodic_project_authorization_update_via_replica)
+ User.where(id: start_user_id..end_user_id).find_each do |user| # rubocop: disable CodeReuse/ActiveRecord
+ enqueue_project_authorizations_refresh(user) if project_authorizations_needs_refresh?(user)
+ end
+ else
+ AuthorizedProjectUpdate::RecalculateForUserRangeService.new(start_user_id, end_user_id).execute
+ end
+ end
+
+ private
+
+ def project_authorizations_needs_refresh?(user)
+ AuthorizedProjectUpdate::FindRecordsDueForRefreshService.new(user).needs_refresh?
+ end
+
+ def enqueue_project_authorizations_refresh(user)
+ with_context(user: user) do
+ AuthorizedProjectUpdate::UserRefreshWithLowUrgencyWorker.perform_async(user.id)
+ end
end
end
end
diff --git a/app/workers/build_finished_worker.rb b/app/workers/build_finished_worker.rb
index 3f99b30fdf7..aeda8d113ac 100644
--- a/app/workers/build_finished_worker.rb
+++ b/app/workers/build_finished_worker.rb
@@ -37,6 +37,10 @@ class BuildFinishedWorker # rubocop:disable Scalability/IdempotentWorker
ExpirePipelineCacheWorker.perform_async(build.pipeline_id)
ChatNotificationWorker.perform_async(build.id) if build.pipeline.chat?
+ if build.failed?
+ ::Ci::MergeRequests::AddTodoWhenBuildFailsWorker.perform_async(build.id)
+ end
+
##
# We want to delay sending a build trace to object storage operation to
# validate that this fixes a race condition between this and flushing live
diff --git a/app/workers/build_hooks_worker.rb b/app/workers/build_hooks_worker.rb
index ce4aa7229aa..5e05063f058 100644
--- a/app/workers/build_hooks_worker.rb
+++ b/app/workers/build_hooks_worker.rb
@@ -7,6 +7,7 @@ class BuildHooksWorker # rubocop:disable Scalability/IdempotentWorker
queue_namespace :pipeline_hooks
feature_category :continuous_integration
urgency :high
+ data_consistency :delayed, feature_flag: :load_balancing_for_build_hooks_worker
# rubocop: disable CodeReuse/ActiveRecord
def perform(build_id)
diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb
index e6bc54895a7..b4b9d9b05c1 100644
--- a/app/workers/bulk_import_worker.rb
+++ b/app/workers/bulk_import_worker.rb
@@ -21,9 +21,11 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
@bulk_import.start! if @bulk_import.created?
created_entities.first(next_batch_size).each do |entity|
- entity.start!
+ create_pipeline_tracker_for(entity)
BulkImports::EntityWorker.perform_async(entity.id)
+
+ entity.start!
end
re_enqueue
@@ -65,4 +67,13 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
def re_enqueue
BulkImportWorker.perform_in(PERFORM_DELAY, @bulk_import.id)
end
+
+ def create_pipeline_tracker_for(entity)
+ BulkImports::Stage.pipelines.each do |stage, pipeline|
+ entity.trackers.create!(
+ stage: stage,
+ pipeline_name: pipeline
+ )
+ end
+ end
end
diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb
index 5b41ccbdea1..7f173b738cf 100644
--- a/app/workers/bulk_imports/entity_worker.rb
+++ b/app/workers/bulk_imports/entity_worker.rb
@@ -10,24 +10,47 @@ module BulkImports
worker_has_external_dependencies!
- def perform(entity_id)
- entity = BulkImports::Entity.with_status(:started).find_by_id(entity_id)
+ def perform(entity_id, current_stage = nil)
+ return if stage_running?(entity_id, current_stage)
+
+ logger.info(
+ worker: self.class.name,
+ entity_id: entity_id,
+ current_stage: current_stage
+ )
+
+ next_pipeline_trackers_for(entity_id).each do |pipeline_tracker|
+ BulkImports::PipelineWorker.perform_async(
+ pipeline_tracker.id,
+ pipeline_tracker.stage,
+ entity_id
+ )
+ end
+ rescue => e
+ logger.error(
+ worker: self.class.name,
+ entity_id: entity_id,
+ current_stage: current_stage,
+ error_message: e.message
+ )
+
+ Gitlab::ErrorTracking.track_exception(e, entity_id: entity_id)
+ end
- if entity
- entity.update!(jid: jid)
+ private
- BulkImports::Importers::GroupImporter.new(entity).execute
- end
+ def stage_running?(entity_id, stage)
+ return unless stage
- rescue => e
- extra = {
- bulk_import_id: entity&.bulk_import&.id,
- entity_id: entity&.id
- }
+ BulkImports::Tracker.stage_running?(entity_id, stage)
+ end
- Gitlab::ErrorTracking.track_exception(e, extra)
+ def next_pipeline_trackers_for(entity_id)
+ BulkImports::Tracker.next_pipeline_trackers_for(entity_id)
+ end
- entity&.fail_op
+ def logger
+ @logger ||= Gitlab::Import::Logger.build
end
end
end
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
new file mode 100644
index 00000000000..a6de3c36205
--- /dev/null
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -0,0 +1,70 @@
+# frozen_string_literal: true
+
+module BulkImports
+ class PipelineWorker # rubocop:disable Scalability/IdempotentWorker
+ include ApplicationWorker
+
+ feature_category :importers
+
+ sidekiq_options retry: false, dead: false
+
+ worker_has_external_dependencies!
+
+ def perform(pipeline_tracker_id, stage, entity_id)
+ pipeline_tracker = ::BulkImports::Tracker
+ .with_status(:created)
+ .find_by_id(pipeline_tracker_id)
+
+ if pipeline_tracker.present?
+ logger.info(
+ worker: self.class.name,
+ entity_id: pipeline_tracker.entity.id,
+ pipeline_name: pipeline_tracker.pipeline_name
+ )
+
+ run(pipeline_tracker)
+ else
+ logger.error(
+ worker: self.class.name,
+ entity_id: entity_id,
+ pipeline_tracker_id: pipeline_tracker_id,
+ message: 'Unstarted pipeline not found'
+ )
+ end
+
+ ensure
+ ::BulkImports::EntityWorker.perform_async(entity_id, stage)
+ end
+
+ private
+
+ def run(pipeline_tracker)
+ pipeline_tracker.update!(status_event: 'start', jid: jid)
+
+ context = ::BulkImports::Pipeline::Context.new(pipeline_tracker)
+
+ pipeline_tracker.pipeline_class.new(context).run
+
+ pipeline_tracker.finish!
+ rescue => e
+ pipeline_tracker.fail_op!
+
+ logger.error(
+ worker: self.class.name,
+ entity_id: pipeline_tracker.entity.id,
+ pipeline_name: pipeline_tracker.pipeline_name,
+ message: e.message
+ )
+
+ Gitlab::ErrorTracking.track_exception(
+ e,
+ entity_id: pipeline_tracker.entity.id,
+ pipeline_name: pipeline_tracker.pipeline_name
+ )
+ end
+
+ def logger
+ @logger ||= Gitlab::Import::Logger.build
+ end
+ end
+end
diff --git a/app/workers/chaos/kill_worker.rb b/app/workers/chaos/kill_worker.rb
index 3dedd47a1f9..4148c139d42 100644
--- a/app/workers/chaos/kill_worker.rb
+++ b/app/workers/chaos/kill_worker.rb
@@ -7,8 +7,8 @@ module Chaos
sidekiq_options retry: false
- def perform
- Gitlab::Chaos.kill
+ def perform(signal)
+ Gitlab::Chaos.kill(signal)
end
end
end
diff --git a/app/workers/ci/drop_pipeline_worker.rb b/app/workers/ci/drop_pipeline_worker.rb
new file mode 100644
index 00000000000..d19157a47e8
--- /dev/null
+++ b/app/workers/ci/drop_pipeline_worker.rb
@@ -0,0 +1,16 @@
+# frozen_string_literal: true
+
+module Ci
+ class DropPipelineWorker
+ include ApplicationWorker
+ include PipelineQueue
+
+ idempotent!
+
+ def perform(pipeline_id, failure_reason)
+ Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline|
+ Ci::DropPipelineService.new.execute(pipeline, failure_reason.to_sym)
+ end
+ end
+ end
+end
diff --git a/app/workers/ci/initial_pipeline_process_worker.rb b/app/workers/ci/initial_pipeline_process_worker.rb
new file mode 100644
index 00000000000..f59726c87fb
--- /dev/null
+++ b/app/workers/ci/initial_pipeline_process_worker.rb
@@ -0,0 +1,22 @@
+# frozen_string_literal: true
+
+module Ci
+ class InitialPipelineProcessWorker
+ include ApplicationWorker
+ include PipelineQueue
+
+ queue_namespace :pipeline_processing
+ feature_category :continuous_integration
+ urgency :high
+ loggable_arguments 1
+ idempotent!
+
+ def perform(pipeline_id)
+ Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline|
+ Ci::ProcessPipelineService
+ .new(pipeline)
+ .execute
+ end
+ end
+ end
+end
diff --git a/app/workers/ci/merge_requests/add_todo_when_build_fails_worker.rb b/app/workers/ci/merge_requests/add_todo_when_build_fails_worker.rb
new file mode 100644
index 00000000000..d5e097dc2b5
--- /dev/null
+++ b/app/workers/ci/merge_requests/add_todo_when_build_fails_worker.rb
@@ -0,0 +1,21 @@
+# frozen_string_literal: true
+module Ci
+ module MergeRequests
+ class AddTodoWhenBuildFailsWorker
+ include ApplicationWorker
+ include PipelineQueue
+
+ urgency :low
+ idempotent!
+
+ def perform(job_id)
+ job = ::CommitStatus.with_pipeline.find_by_id(job_id)
+ project = job&.project
+
+ return unless job && project
+
+ ::MergeRequests::AddTodoWhenBuildFailsService.new(job.project, nil).execute(job)
+ end
+ end
+ end
+end
diff --git a/app/workers/ci/pipeline_artifacts/expire_artifacts_worker.rb b/app/workers/ci/pipeline_artifacts/expire_artifacts_worker.rb
index 0bb911bc6c8..fff979d95a9 100644
--- a/app/workers/ci/pipeline_artifacts/expire_artifacts_worker.rb
+++ b/app/workers/ci/pipeline_artifacts/expire_artifacts_worker.rb
@@ -14,7 +14,7 @@ module Ci
feature_category :continuous_integration
def perform
- service = ::Ci::PipelineArtifacts::DestroyExpiredArtifactsService.new
+ service = ::Ci::PipelineArtifacts::DestroyAllExpiredService.new
artifacts_count = service.execute
log_extra_metadata_on_done(:destroyed_pipeline_artifacts_count, artifacts_count)
end
diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb
index d101ef100d8..0de26e27631 100644
--- a/app/workers/concerns/application_worker.rb
+++ b/app/workers/concerns/application_worker.rb
@@ -18,7 +18,7 @@ module ApplicationWorker
set_queue
def structured_payload(payload = {})
- context = Labkit::Context.current.to_h.merge(
+ context = Gitlab::ApplicationContext.current.merge(
'class' => self.class.name,
'job_status' => 'running',
'queue' => self.class.queue,
diff --git a/app/workers/concerns/cronjob_queue.rb b/app/workers/concerns/cronjob_queue.rb
index 955387b5ad4..b89d6bba72c 100644
--- a/app/workers/concerns/cronjob_queue.rb
+++ b/app/workers/concerns/cronjob_queue.rb
@@ -15,7 +15,7 @@ module CronjobQueue
# Cronjobs never get scheduled with arguments, so this is safe to
# override
def context_for_arguments(_args)
- return if Gitlab::ApplicationContext.current_context_include?('meta.caller_id')
+ return if Gitlab::ApplicationContext.current_context_include?(:caller_id)
Gitlab::ApplicationContext.new(caller_id: "Cronjob")
end
diff --git a/app/workers/concerns/each_shard_worker.rb b/app/workers/concerns/each_shard_worker.rb
index 00f589f957e..d1d558f55fe 100644
--- a/app/workers/concerns/each_shard_worker.rb
+++ b/app/workers/concerns/each_shard_worker.rb
@@ -24,7 +24,13 @@ module EachShardWorker
end
def healthy_ready_shards
- ready_shards.select(&:success)
+ success_checks, failed_checks = ready_shards.partition(&:success)
+
+ if failed_checks.any?
+ ::Gitlab::AppLogger.error(message: 'Excluding unhealthy shards', failed_checks: failed_checks.map(&:payload), class: self.class.name)
+ end
+
+ success_checks
end
def ready_shards
diff --git a/app/workers/concerns/reactive_cacheable_worker.rb b/app/workers/concerns/reactive_cacheable_worker.rb
index 189b0607605..9e882c8ac7a 100644
--- a/app/workers/concerns/reactive_cacheable_worker.rb
+++ b/app/workers/concerns/reactive_cacheable_worker.rb
@@ -17,10 +17,10 @@ module ReactiveCacheableWorker
def perform(class_name, id, *args)
klass = begin
- class_name.constantize
- rescue NameError
- nil
- end
+ class_name.constantize
+ rescue NameError
+ nil
+ end
return unless klass
diff --git a/app/workers/concerns/worker_attributes.rb b/app/workers/concerns/worker_attributes.rb
index 042508d08f2..6f99fd089ac 100644
--- a/app/workers/concerns/worker_attributes.rb
+++ b/app/workers/concerns/worker_attributes.rb
@@ -11,6 +11,8 @@ module WorkerAttributes
# Urgencies that workers can declare through the `urgencies` attribute
VALID_URGENCIES = [:high, :low, :throttled].freeze
+ VALID_DATA_CONSISTENCIES = [:always, :sticky, :delayed].freeze
+
NAMESPACE_WEIGHTS = {
auto_devops: 2,
auto_merge: 3,
@@ -69,6 +71,35 @@ module WorkerAttributes
class_attributes[:urgency] || :low
end
+ def data_consistency(data_consistency, feature_flag: nil)
+ raise ArgumentError, "Invalid data consistency: #{data_consistency}" unless VALID_DATA_CONSISTENCIES.include?(data_consistency)
+ raise ArgumentError, 'Data consistency is already set' if class_attributes[:data_consistency]
+
+ class_attributes[:data_consistency_feature_flag] = feature_flag if feature_flag
+ class_attributes[:data_consistency] = data_consistency
+
+ validate_worker_attributes!
+ end
+
+ def validate_worker_attributes!
+ # Since the deduplication should always take into account the latest binary replication pointer into account,
+ # not the first one, the deduplication will not work with sticky or delayed.
+ # Follow up issue to improve this: https://gitlab.com/gitlab-org/gitlab/-/issues/325291
+ if idempotent? && get_data_consistency != :always
+ raise ArgumentError, "Class can't be marked as idempotent if data_consistency is not set to :always"
+ end
+ end
+
+ def get_data_consistency
+ class_attributes[:data_consistency] || :always
+ end
+
+ def get_data_consistency_feature_flag_enabled?
+ return true unless class_attributes[:data_consistency_feature_flag]
+
+ Feature.enabled?(class_attributes[:data_consistency_feature_flag], default_enabled: :yaml)
+ end
+
# Set this attribute on a job when it will call to services outside of the
# application, such as 3rd party applications, other k8s clusters etc See
# doc/development/sidekiq_style_guide.md#jobs-with-external-dependencies for
@@ -96,6 +127,8 @@ module WorkerAttributes
def idempotent!
class_attributes[:idempotent] = true
+
+ validate_worker_attributes!
end
def idempotent?
diff --git a/app/workers/container_expiration_policy_worker.rb b/app/workers/container_expiration_policy_worker.rb
index 43dbea027f2..5ca89179099 100644
--- a/app/workers/container_expiration_policy_worker.rb
+++ b/app/workers/container_expiration_policy_worker.rb
@@ -9,7 +9,7 @@ class ContainerExpirationPolicyWorker # rubocop:disable Scalability/IdempotentWo
InvalidPolicyError = Class.new(StandardError)
- BATCH_SIZE = 1000.freeze
+ BATCH_SIZE = 1000
def perform
throttling_enabled? ? perform_throttled : perform_unthrottled
@@ -29,13 +29,15 @@ class ContainerExpirationPolicyWorker # rubocop:disable Scalability/IdempotentWo
def perform_throttled
try_obtain_lease do
- with_runnable_policy do |policy|
- ContainerExpirationPolicy.transaction do
- policy.schedule_next_run!
- ContainerRepository.for_project_id(policy.id)
- .each_batch do |relation|
- relation.update_all(expiration_policy_cleanup_status: :cleanup_scheduled)
- end
+ unless loopless_enabled?
+ with_runnable_policy do |policy|
+ ContainerExpirationPolicy.transaction do
+ policy.schedule_next_run!
+ ContainerRepository.for_project_id(policy.id)
+ .each_batch do |relation|
+ relation.update_all(expiration_policy_cleanup_status: :cleanup_scheduled)
+ end
+ end
end
end
@@ -75,6 +77,10 @@ class ContainerExpirationPolicyWorker # rubocop:disable Scalability/IdempotentWo
Feature.enabled?(:container_registry_expiration_policies_throttling)
end
+ def loopless_enabled?
+ Feature.enabled?(:container_registry_expiration_policies_loopless)
+ end
+
def lease_timeout
5.hours
end
diff --git a/app/workers/database/batched_background_migration_worker.rb b/app/workers/database/batched_background_migration_worker.rb
new file mode 100644
index 00000000000..de274d58ad7
--- /dev/null
+++ b/app/workers/database/batched_background_migration_worker.rb
@@ -0,0 +1,57 @@
+# frozen_string_literal: true
+
+module Database
+ class BatchedBackgroundMigrationWorker
+ include ApplicationWorker
+ include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
+
+ feature_category :database
+ idempotent!
+
+ LEASE_TIMEOUT_MULTIPLIER = 3
+ MINIMUM_LEASE_TIMEOUT = 10.minutes.freeze
+ INTERVAL_VARIANCE = 5.seconds.freeze
+
+ def perform
+ return unless Feature.enabled?(:execute_batched_migrations_on_schedule, type: :ops) && active_migration
+
+ with_exclusive_lease(active_migration.interval) do
+ # Now that we have the exclusive lease, reload migration in case another process has changed it.
+ # This is a temporary solution until we have better concurrency handling around job execution
+ #
+ # We also have to disable this cop, because ApplicationRecord aliases reset to reload, but our database
+ # models don't inherit from ApplicationRecord
+ active_migration.reload # rubocop:disable Cop/ActiveRecordAssociationReload
+
+ run_active_migration if active_migration.active? && active_migration.interval_elapsed?(variance: INTERVAL_VARIANCE)
+ end
+ end
+
+ private
+
+ def active_migration
+ @active_migration ||= Gitlab::Database::BackgroundMigration::BatchedMigration.active_migration
+ end
+
+ def run_active_migration
+ Gitlab::Database::BackgroundMigration::BatchedMigrationRunner.new.run_migration_job(active_migration)
+ end
+
+ def with_exclusive_lease(interval)
+ timeout = max(interval * LEASE_TIMEOUT_MULTIPLIER, MINIMUM_LEASE_TIMEOUT)
+ lease = Gitlab::ExclusiveLease.new(lease_key, timeout: timeout)
+
+ yield if lease.try_obtain
+ ensure
+ lease&.cancel
+ end
+
+ def max(left, right)
+ left >= right ? left : right
+ end
+
+ def lease_key
+ self.class.name.demodulize.underscore
+ end
+ end
+end
diff --git a/app/workers/delete_stored_files_worker.rb b/app/workers/delete_stored_files_worker.rb
index 689ac3dd0ce..9cf5631b7d8 100644
--- a/app/workers/delete_stored_files_worker.rb
+++ b/app/workers/delete_stored_files_worker.rb
@@ -9,8 +9,8 @@ class DeleteStoredFilesWorker # rubocop:disable Scalability/IdempotentWorker
def perform(class_name, keys)
klass = begin
class_name.constantize
- rescue NameError
- nil
+ rescue NameError
+ nil
end
unless klass
diff --git a/app/workers/emails_on_push_worker.rb b/app/workers/emails_on_push_worker.rb
index 1a34bf50d87..978b65802dd 100644
--- a/app/workers/emails_on_push_worker.rb
+++ b/app/workers/emails_on_push_worker.rb
@@ -56,7 +56,7 @@ class EmailsOnPushWorker # rubocop:disable Scalability/IdempotentWorker
end
end
- valid_recipients(recipients).each do |recipient|
+ EmailsOnPushService.valid_recipients(recipients).each do |recipient|
send_email(
recipient,
project_id,
@@ -92,10 +92,4 @@ class EmailsOnPushWorker # rubocop:disable Scalability/IdempotentWorker
email.header[:skip_premailer] = true if skip_premailer
email.deliver_now
end
-
- def valid_recipients(recipients)
- recipients.split.select do |recipient|
- recipient.include?('@')
- end.uniq(&:downcase)
- end
end
diff --git a/app/workers/expire_build_artifacts_worker.rb b/app/workers/expire_build_artifacts_worker.rb
index 5db9f0b67e0..50fdd046491 100644
--- a/app/workers/expire_build_artifacts_worker.rb
+++ b/app/workers/expire_build_artifacts_worker.rb
@@ -10,7 +10,7 @@ class ExpireBuildArtifactsWorker # rubocop:disable Scalability/IdempotentWorker
feature_category :continuous_integration
def perform
- service = Ci::DestroyExpiredJobArtifactsService.new
+ service = Ci::JobArtifacts::DestroyAllExpiredService.new
artifacts_count = service.execute
log_extra_metadata_on_done(:destroyed_job_artifacts_count, artifacts_count)
end
diff --git a/app/workers/expire_job_cache_worker.rb b/app/workers/expire_job_cache_worker.rb
index 77b0edfd7de..48bb1160ae8 100644
--- a/app/workers/expire_job_cache_worker.rb
+++ b/app/workers/expire_job_cache_worker.rb
@@ -10,7 +10,7 @@ class ExpireJobCacheWorker
# rubocop: disable CodeReuse/ActiveRecord
def perform(job_id)
- job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id)
+ job = CommitStatus.eager_load_pipeline.find_by(id: job_id)
return unless job
pipeline = job.pipeline
diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb
index 02039e40e15..cbea46cdccd 100644
--- a/app/workers/expire_pipeline_cache_worker.rb
+++ b/app/workers/expire_pipeline_cache_worker.rb
@@ -12,7 +12,7 @@ class ExpirePipelineCacheWorker
# rubocop: disable CodeReuse/ActiveRecord
def perform(pipeline_id)
- pipeline = Ci::Pipeline.find_by(id: pipeline_id)
+ pipeline = Ci::Pipeline.eager_load_project.find_by(id: pipeline_id)
return unless pipeline
Ci::ExpirePipelineCacheService.new.execute(pipeline)
diff --git a/app/workers/irker_worker.rb b/app/workers/irker_worker.rb
index 687fb1cd02a..c5bdb3e0970 100644
--- a/app/workers/irker_worker.rb
+++ b/app/workers/irker_worker.rb
@@ -147,8 +147,7 @@ class IrkerWorker # rubocop:disable Scalability/IdempotentWorker
def files_count(commit)
diff_size = commit.raw_deltas.size
- files = "#{diff_size} file".pluralize(diff_size)
- files
+ "#{diff_size} file".pluralize(diff_size)
end
def colorize_sha(sha)
diff --git a/app/workers/merge_requests/assignees_change_worker.rb b/app/workers/merge_requests/assignees_change_worker.rb
new file mode 100644
index 00000000000..9865563e357
--- /dev/null
+++ b/app/workers/merge_requests/assignees_change_worker.rb
@@ -0,0 +1,26 @@
+# frozen_string_literal: true
+
+class MergeRequests::AssigneesChangeWorker
+ include ApplicationWorker
+
+ feature_category :source_code_management
+ urgency :high
+ deduplicate :until_executed
+ idempotent!
+
+ def perform(merge_request_id, user_id, old_assignee_ids)
+ merge_request = MergeRequest.find(merge_request_id)
+ current_user = User.find(user_id)
+
+ # if a user was added and then removed, or removed and then added
+ # while waiting for this job to run, assume that nothing happened.
+ users = User.id_in(old_assignee_ids - merge_request.assignee_ids)
+
+ return if users.blank?
+
+ ::MergeRequests::HandleAssigneesChangeService
+ .new(merge_request.target_project, current_user)
+ .execute(merge_request, users, execute_hooks: true)
+ rescue ActiveRecord::RecordNotFound
+ end
+end
diff --git a/app/workers/merge_requests/create_pipeline_worker.rb b/app/workers/merge_requests/create_pipeline_worker.rb
new file mode 100644
index 00000000000..244ba1af300
--- /dev/null
+++ b/app/workers/merge_requests/create_pipeline_worker.rb
@@ -0,0 +1,28 @@
+# frozen_string_literal: true
+
+module MergeRequests
+ class CreatePipelineWorker
+ include ApplicationWorker
+ include PipelineQueue
+
+ queue_namespace :pipeline_creation
+ feature_category :continuous_integration
+ urgency :high
+ worker_resource_boundary :cpu
+ idempotent!
+
+ def perform(project_id, user_id, merge_request_id)
+ project = Project.find_by_id(project_id)
+ return unless project
+
+ user = User.find_by_id(user_id)
+ return unless user
+
+ merge_request = MergeRequest.find_by_id(merge_request_id)
+ return unless merge_request
+
+ MergeRequests::CreatePipelineService.new(project, user).execute(merge_request)
+ merge_request.update_head_pipeline
+ end
+ end
+end
diff --git a/app/workers/merge_requests/handle_assignees_change_worker.rb b/app/workers/merge_requests/handle_assignees_change_worker.rb
new file mode 100644
index 00000000000..e79d8293bae
--- /dev/null
+++ b/app/workers/merge_requests/handle_assignees_change_worker.rb
@@ -0,0 +1,22 @@
+# frozen_string_literal: true
+
+class MergeRequests::HandleAssigneesChangeWorker
+ include ApplicationWorker
+
+ feature_category :code_review
+ urgency :high
+ deduplicate :until_executed
+ idempotent!
+
+ def perform(merge_request_id, user_id, old_assignee_ids, options = {})
+ merge_request = MergeRequest.find(merge_request_id)
+ user = User.find(user_id)
+
+ old_assignees = User.id_in(old_assignee_ids)
+
+ ::MergeRequests::HandleAssigneesChangeService
+ .new(merge_request.target_project, user)
+ .execute(merge_request, old_assignees, options)
+ rescue ActiveRecord::RecordNotFound
+ end
+end
diff --git a/app/workers/merge_requests/resolve_todos_worker.rb b/app/workers/merge_requests/resolve_todos_worker.rb
new file mode 100644
index 00000000000..2a5f742f809
--- /dev/null
+++ b/app/workers/merge_requests/resolve_todos_worker.rb
@@ -0,0 +1,18 @@
+# frozen_string_literal: true
+
+class MergeRequests::ResolveTodosWorker
+ include ApplicationWorker
+
+ feature_category :code_review
+ urgency :high
+ deduplicate :until_executed
+ idempotent!
+
+ def perform(merge_request_id, user_id)
+ merge_request = MergeRequest.find(merge_request_id)
+ user = User.find(user_id)
+
+ MergeRequests::ResolveTodosService.new(merge_request, user).execute
+ rescue ActiveRecord::RecordNotFound
+ end
+end
diff --git a/app/workers/namespaces/in_product_marketing_emails_worker.rb b/app/workers/namespaces/in_product_marketing_emails_worker.rb
index f8fa393264a..3070afed3d6 100644
--- a/app/workers/namespaces/in_product_marketing_emails_worker.rb
+++ b/app/workers/namespaces/in_product_marketing_emails_worker.rb
@@ -9,10 +9,27 @@ module Namespaces
urgency :low
def perform
- return unless Gitlab::CurrentSettings.in_product_marketing_emails_enabled
- return unless Gitlab::Experimentation.active?(:in_product_marketing_emails)
+ return if paid_self_managed_instance?
+ return if setting_disabled?
+ return if experiment_inactive?
Namespaces::InProductMarketingEmailsService.send_for_all_tracks_and_intervals
end
+
+ private
+
+ def paid_self_managed_instance?
+ false
+ end
+
+ def setting_disabled?
+ !Gitlab::CurrentSettings.in_product_marketing_emails_enabled
+ end
+
+ def experiment_inactive?
+ Gitlab.com? && !Gitlab::Experimentation.active?(:in_product_marketing_emails)
+ end
end
end
+
+Namespaces::InProductMarketingEmailsWorker.prepend_if_ee('EE::Namespaces::InProductMarketingEmailsWorker')
diff --git a/app/workers/new_issue_worker.rb b/app/workers/new_issue_worker.rb
index be9a168c3f6..c08f4b4cd75 100644
--- a/app/workers/new_issue_worker.rb
+++ b/app/workers/new_issue_worker.rb
@@ -14,7 +14,12 @@ class NewIssueWorker # rubocop:disable Scalability/IdempotentWorker
::EventCreateService.new.open_issue(issuable, user)
::NotificationService.new.new_issue(issuable, user)
+
issuable.create_cross_references!(user)
+
+ Issues::AfterCreateService
+ .new(issuable.project, user)
+ .execute(issuable)
end
def issuable_class
diff --git a/app/workers/object_storage/migrate_uploads_worker.rb b/app/workers/object_storage/migrate_uploads_worker.rb
index f489e428e8d..666bacb0188 100644
--- a/app/workers/object_storage/migrate_uploads_worker.rb
+++ b/app/workers/object_storage/migrate_uploads_worker.rb
@@ -16,7 +16,8 @@ module ObjectStorage
attr_accessor :error
def initialize(upload, error = nil)
- @upload, @error = upload, error
+ @upload = upload
+ @error = error
end
def success?
diff --git a/app/workers/packages/go/sync_packages_worker.rb b/app/workers/packages/go/sync_packages_worker.rb
new file mode 100644
index 00000000000..e41f27f2252
--- /dev/null
+++ b/app/workers/packages/go/sync_packages_worker.rb
@@ -0,0 +1,33 @@
+# frozen_string_literal: true
+
+module Packages
+ module Go
+ class SyncPackagesWorker
+ include ApplicationWorker
+ include Gitlab::Golang
+
+ queue_namespace :package_repositories
+ feature_category :package_registry
+
+ deduplicate :until_executing
+ idempotent!
+
+ def perform(project_id, ref_name, path)
+ project = Project.find_by_id(project_id)
+ return unless project && project.repository.find_tag(ref_name)
+
+ module_name = go_path(project, path)
+ mod = Packages::Go::ModuleFinder.new(project, module_name).execute
+ return unless mod
+
+ ver = Packages::Go::VersionFinder.new(mod).find(ref_name)
+ return unless ver
+
+ Packages::Go::CreatePackageService.new(project, nil, version: ver).execute
+
+ rescue ::Packages::Go::CreatePackageService::GoZipSizeError => ex
+ Gitlab::ErrorTracking.log_exception(ex)
+ end
+ end
+ end
+end
diff --git a/app/workers/packages/rubygems/extraction_worker.rb b/app/workers/packages/rubygems/extraction_worker.rb
new file mode 100644
index 00000000000..1e5cd0b54ce
--- /dev/null
+++ b/app/workers/packages/rubygems/extraction_worker.rb
@@ -0,0 +1,27 @@
+# frozen_string_literal: true
+
+module Packages
+ module Rubygems
+ class ExtractionWorker # rubocop:disable Scalability/IdempotentWorker
+ include ApplicationWorker
+
+ queue_namespace :package_repositories
+ feature_category :package_registry
+ deduplicate :until_executing
+
+ idempotent!
+
+ def perform(package_file_id)
+ package_file = ::Packages::PackageFile.find_by_id(package_file_id)
+
+ return unless package_file
+
+ ::Packages::Rubygems::ProcessGemService.new(package_file).execute
+
+ rescue ::Packages::Rubygems::ProcessGemService::ExtractionError => e
+ Gitlab::ErrorTracking.log_exception(e, project_id: package_file.project_id)
+ package_file.package.destroy!
+ end
+ end
+ end
+end
diff --git a/app/workers/pages_update_configuration_worker.rb b/app/workers/pages_update_configuration_worker.rb
index 6e82e2099c7..ca5016dc782 100644
--- a/app/workers/pages_update_configuration_worker.rb
+++ b/app/workers/pages_update_configuration_worker.rb
@@ -7,7 +7,7 @@ class PagesUpdateConfigurationWorker
feature_category :pages
def self.perform_async(*args)
- return unless Feature.enabled?(:pages_update_legacy_storage, default_enabled: true)
+ return unless ::Settings.pages.local_store.enabled
super(*args)
end
diff --git a/app/workers/projects/post_creation_worker.rb b/app/workers/projects/post_creation_worker.rb
new file mode 100644
index 00000000000..2ca62e582b6
--- /dev/null
+++ b/app/workers/projects/post_creation_worker.rb
@@ -0,0 +1,34 @@
+# frozen_string_literal: true
+
+module Projects
+ class PostCreationWorker
+ include ApplicationWorker
+
+ feature_category :source_code_management
+ idempotent!
+
+ def perform(project_id)
+ project = Project.find_by_id(project_id)
+
+ return unless project
+
+ create_prometheus_service(project)
+ end
+
+ private
+
+ def create_prometheus_service(project)
+ service = project.find_or_initialize_service(::PrometheusService.to_param)
+
+ # If the service has already been inserted in the database, that
+ # means it came from a template, and there's nothing more to do.
+ return if service.persisted?
+
+ return unless service.prometheus_available?
+
+ service.save!
+ rescue ActiveRecord::RecordInvalid => e
+ Gitlab::ErrorTracking.track_exception(e, extra: { project_id: project.id })
+ end
+ end
+end
diff --git a/app/workers/remove_expired_members_worker.rb b/app/workers/remove_expired_members_worker.rb
index 35844fdf297..fc2ec047e1c 100644
--- a/app/workers/remove_expired_members_worker.rb
+++ b/app/workers/remove_expired_members_worker.rb
@@ -2,20 +2,29 @@
class RemoveExpiredMembersWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
- include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
+ include CronjobQueue
feature_category :authentication_and_authorization
worker_resource_boundary :cpu
# rubocop: disable CodeReuse/ActiveRecord
def perform
- Member.expired.preload(:user).find_each do |member|
- Members::DestroyService.new.execute(member, skip_authorization: true)
+ Member.expired.preload(:user, :source).find_each do |member|
+ context = {
+ user: member.user,
+ # The ApplicationContext will reject type-mismatches. So a GroupMemeber will only populate `namespace`.
+ # while a `ProjectMember` will populate `project
+ project: member.source,
+ namespace: member.source
+ }
+ with_context(context) do
+ Members::DestroyService.new.execute(member, skip_authorization: true)
- expired_user = member.user
+ expired_user = member.user
- if expired_user.project_bot?
- Users::DestroyService.new(nil).execute(expired_user, skip_authorization: true)
+ if expired_user.project_bot?
+ Users::DestroyService.new(nil).execute(expired_user, skip_authorization: true)
+ end
end
rescue => ex
logger.error("Expired Member ID=#{member.id} cannot be removed - #{ex}")
diff --git a/app/workers/ssh_keys/expired_notification_worker.rb b/app/workers/ssh_keys/expired_notification_worker.rb
new file mode 100644
index 00000000000..ab6d1998773
--- /dev/null
+++ b/app/workers/ssh_keys/expired_notification_worker.rb
@@ -0,0 +1,25 @@
+# frozen_string_literal: true
+
+module SshKeys
+ class ExpiredNotificationWorker
+ include ApplicationWorker
+ include CronjobQueue
+
+ feature_category :compliance_management
+ idempotent!
+
+ def perform
+ return unless ::Feature.enabled?(:ssh_key_expiration_email_notification, default_enabled: :yaml)
+
+ User.with_ssh_key_expired_today.find_each do |user|
+ with_context(user: user) do
+ Gitlab::AppLogger.info "#{self.class}: Notifying User #{user.id} about expired ssh key(s)"
+
+ keys = user.expired_today_and_unnotified_keys
+
+ Keys::ExpiryNotificationService.new(user, { keys: keys, expiring_soon: false }).execute
+ end
+ end
+ end
+ end
+end
diff --git a/app/workers/ssh_keys/expiring_soon_notification_worker.rb b/app/workers/ssh_keys/expiring_soon_notification_worker.rb
new file mode 100644
index 00000000000..3214cd7a242
--- /dev/null
+++ b/app/workers/ssh_keys/expiring_soon_notification_worker.rb
@@ -0,0 +1,25 @@
+# frozen_string_literal: true
+
+module SshKeys
+ class ExpiringSoonNotificationWorker
+ include ApplicationWorker
+ include CronjobQueue
+
+ feature_category :compliance_management
+ idempotent!
+
+ def perform
+ return unless ::Feature.enabled?(:ssh_key_expiration_email_notification, default_enabled: :yaml)
+
+ User.with_ssh_key_expiring_soon.find_each do |user|
+ with_context(user: user) do
+ Gitlab::AppLogger.info "#{self.class}: Notifying User #{user.id} about expiring soon ssh key(s)"
+
+ keys = user.expiring_soon_and_unnotified_keys
+
+ Keys::ExpiryNotificationService.new(user, { keys: keys, expiring_soon: true }).execute
+ end
+ end
+ end
+ end
+end
diff --git a/app/workers/todos_destroyer/destroyed_issuable_worker.rb b/app/workers/todos_destroyer/destroyed_issuable_worker.rb
new file mode 100644
index 00000000000..6ca1959ff34
--- /dev/null
+++ b/app/workers/todos_destroyer/destroyed_issuable_worker.rb
@@ -0,0 +1,14 @@
+# frozen_string_literal: true
+
+module TodosDestroyer
+ class DestroyedIssuableWorker
+ include ApplicationWorker
+ include TodosDestroyerQueue
+
+ idempotent!
+
+ def perform(target_id, target_type)
+ ::Todos::Destroy::DestroyedIssuableService.new(target_id, target_type).execute
+ end
+ end
+end
diff --git a/app/workers/update_highest_role_worker.rb b/app/workers/update_highest_role_worker.rb
index 1e2c974b6e5..952f1e511ea 100644
--- a/app/workers/update_highest_role_worker.rb
+++ b/app/workers/update_highest_role_worker.rb
@@ -3,7 +3,7 @@
class UpdateHighestRoleWorker
include ApplicationWorker
- feature_category :authentication_and_authorization
+ feature_category :utilization
urgency :high
weight 2