Diffstat (limited to 'app/workers')
-rw-r--r--  app/workers/all_queues.yml                                         |  51
-rw-r--r--  app/workers/background_migration/single_database_worker.rb        | 148
-rw-r--r--  app/workers/background_migration_worker.rb                         | 117
-rw-r--r--  app/workers/bulk_imports/entity_worker.rb                          |   5
-rw-r--r--  app/workers/bulk_imports/pipeline_worker.rb                        |   4
-rw-r--r--  app/workers/ci/create_downstream_pipeline_worker.rb                |   1
-rw-r--r--  app/workers/ci/pending_builds/update_group_worker.rb               |  19
-rw-r--r--  app/workers/ci/pending_builds/update_project_worker.rb             |  19
-rw-r--r--  app/workers/ci/pipeline_artifacts/create_quality_report_worker.rb  |   2
-rw-r--r--  app/workers/concerns/application_worker.rb                         |   4
-rw-r--r--  app/workers/expire_job_cache_worker.rb                             |  15
-rw-r--r--  app/workers/issuable_export_csv_worker.rb                          |   2
-rw-r--r--  app/workers/issue_placement_worker.rb                              |   7
-rw-r--r--  app/workers/issue_rebalancing_worker.rb                            |   3
-rw-r--r--  app/workers/issues/rebalancing_worker.rb                           |   1
-rw-r--r--  app/workers/issues/reschedule_stuck_issue_rebalances_worker.rb     |   4
-rw-r--r--  app/workers/namespaces/process_sync_events_worker.rb               |  22
-rw-r--r--  app/workers/projects/process_sync_events_worker.rb                 |  22
-rw-r--r--  app/workers/propagate_integration_worker.rb                        |   2
-rw-r--r--  app/workers/propagate_service_template_worker.rb                   |  29
-rw-r--r--  app/workers/purge_dependency_proxy_cache_worker.rb                 |  11
-rw-r--r--  app/workers/todos_destroyer/private_features_worker.rb             |   2
22 files changed, 312 insertions(+), 178 deletions(-)
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index 699744b355c..e5ac9da37c6 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -1447,6 +1447,24 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: pipeline_background:ci_pending_builds_update_group
+ :worker_name: Ci::PendingBuilds::UpdateGroupWorker
+ :feature_category: :continuous_integration
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
+- :name: pipeline_background:ci_pending_builds_update_project
+ :worker_name: Ci::PendingBuilds::UpdateProjectWorker
+ :feature_category: :continuous_integration
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: pipeline_background:ci_pipeline_artifacts_coverage_report
:worker_name: Ci::PipelineArtifacts::CoverageReportWorker
:feature_category: :code_testing
@@ -1458,7 +1476,7 @@
:tags: []
- :name: pipeline_background:ci_pipeline_artifacts_create_quality_report
:worker_name: Ci::PipelineArtifacts::CreateQualityReportWorker
- :feature_category: :code_testing
+ :feature_category: :code_quality
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
@@ -1559,7 +1577,7 @@
:worker_name: Ci::CreateDownstreamPipelineWorker
:feature_category: :continuous_integration
:has_external_dependencies:
- :urgency: :low
+ :urgency: :high
:resource_boundary: :cpu
:weight: 3
:idempotent:
@@ -1913,7 +1931,7 @@
:urgency: :low
:resource_boundary: :unknown
:weight: 1
- :idempotent:
+ :idempotent: true
:tags: []
- :name: bulk_imports_export_request
:worker_name: BulkImports::ExportRequestWorker
@@ -2474,6 +2492,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: namespaces_process_sync_events
+ :worker_name: Namespaces::ProcessSyncEventsWorker
+ :feature_category: :sharding
+ :has_external_dependencies:
+ :urgency: :high
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: new_issue
:worker_name: NewIssueWorker
:feature_category: :team_planning
@@ -2645,6 +2672,15 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: projects_process_sync_events
+ :worker_name: Projects::ProcessSyncEventsWorker
+ :feature_category: :sharding
+ :has_external_dependencies:
+ :urgency: :high
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: projects_schedule_bulk_repository_shard_moves
:worker_name: Projects::ScheduleBulkRepositoryShardMovesWorker
:feature_category: :gitaly
@@ -2717,15 +2753,6 @@
:weight: 1
:idempotent: true
:tags: []
-- :name: propagate_service_template
- :worker_name: PropagateServiceTemplateWorker
- :feature_category: :integrations
- :has_external_dependencies:
- :urgency: :low
- :resource_boundary: :unknown
- :weight: 1
- :idempotent:
- :tags: []
- :name: reactive_caching
:worker_name: ReactiveCachingWorker
:feature_category: :not_owned
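
For context, the all_queues.yml metadata above is generated from class-level declarations in each worker. A minimal sketch of the mapping, using the ApplicationWorker DSL (the worker name below is hypothetical, not part of this diff):

# Hypothetical worker; each DSL call feeds one field of its all_queues.yml entry.
class ExampleWorker
  include ApplicationWorker

  feature_category :continuous_integration # => :feature_category
  urgency :low                             # => :urgency
  worker_resource_boundary :cpu            # => :resource_boundary
  weight 3                                 # => :weight
  idempotent!                              # => :idempotent: true
end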
diff --git a/app/workers/background_migration/single_database_worker.rb b/app/workers/background_migration/single_database_worker.rb
new file mode 100644
index 00000000000..b6661d4fd14
--- /dev/null
+++ b/app/workers/background_migration/single_database_worker.rb
@@ -0,0 +1,148 @@
+# frozen_string_literal: true
+
+module BackgroundMigration
+ module SingleDatabaseWorker
+ extend ActiveSupport::Concern
+
+ include ApplicationWorker
+
+ MAX_LEASE_ATTEMPTS = 5
+
+ included do
+ data_consistency :always
+
+ sidekiq_options retry: 3
+
+ feature_category :database
+ urgency :throttled
+ loggable_arguments 0, 1
+ end
+
+ class_methods do
+ # The minimum amount of time between processing two jobs of the same migration
+ # class.
+ #
+ # This interval is set to 2 or 5 minutes so autovacuuming and other
+ # maintenance related tasks have plenty of time to clean up after a migration
+ # has been performed.
+ def minimum_interval
+ 2.minutes.to_i
+ end
+
+ def tracking_database
+ raise NotImplementedError, "#{self.name} does not implement #{__method__}"
+ end
+
+ def unhealthy_metric_name
+ raise NotImplementedError, "#{self.name} does not implement #{__method__}"
+ end
+ end
+
+ # Performs the background migration.
+ #
+ # See Gitlab::BackgroundMigration.perform for more information.
+ #
+ # class_name - The class name of the background migration to run.
+ # arguments - The arguments to pass to the migration class.
+ # lease_attempts - The number of times we will try to obtain an exclusive
+ # lease on the class before giving up. See MR for more discussion.
+ # https://gitlab.com/gitlab-org/gitlab/-/merge_requests/45298#note_434304956
+ def perform(class_name, arguments = [], lease_attempts = MAX_LEASE_ATTEMPTS)
+ job_coordinator.with_shared_connection do
+ perform_with_connection(class_name, arguments, lease_attempts)
+ end
+ end
+
+ private
+
+ def job_coordinator
+ @job_coordinator ||= Gitlab::BackgroundMigration.coordinator_for_database(self.class.tracking_database)
+ end
+
+ def perform_with_connection(class_name, arguments, lease_attempts)
+ with_context(caller_id: class_name.to_s) do
+ retried = lease_attempts != MAX_LEASE_ATTEMPTS
+ attempts_left = lease_attempts - 1
+ should_perform, ttl = perform_and_ttl(class_name, attempts_left, retried)
+
+ break if should_perform.nil?
+
+ if should_perform
+ job_coordinator.perform(class_name, arguments)
+ else
+ # If the lease could not be obtained this means either another process is
+ # running a migration of this class or we ran one recently. In this case
+ # we'll reschedule the job in such a way that it is picked up again around
+ # the time the lease expires.
+ self.class
+ .perform_in(ttl || self.class.minimum_interval, class_name, arguments, attempts_left)
+ end
+ end
+ end
+
+ def perform_and_ttl(class_name, attempts_left, retried)
+ # In test environments `perform_in` will run right away. This can then
+ # lead to stack level errors in the above `#perform`. To work around this
+ # we'll just perform the migration right away in the test environment.
+ return [true, nil] if always_perform?
+
+ lease = lease_for(class_name, retried)
+ lease_obtained = !!lease.try_obtain
+ healthy_db = healthy_database?
+ perform = lease_obtained && healthy_db
+
+ database_unhealthy_counter.increment if lease_obtained && !healthy_db
+
+ # When the DB is unhealthy or the lease can't be obtained after several tries,
+ # then give up on the job and log a warning. Otherwise we could end up in
+ # an infinite rescheduling loop. Jobs can be tracked in the database with the
+ # use of Gitlab::Database::BackgroundMigrationJob
+ if !perform && attempts_left < 0
+ msg = if !lease_obtained
+ 'Job could not get an exclusive lease after several tries. Giving up.'
+ else
+ 'Database was unhealthy after several tries. Giving up.'
+ end
+
+ Sidekiq.logger.warn(class: class_name, message: msg, job_id: jid)
+
+ return [nil, nil]
+ end
+
+ [perform, lease.ttl]
+ end
+
+ def lease_for(class_name, retried)
+ Gitlab::ExclusiveLease
+ .new(lease_key_for(class_name, retried), timeout: self.class.minimum_interval)
+ end
+
+ def lease_key_for(class_name, retried)
+ key = "#{self.class.name}:#{class_name}"
+ # We use a different exclusive lock key for retried jobs to allow them to run concurrently with the scheduled jobs.
+ # See https://gitlab.com/gitlab-org/gitlab/-/merge_requests/68763 for more information.
+ key += ":retried" if retried
+ key
+ end
+
+ def always_perform?
+ Rails.env.test?
+ end
+
+ # Returns true if the database is healthy enough to allow the migration to be
+ # performed.
+ #
+ # class_name - The name of the background migration that we might want to
+ # run.
+ def healthy_database?
+ !Postgresql::ReplicationSlot.lag_too_great?
+ end
+
+ def database_unhealthy_counter
+ Gitlab::Metrics.counter(
+ self.class.unhealthy_metric_name,
+ 'The number of times a background migration is rescheduled because the database is unhealthy.'
+ )
+ end
+ end
+end
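
The concern leaves `tracking_database` and `unhealthy_metric_name` abstract. A sketch of a second, database-specific worker satisfying them (the class name, database symbol, and metric name here are assumptions for illustration, not part of this diff):

# frozen_string_literal: true

# Hypothetical worker tracking migrations on a separate "ci" database.
class BackgroundMigration::CiDatabaseWorker # rubocop:disable Scalability/IdempotentWorker
  include BackgroundMigration::SingleDatabaseWorker

  def self.tracking_database
    :ci
  end

  def self.unhealthy_metric_name
    :background_migration_ci_database_health_reschedules
  end
end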
diff --git a/app/workers/background_migration_worker.rb b/app/workers/background_migration_worker.rb
index b771ab4d4e7..dea0d467eca 100644
--- a/app/workers/background_migration_worker.rb
+++ b/app/workers/background_migration_worker.rb
@@ -1,120 +1,13 @@
# frozen_string_literal: true
class BackgroundMigrationWorker # rubocop:disable Scalability/IdempotentWorker
- include ApplicationWorker
+ include BackgroundMigration::SingleDatabaseWorker
- MAX_LEASE_ATTEMPTS = 5
-
- data_consistency :always
-
- sidekiq_options retry: 3
-
- feature_category :database
- urgency :throttled
- loggable_arguments 0, 1
-
- # The minimum amount of time between processing two jobs of the same migration
- # class.
- #
- # This interval is set to 2 or 5 minutes so autovacuuming and other
- # maintenance related tasks have plenty of time to clean up after a migration
- # has been performed.
- def self.minimum_interval
- 2.minutes.to_i
- end
-
- # Performs the background migration.
- #
- # See Gitlab::BackgroundMigration.perform for more information.
- #
- # class_name - The class name of the background migration to run.
- # arguments - The arguments to pass to the migration class.
- # lease_attempts - The number of times we will try to obtain an exclusive
- # lease on the class before giving up. See MR for more discussion.
- # https://gitlab.com/gitlab-org/gitlab/-/merge_requests/45298#note_434304956
- def perform(class_name, arguments = [], lease_attempts = MAX_LEASE_ATTEMPTS)
- with_context(caller_id: class_name.to_s) do
- retried = lease_attempts != MAX_LEASE_ATTEMPTS
- attempts_left = lease_attempts - 1
- should_perform, ttl = perform_and_ttl(class_name, attempts_left, retried)
-
- break if should_perform.nil?
-
- if should_perform
- Gitlab::BackgroundMigration.perform(class_name, arguments)
- else
- # If the lease could not be obtained this means either another process is
- # running a migration of this class or we ran one recently. In this case
- # we'll reschedule the job in such a way that it is picked up again around
- # the time the lease expires.
- self.class
- .perform_in(ttl || self.class.minimum_interval, class_name, arguments, attempts_left)
- end
- end
- end
-
- def perform_and_ttl(class_name, attempts_left, retried)
- # In test environments `perform_in` will run right away. This can then
- # lead to stack level errors in the above `#perform`. To work around this
- # we'll just perform the migration right away in the test environment.
- return [true, nil] if always_perform?
-
- lease = lease_for(class_name, retried)
- lease_obtained = !!lease.try_obtain
- healthy_db = healthy_database?
- perform = lease_obtained && healthy_db
-
- database_unhealthy_counter.increment if lease_obtained && !healthy_db
-
- # When the DB is unhealthy or the lease can't be obtained after several tries,
- # then give up on the job and log a warning. Otherwise we could end up in
- # an infinite rescheduling loop. Jobs can be tracked in the database with the
- # use of Gitlab::Database::BackgroundMigrationJob
- if !perform && attempts_left < 0
- msg = if !lease_obtained
- 'Job could not get an exclusive lease after several tries. Giving up.'
- else
- 'Database was unhealthy after several tries. Giving up.'
- end
-
- Sidekiq.logger.warn(class: class_name, message: msg, job_id: jid)
-
- return [nil, nil]
- end
-
- [perform, lease.ttl]
- end
-
- def lease_for(class_name, retried)
- Gitlab::ExclusiveLease
- .new(lease_key_for(class_name, retried), timeout: self.class.minimum_interval)
- end
-
- def lease_key_for(class_name, retried)
- key = "#{self.class.name}:#{class_name}"
- # We use a different exclusive lock key for retried jobs to allow them running concurrently with the scheduled jobs.
- # See https://gitlab.com/gitlab-org/gitlab/-/merge_requests/68763 for more information.
- key += ":retried" if retried
- key
- end
-
- def always_perform?
- Rails.env.test?
- end
-
- # Returns true if the database is healthy enough to allow the migration to be
- # performed.
- #
- # class_name - The name of the background migration that we might want to
- # run.
- def healthy_database?
- !Postgresql::ReplicationSlot.lag_too_great?
+ def self.tracking_database
+ @tracking_database ||= Gitlab::BackgroundMigration::DEFAULT_TRACKING_DATABASE
end
- def database_unhealthy_counter
- Gitlab::Metrics.counter(
- :background_migration_database_health_reschedules,
- 'The number of times a background migration is rescheduled because the database is unhealthy.'
- )
+ def self.unhealthy_metric_name
+ @unhealthy_metric_name ||= :background_migration_database_health_reschedules
end
end
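
Enqueueing is unchanged by the extraction; callers still pass the migration class name and its arguments (the migration name and arguments below are illustrative only):

# Tries the exclusive lease at most MAX_LEASE_ATTEMPTS (5) times before giving up.
BackgroundMigrationWorker.perform_async('ExampleMigration', [1, 1000])

# Rescheduled jobs carry the decremented attempts_left as the third argument.
BackgroundMigrationWorker.perform_in(2.minutes, 'ExampleMigration', [1, 1000], 4)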
diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb
index 5c04cdc96a0..70d6626df91 100644
--- a/app/workers/bulk_imports/entity_worker.rb
+++ b/app/workers/bulk_imports/entity_worker.rb
@@ -12,6 +12,9 @@ module BulkImports
worker_has_external_dependencies!
+ idempotent!
+ deduplicate :until_executed, including_scheduled: true
+
def perform(entity_id, current_stage = nil)
return if stage_running?(entity_id, current_stage)
@@ -48,7 +51,7 @@ module BulkImports
end
def next_pipeline_trackers_for(entity_id)
- BulkImports::Tracker.next_pipeline_trackers_for(entity_id)
+ BulkImports::Tracker.next_pipeline_trackers_for(entity_id).update(status_event: 'enqueue')
end
def logger
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index 35633b55489..8e5d7013c2c 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -16,7 +16,7 @@ module BulkImports
def perform(pipeline_tracker_id, stage, entity_id)
pipeline_tracker = ::BulkImports::Tracker
- .with_status(:created, :started)
+ .with_status(:enqueued)
.find_by_id(pipeline_tracker_id)
if pipeline_tracker.present?
@@ -68,6 +68,8 @@ module BulkImports
message: "Retrying error: #{e.message}"
)
+ pipeline_tracker.update!(status_event: 'retry', jid: jid)
+
reenqueue(pipeline_tracker, delay: e.retry_delay)
else
fail_tracker(pipeline_tracker, e)
diff --git a/app/workers/ci/create_downstream_pipeline_worker.rb b/app/workers/ci/create_downstream_pipeline_worker.rb
index 6d4cd2539c1..747cb088272 100644
--- a/app/workers/ci/create_downstream_pipeline_worker.rb
+++ b/app/workers/ci/create_downstream_pipeline_worker.rb
@@ -7,6 +7,7 @@ module Ci
sidekiq_options retry: 3
worker_resource_boundary :cpu
+ urgency :high
def perform(bridge_id)
::Ci::Bridge.find_by_id(bridge_id).try do |bridge|
diff --git a/app/workers/ci/pending_builds/update_group_worker.rb b/app/workers/ci/pending_builds/update_group_worker.rb
new file mode 100644
index 00000000000..3ee3a9116d8
--- /dev/null
+++ b/app/workers/ci/pending_builds/update_group_worker.rb
@@ -0,0 +1,19 @@
+# frozen_string_literal: true
+
+module Ci
+ module PendingBuilds
+ class UpdateGroupWorker
+ include ApplicationWorker
+ include PipelineBackgroundQueue
+
+ data_consistency :always
+ idempotent!
+
+ def perform(group_id, update_params)
+ ::Group.find_by_id(group_id).try do |group|
+ ::Ci::UpdatePendingBuildService.new(group, update_params).execute
+ end
+ end
+ end
+ end
+end
diff --git a/app/workers/ci/pending_builds/update_project_worker.rb b/app/workers/ci/pending_builds/update_project_worker.rb
new file mode 100644
index 00000000000..bac0316c80b
--- /dev/null
+++ b/app/workers/ci/pending_builds/update_project_worker.rb
@@ -0,0 +1,19 @@
+# frozen_string_literal: true
+
+module Ci
+ module PendingBuilds
+ class UpdateProjectWorker
+ include ApplicationWorker
+ include PipelineBackgroundQueue
+
+ data_consistency :always
+ idempotent!
+
+ def perform(project_id, update_params)
+ ::Project.find_by_id(project_id).try do |project|
+ ::Ci::UpdatePendingBuildService.new(project, update_params).execute
+ end
+ end
+ end
+ end
+end
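
Both pending-builds workers are thin wrappers around `Ci::UpdatePendingBuildService`. A hedged usage sketch — the shape of `update_params` is an assumption here; it is passed through to the service unchanged:

# Hypothetical call sites; any attributes accepted by
# Ci::UpdatePendingBuildService can appear in update_params.
Ci::PendingBuilds::UpdateGroupWorker.perform_async(group.id, { 'instance_runners_enabled' => true })
Ci::PendingBuilds::UpdateProjectWorker.perform_async(project.id, { 'instance_runners_enabled' => true })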
diff --git a/app/workers/ci/pipeline_artifacts/create_quality_report_worker.rb b/app/workers/ci/pipeline_artifacts/create_quality_report_worker.rb
index bb0a81a0a17..dc7e8f888c6 100644
--- a/app/workers/ci/pipeline_artifacts/create_quality_report_worker.rb
+++ b/app/workers/ci/pipeline_artifacts/create_quality_report_worker.rb
@@ -10,7 +10,7 @@ module Ci
sidekiq_options retry: 3
queue_namespace :pipeline_background
- feature_category :code_testing
+ feature_category :code_quality
idempotent!
diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb
index 03a0b5fae00..d0b09c15289 100644
--- a/app/workers/concerns/application_worker.rb
+++ b/app/workers/concerns/application_worker.rb
@@ -93,9 +93,11 @@ module ApplicationWorker
end
def perform_async(*args)
+ return super if Gitlab::Database::LoadBalancing.primary_only?
+
# Worker execution for workers with data_consistency set to :delayed or :sticky
# will be delayed to give replication enough time to complete
- if utilizes_load_balancing_capabilities?
+ if utilizes_load_balancing_capabilities? && Feature.disabled?(:skip_scheduling_workers_for_replicas, default_enabled: :yaml)
perform_in(delay_interval, *args)
else
super
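
To make the branch above concrete: only workers that opt into replica reads are affected, and only when replicas exist and the new feature flag is off. A sketch under those assumptions (worker name hypothetical):

class ExampleDelayedWorker
  include ApplicationWorker

  # Opting into :delayed (or :sticky) consistency makes this worker eligible
  # for the deferred scheduling path above.
  data_consistency :delayed
end

# With replicas configured and :skip_scheduling_workers_for_replicas disabled,
# this becomes perform_in(delay_interval, 42); on a primary-only setup the new
# early return pushes the job immediately.
ExampleDelayedWorker.perform_async(42)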
diff --git a/app/workers/expire_job_cache_worker.rb b/app/workers/expire_job_cache_worker.rb
index 3c5a7717d70..49f0222e9c9 100644
--- a/app/workers/expire_job_cache_worker.rb
+++ b/app/workers/expire_job_cache_worker.rb
@@ -15,19 +15,10 @@ class ExpireJobCacheWorker # rubocop:disable Scalability/IdempotentWorker
idempotent!
def perform(job_id)
- job = CommitStatus.preload(:pipeline, :project).find_by_id(job_id) # rubocop: disable CodeReuse/ActiveRecord
+ job = CommitStatus.find_by_id(job_id)
return unless job
- pipeline = job.pipeline
- project = job.project
-
- Gitlab::EtagCaching::Store.new.touch(project_job_path(project, job))
- ExpirePipelineCacheWorker.perform_async(pipeline.id)
- end
-
- private
-
- def project_job_path(project, job)
- Gitlab::Routing.url_helpers.project_build_path(project, job.id, format: :json)
+ job.expire_etag_cache!
+ ExpirePipelineCacheWorker.perform_async(job.pipeline_id)
end
end
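
The etag-touch logic moved out of the worker and onto the model. A reconstruction of what `CommitStatus#expire_etag_cache!` presumably does, based on the code removed above (the actual model-side implementation is not shown in this diff and may differ):

class CommitStatus
  def expire_etag_cache!
    # Same path the worker used to touch, now computed next to the data.
    job_path = Gitlab::Routing.url_helpers.project_build_path(project, id, format: :json)

    Gitlab::EtagCaching::Store.new.touch(job_path)
  end
end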
diff --git a/app/workers/issuable_export_csv_worker.rb b/app/workers/issuable_export_csv_worker.rb
index 9d543a21dc3..ffa0ed68fc7 100644
--- a/app/workers/issuable_export_csv_worker.rb
+++ b/app/workers/issuable_export_csv_worker.rb
@@ -41,7 +41,7 @@ class IssuableExportCsvWorker # rubocop:disable Scalability/IdempotentWorker
def parse_params(params, project_id)
params
- .symbolize_keys
+ .with_indifferent_access
.except(:sort)
.merge(project_id: project_id)
end
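
The switch to `with_indifferent_access` matters because it also wraps nested hashes, so downstream finders can read keys as strings or symbols; `symbolize_keys` only converts the top level. For example (param names illustrative):

params = { 'milestone_title' => 'v1.0', 'not' => { 'label_name' => ['bug'] } }

params.symbolize_keys[:not][:label_name]          # => nil (nested keys stay Strings)
params.with_indifferent_access[:not][:label_name] # => ["bug"]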
diff --git a/app/workers/issue_placement_worker.rb b/app/workers/issue_placement_worker.rb
index cfd72b90a42..26dec221f45 100644
--- a/app/workers/issue_placement_worker.rb
+++ b/app/workers/issue_placement_worker.rb
@@ -1,5 +1,8 @@
# frozen_string_literal: true
+# DEPRECATED. Will be removed in 14.7 https://gitlab.com/gitlab-org/gitlab/-/merge_requests/72803
+# Please use Issues::PlacementWorker instead
+#
# todo: remove this worker and its queue definition from all_queues after Issues::PlacementWorker is deployed.
# We want to keep it for one release in case some jobs are already scheduled in the old queue, so we need the worker
# to be available to finish those. All new jobs will be queued into the new queue.
@@ -43,10 +46,10 @@ class IssuePlacementWorker
Issue.move_nulls_to_end(to_place)
Issues::BaseService.new(project: nil).rebalance_if_needed(to_place.max_by(&:relative_position))
- IssuePlacementWorker.perform_async(nil, leftover.project_id) if leftover.present?
+ Issues::PlacementWorker.perform_async(nil, leftover.project_id) if leftover.present?
rescue RelativePositioning::NoSpaceLeft => e
Gitlab::ErrorTracking.log_exception(e, issue_id: issue_id, project_id: project_id)
- IssueRebalancingWorker.perform_async(nil, *root_namespace_id_to_rebalance(issue, project_id))
+ Issues::RebalancingWorker.perform_async(nil, *root_namespace_id_to_rebalance(issue, project_id))
end
def find_issue(issue_id, project_id)
diff --git a/app/workers/issue_rebalancing_worker.rb b/app/workers/issue_rebalancing_worker.rb
index a43e76feae4..73edb2eb653 100644
--- a/app/workers/issue_rebalancing_worker.rb
+++ b/app/workers/issue_rebalancing_worker.rb
@@ -1,5 +1,8 @@
# frozen_string_literal: true
+# DEPRECATED. Will be removed in 14.7 https://gitlab.com/gitlab-org/gitlab/-/merge_requests/72803
+# Please use Issues::RebalancingWorker instead
+#
# todo: remove this worker and its queue definition from all_queues after Issues::RebalancingWorker is released.
# We want to keep it for one release in case some jobs are already scheduled in the old queue, so we need the worker
# to be available to finish those. All new jobs will be queued into the new queue.
diff --git a/app/workers/issues/rebalancing_worker.rb b/app/workers/issues/rebalancing_worker.rb
index 466617d9fa1..8de0588a2a1 100644
--- a/app/workers/issues/rebalancing_worker.rb
+++ b/app/workers/issues/rebalancing_worker.rb
@@ -17,6 +17,7 @@ module Issues
# we need to have exactly one of the project_id and root_namespace_id params be non-nil
raise ArgumentError, "Expected only one of the params project_id: #{project_id} and root_namespace_id: #{root_namespace_id}" if project_id && root_namespace_id
return if project_id.nil? && root_namespace_id.nil?
+ return if ::Gitlab::Issues::Rebalancing::State.rebalance_recently_finished?(project_id, root_namespace_id)
# pull the projects collection to be rebalanced: either the project, if the namespace is not a group (i.e. a user namespace),
# or the root namespace. This also makes the worker backward compatible with the previous version, where a project_id was
diff --git a/app/workers/issues/reschedule_stuck_issue_rebalances_worker.rb b/app/workers/issues/reschedule_stuck_issue_rebalances_worker.rb
index d1759589cc0..77cedae558b 100644
--- a/app/workers/issues/reschedule_stuck_issue_rebalances_worker.rb
+++ b/app/workers/issues/reschedule_stuck_issue_rebalances_worker.rb
@@ -20,13 +20,13 @@ module Issues
namespaces = Namespace.id_in(namespace_ids)
projects = Project.id_in(project_ids)
- IssueRebalancingWorker.bulk_perform_async_with_contexts(
+ Issues::RebalancingWorker.bulk_perform_async_with_contexts(
namespaces,
arguments_proc: -> (namespace) { [nil, nil, namespace.id] },
context_proc: -> (namespace) { { namespace: namespace } }
)
- IssueRebalancingWorker.bulk_perform_async_with_contexts(
+ Issues::RebalancingWorker.bulk_perform_async_with_contexts(
projects,
arguments_proc: -> (project) { [nil, project.id, nil] },
context_proc: -> (project) { { project: project } }
diff --git a/app/workers/namespaces/process_sync_events_worker.rb b/app/workers/namespaces/process_sync_events_worker.rb
new file mode 100644
index 00000000000..f3c4f5bebb1
--- /dev/null
+++ b/app/workers/namespaces/process_sync_events_worker.rb
@@ -0,0 +1,22 @@
+# frozen_string_literal: true
+
+module Namespaces
+ # This worker can be invoked concurrently, but only one invocation processes
+ # events at a time; this is ensured by `try_obtain_lease` in `Ci::ProcessSyncEventsService`.
+ # `until_executing` here is to reduce redundant worker enqueuing.
+ class ProcessSyncEventsWorker
+ include ApplicationWorker
+
+ data_consistency :always
+
+ feature_category :sharding
+ urgency :high
+
+ idempotent!
+ deduplicate :until_executing
+
+ def perform
+ ::Ci::ProcessSyncEventsService.new(::Namespaces::SyncEvent, ::Ci::NamespaceMirror).execute
+ end
+ end
+end
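
The exclusivity mentioned in the comment lives in the service, not the worker. Not part of this diff, but for context, GitLab's `ExclusiveLeaseGuard` pattern looks roughly like this (the service name, lease timeout, and internal method names are assumptions):

class ExampleSyncEventsService
  include ExclusiveLeaseGuard

  def execute
    # Yields only if no other process holds the lease; concurrent worker
    # invocations simply fall through without processing.
    try_obtain_lease do
      process_pending_events!
    end
  end

  private

  def lease_timeout
    1.minute
  end

  def process_pending_events!
    # consume and delete queued sync events here
  end
end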
diff --git a/app/workers/projects/process_sync_events_worker.rb b/app/workers/projects/process_sync_events_worker.rb
new file mode 100644
index 00000000000..b7c4b4de3d0
--- /dev/null
+++ b/app/workers/projects/process_sync_events_worker.rb
@@ -0,0 +1,22 @@
+# frozen_string_literal: true
+
+module Projects
+ # This worker can be invoked concurrently, but only one invocation processes
+ # events at a time; this is ensured by `try_obtain_lease` in `Ci::ProcessSyncEventsService`.
+ # `until_executing` here is to reduce redundant worker enqueuing.
+ class ProcessSyncEventsWorker
+ include ApplicationWorker
+
+ data_consistency :always
+
+ feature_category :sharding
+ urgency :high
+
+ idempotent!
+ deduplicate :until_executing
+
+ def perform
+ ::Ci::ProcessSyncEventsService.new(::Projects::SyncEvent, ::Ci::ProjectMirror).execute
+ end
+ end
+end
diff --git a/app/workers/propagate_integration_worker.rb b/app/workers/propagate_integration_worker.rb
index 9d21d92b6e3..099f423dc0f 100644
--- a/app/workers/propagate_integration_worker.rb
+++ b/app/workers/propagate_integration_worker.rb
@@ -12,6 +12,6 @@ class PropagateIntegrationWorker
idempotent!
def perform(integration_id)
- Admin::PropagateIntegrationService.propagate(Integration.find(integration_id))
+ ::Integrations::PropagateService.propagate(Integration.find(integration_id))
end
end
diff --git a/app/workers/propagate_service_template_worker.rb b/app/workers/propagate_service_template_worker.rb
deleted file mode 100644
index 908f867279f..00000000000
--- a/app/workers/propagate_service_template_worker.rb
+++ /dev/null
@@ -1,29 +0,0 @@
-# frozen_string_literal: true
-
-# No longer in use https://gitlab.com/groups/gitlab-org/-/epics/5672
-# To be removed https://gitlab.com/gitlab-org/gitlab/-/issues/335178
-class PropagateServiceTemplateWorker # rubocop:disable Scalability/IdempotentWorker
- include ApplicationWorker
-
- data_consistency :always
-
- sidekiq_options retry: 3
-
- feature_category :integrations
-
- LEASE_TIMEOUT = 4.hours.to_i
-
- def perform(template_id)
- return unless try_obtain_lease_for(template_id)
-
- Admin::PropagateServiceTemplate.propagate(Integration.find_by_id(template_id))
- end
-
- private
-
- def try_obtain_lease_for(template_id)
- Gitlab::ExclusiveLease
- .new("propagate_service_template_worker:#{template_id}", timeout: LEASE_TIMEOUT)
- .try_obtain
- end
-end
diff --git a/app/workers/purge_dependency_proxy_cache_worker.rb b/app/workers/purge_dependency_proxy_cache_worker.rb
index db43e4adf20..615fa81f28e 100644
--- a/app/workers/purge_dependency_proxy_cache_worker.rb
+++ b/app/workers/purge_dependency_proxy_cache_worker.rb
@@ -12,14 +12,21 @@ class PurgeDependencyProxyCacheWorker
queue_namespace :dependency_proxy
feature_category :dependency_proxy
+ UPDATE_BATCH_SIZE = 100
+
def perform(current_user_id, group_id)
@current_user = User.find_by_id(current_user_id)
@group = Group.find_by_id(group_id)
return unless valid?
- @group.dependency_proxy_blobs.destroy_all # rubocop:disable Cop/DestroyAll
- @group.dependency_proxy_manifests.destroy_all # rubocop:disable Cop/DestroyAll
+ @group.dependency_proxy_blobs.each_batch(of: UPDATE_BATCH_SIZE) do |batch|
+ batch.update_all(status: :expired)
+ end
+
+ @group.dependency_proxy_manifests.each_batch(of: UPDATE_BATCH_SIZE) do |batch|
+ batch.update_all(status: :expired)
+ end
end
private
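
The move from `destroy_all` to batched `update_all` trades row-by-row deletion (which instantiates every record, runs callbacks, and holds one long transaction) for cheap bulk status flips, leaving actual cleanup to whatever processes expired entries. The pattern in isolation (`relation` is a placeholder):

relation.each_batch(of: 100) do |batch| # PK-ordered slices of at most 100 rows
  batch.update_all(status: :expired)    # one UPDATE per slice, no AR objects built
end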
diff --git a/app/workers/todos_destroyer/private_features_worker.rb b/app/workers/todos_destroyer/private_features_worker.rb
index 150e1c8a50e..09e81216aab 100644
--- a/app/workers/todos_destroyer/private_features_worker.rb
+++ b/app/workers/todos_destroyer/private_features_worker.rb
@@ -10,7 +10,7 @@ module TodosDestroyer
include TodosDestroyerQueue
def perform(project_id, user_id = nil)
- ::Todos::Destroy::PrivateFeaturesService.new(project_id, user_id).execute
+ ::Todos::Destroy::UnauthorizedFeaturesService.new(project_id, user_id).execute
end
end
end