summaryrefslogtreecommitdiff
path: root/app/workers
diff options
context:
space:
mode:
authorRobert Speicher <rspeicher@gmail.com>2021-01-20 13:34:23 -0600
committerRobert Speicher <rspeicher@gmail.com>2021-01-20 13:34:23 -0600
commit6438df3a1e0fb944485cebf07976160184697d72 (patch)
tree00b09bfd170e77ae9391b1a2f5a93ef6839f2597 /app/workers
parent42bcd54d971da7ef2854b896a7b34f4ef8601067 (diff)
downloadgitlab-ce-6438df3a1e0fb944485cebf07976160184697d72.tar.gz
Add latest changes from gitlab-org/gitlab@13-8-stable-eev13.8.0-rc42
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/all_queues.yml78
-rw-r--r--app/workers/bulk_import_worker.rb53
-rw-r--r--app/workers/bulk_imports/entity_worker.rb23
-rw-r--r--app/workers/ci/daily_build_group_report_results_worker.rb2
-rw-r--r--app/workers/ci/pipeline_artifacts/coverage_report_worker.rb (renamed from app/workers/ci/pipelines/create_artifact_worker.rb)8
-rw-r--r--app/workers/ci/pipeline_artifacts/expire_artifacts_worker.rb23
-rw-r--r--app/workers/concerns/update_repository_storage_worker.rb42
-rw-r--r--app/workers/container_expiration_policies/cleanup_container_repository_worker.rb27
-rw-r--r--app/workers/experiments/record_conversion_event_worker.rb18
-rw-r--r--app/workers/expire_build_artifacts_worker.rb4
-rw-r--r--app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb15
-rw-r--r--app/workers/gitlab_performance_bar_stats_worker.rb3
-rw-r--r--app/workers/gitlab_usage_ping_worker.rb2
-rw-r--r--app/workers/issuable_export_csv_worker.rb56
-rw-r--r--app/workers/jira_connect/sync_branch_worker.rb3
-rw-r--r--app/workers/jira_connect/sync_deployments_worker.rb29
-rw-r--r--app/workers/jira_connect/sync_feature_flags_worker.rb24
-rw-r--r--app/workers/jira_connect/sync_merge_request_worker.rb3
-rw-r--r--app/workers/namespaces/onboarding_pipeline_created_worker.rb20
-rw-r--r--app/workers/object_pool/join_worker.rb2
-rw-r--r--app/workers/project_update_repository_storage_worker.rb32
-rw-r--r--app/workers/snippet_schedule_bulk_repository_shard_moves_worker.rb13
-rw-r--r--app/workers/snippet_update_repository_storage_worker.rb23
23 files changed, 427 insertions, 76 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml
index 1f2e8213b64..4c4a314a1e6 100644
--- a/app/workers/all_queues.yml
+++ b/app/workers/all_queues.yml
@@ -147,6 +147,14 @@
:weight: 1
:idempotent:
:tags: []
+- :name: cronjob:ci_pipeline_artifacts_expire_artifacts
+ :feature_category: :continuous_integration
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: cronjob:ci_platform_metrics_update_cron
:feature_category: :continuous_integration
:has_external_dependencies:
@@ -188,7 +196,7 @@
:idempotent:
:tags: []
- :name: cronjob:gitlab_usage_ping
- :feature_category: :collection
+ :feature_category: :usage_ping
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
@@ -881,7 +889,7 @@
:urgency: :low
:resource_boundary: :unknown
:weight: 1
- :idempotent:
+ :idempotent: true
:tags: []
- :name: jira_connect:jira_connect_sync_builds
:feature_category: :integrations
@@ -891,13 +899,29 @@
:weight: 1
:idempotent: true
:tags: []
+- :name: jira_connect:jira_connect_sync_deployments
+ :feature_category: :integrations
+ :has_external_dependencies: true
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
+- :name: jira_connect:jira_connect_sync_feature_flags
+ :feature_category: :integrations
+ :has_external_dependencies: true
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: jira_connect:jira_connect_sync_merge_request
:feature_category: :integrations
:has_external_dependencies: true
:urgency: :low
:resource_boundary: :unknown
:weight: 1
- :idempotent:
+ :idempotent: true
:tags: []
- :name: jira_connect:jira_connect_sync_project
:feature_category: :integrations
@@ -1070,22 +1094,22 @@
:idempotent: true
:tags: []
- :name: pipeline_background:ci_daily_build_group_report_results
- :feature_category: :continuous_integration
+ :feature_category: :code_testing
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
-- :name: pipeline_background:ci_pipeline_success_unlock_artifacts
- :feature_category: :continuous_integration
+- :name: pipeline_background:ci_pipeline_artifacts_coverage_report
+ :feature_category: :code_testing
:has_external_dependencies:
:urgency: :low
:resource_boundary: :unknown
:weight: 1
:idempotent: true
:tags: []
-- :name: pipeline_background:ci_pipelines_create_artifact
+- :name: pipeline_background:ci_pipeline_success_unlock_artifacts
:feature_category: :continuous_integration
:has_external_dependencies:
:urgency: :low
@@ -1418,6 +1442,14 @@
:tags: []
- :name: bulk_import
:feature_category: :importers
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent:
+ :tags: []
+- :name: bulk_imports_entity
+ :feature_category: :importers
:has_external_dependencies: true
:urgency: :low
:resource_boundary: :unknown
@@ -1577,6 +1609,14 @@
:weight: 1
:idempotent:
:tags: []
+- :name: experiments_record_conversion_event
+ :feature_category: :users
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: expire_build_instance_artifacts
:feature_category: :continuous_integration
:has_external_dependencies:
@@ -1787,6 +1827,14 @@
:weight: 1
:idempotent:
:tags: []
+- :name: namespaces_onboarding_pipeline_created
+ :feature_category: :subgroups
+ :has_external_dependencies:
+ :urgency: :low
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: namespaces_onboarding_user_added
:feature_category: :users
:has_external_dependencies:
@@ -2095,6 +2143,22 @@
:weight: 1
:idempotent:
:tags: []
+- :name: snippet_schedule_bulk_repository_shard_moves
+ :feature_category: :gitaly
+ :has_external_dependencies:
+ :urgency: :throttled
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
+- :name: snippet_update_repository_storage
+ :feature_category: :gitaly
+ :has_external_dependencies:
+ :urgency: :throttled
+ :resource_boundary: :unknown
+ :weight: 1
+ :idempotent: true
+ :tags: []
- :name: system_hook_push
:feature_category: :source_code_management
:has_external_dependencies:
diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb
index 7828d046036..81099d4e5f7 100644
--- a/app/workers/bulk_import_worker.rb
+++ b/app/workers/bulk_import_worker.rb
@@ -7,9 +7,58 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker
sidekiq_options retry: false, dead: false
- worker_has_external_dependencies!
+ PERFORM_DELAY = 5.seconds
+ DEFAULT_BATCH_SIZE = 5
def perform(bulk_import_id)
- BulkImports::Importers::GroupsImporter.new(bulk_import_id).execute
+ @bulk_import = BulkImport.find_by_id(bulk_import_id)
+
+ return unless @bulk_import
+ return if @bulk_import.finished?
+ return @bulk_import.finish! if all_entities_processed? && @bulk_import.started?
+ return re_enqueue if max_batch_size_exceeded? # Do not start more jobs if max allowed are already running
+
+ @bulk_import.start! if @bulk_import.created?
+
+ created_entities.first(next_batch_size).each do |entity|
+ entity.start!
+
+ BulkImports::EntityWorker.perform_async(entity.id)
+ end
+
+ re_enqueue
+ end
+
+ private
+
+ def entities
+ @entities ||= @bulk_import.entities
+ end
+
+ def started_entities
+ entities.with_status(:started)
+ end
+
+ def created_entities
+ entities.with_status(:created)
+ end
+
+ def all_entities_processed?
+ entities.all? { |entity| entity.finished? || entity.failed? }
+ end
+
+ def max_batch_size_exceeded?
+ started_entities.count >= DEFAULT_BATCH_SIZE
+ end
+
+ def next_batch_size
+ [DEFAULT_BATCH_SIZE - started_entities.count, 0].max
+ end
+
+ # A new BulkImportWorker job is enqueued to either
+ # - Process the new BulkImports::Entity created during import (e.g. for the subgroups)
+ # - Or to mark the `bulk_import` as finished
+ def re_enqueue
+ BulkImportWorker.perform_in(PERFORM_DELAY, @bulk_import.id)
end
end
diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb
new file mode 100644
index 00000000000..9b29ad8f326
--- /dev/null
+++ b/app/workers/bulk_imports/entity_worker.rb
@@ -0,0 +1,23 @@
+# frozen_string_literal: true
+
+module BulkImports
+ class EntityWorker # rubocop:disable Scalability/IdempotentWorker
+ include ApplicationWorker
+
+ feature_category :importers
+
+ sidekiq_options retry: false, dead: false
+
+ worker_has_external_dependencies!
+
+ def perform(entity_id)
+ entity = BulkImports::Entity.with_status(:started).find_by_id(entity_id)
+
+ if entity
+ entity.update!(jid: jid)
+
+ BulkImports::Importers::GroupImporter.new(entity).execute
+ end
+ end
+ end
+end
diff --git a/app/workers/ci/daily_build_group_report_results_worker.rb b/app/workers/ci/daily_build_group_report_results_worker.rb
index a6d3c485e24..687cadc6366 100644
--- a/app/workers/ci/daily_build_group_report_results_worker.rb
+++ b/app/workers/ci/daily_build_group_report_results_worker.rb
@@ -5,6 +5,8 @@ module Ci
include ApplicationWorker
include PipelineBackgroundQueue
+ feature_category :code_testing
+
idempotent!
def perform(pipeline_id)
diff --git a/app/workers/ci/pipelines/create_artifact_worker.rb b/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb
index 220df975503..4de56f54f44 100644
--- a/app/workers/ci/pipelines/create_artifact_worker.rb
+++ b/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb
@@ -1,16 +1,18 @@
# frozen_string_literal: true
module Ci
- module Pipelines
- class CreateArtifactWorker
+ module PipelineArtifacts
+ class CoverageReportWorker
include ApplicationWorker
include PipelineBackgroundQueue
+ feature_category :code_testing
+
idempotent!
def perform(pipeline_id)
Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline|
- Ci::Pipelines::CreateArtifactService.new.execute(pipeline)
+ Ci::PipelineArtifacts::CoverageReportService.new.execute(pipeline)
end
end
end
diff --git a/app/workers/ci/pipeline_artifacts/expire_artifacts_worker.rb b/app/workers/ci/pipeline_artifacts/expire_artifacts_worker.rb
new file mode 100644
index 00000000000..0bb911bc6c8
--- /dev/null
+++ b/app/workers/ci/pipeline_artifacts/expire_artifacts_worker.rb
@@ -0,0 +1,23 @@
+# frozen_string_literal: true
+
+module Ci
+ module PipelineArtifacts
+ class ExpireArtifactsWorker
+ include ApplicationWorker
+ # rubocop:disable Scalability/CronWorkerContext
+ # This worker does not perform work scoped to a context
+ include CronjobQueue
+ # rubocop:enable Scalability/CronWorkerContext
+
+ deduplicate :until_executed, including_scheduled: true
+ idempotent!
+ feature_category :continuous_integration
+
+ def perform
+ service = ::Ci::PipelineArtifacts::DestroyExpiredArtifactsService.new
+ artifacts_count = service.execute
+ log_extra_metadata_on_done(:destroyed_pipeline_artifacts_count, artifacts_count)
+ end
+ end
+ end
+end
diff --git a/app/workers/concerns/update_repository_storage_worker.rb b/app/workers/concerns/update_repository_storage_worker.rb
new file mode 100644
index 00000000000..f46b64895a2
--- /dev/null
+++ b/app/workers/concerns/update_repository_storage_worker.rb
@@ -0,0 +1,42 @@
+# frozen_string_literal: true
+
+module UpdateRepositoryStorageWorker
+ extend ActiveSupport::Concern
+ include ApplicationWorker
+
+ included do
+ idempotent!
+ feature_category :gitaly
+ urgency :throttled
+ end
+
+ def perform(container_id, new_repository_storage_key, repository_storage_move_id = nil)
+ repository_storage_move =
+ if repository_storage_move_id
+ find_repository_storage_move(repository_storage_move_id)
+ else
+ # maintain compatibility with workers queued before release
+ container = find_container(container_id)
+ container.repository_storage_moves.create!(
+ source_storage_name: container.repository_storage,
+ destination_storage_name: new_repository_storage_key
+ )
+ end
+
+ update_repository_storage(repository_storage_move)
+ end
+
+ private
+
+ def find_repository_storage_move(repository_storage_move_id)
+ raise NotImplementedError
+ end
+
+ def find_container(container_id)
+ raise NotImplementedError
+ end
+
+ def update_repository_storage(repository_storage_move)
+ raise NotImplementedError
+ end
+end
diff --git a/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb b/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb
index 8c3c2e9e103..7c86b194574 100644
--- a/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb
+++ b/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb
@@ -12,6 +12,14 @@ module ContainerExpirationPolicies
worker_resource_boundary :unknown
idempotent!
+ LOG_ON_DONE_FIELDS = %i[
+ cleanup_status
+ cleanup_tags_service_original_size
+ cleanup_tags_service_before_truncate_size
+ cleanup_tags_service_after_truncate_size
+ cleanup_tags_service_before_delete_size
+ ].freeze
+
def perform_work
return unless throttling_enabled?
return unless container_repository
@@ -26,7 +34,7 @@ module ContainerExpirationPolicies
result = ContainerExpirationPolicies::CleanupService.new(container_repository)
.execute
- log_extra_metadata_on_done(:cleanup_status, result.payload[:cleanup_status])
+ log_on_done(result)
end
def remaining_work_count
@@ -92,5 +100,22 @@ module ContainerExpirationPolicies
def log_info(extra_structure)
logger.info(structured_payload(extra_structure))
end
+
+ def log_on_done(result)
+ LOG_ON_DONE_FIELDS.each do |field|
+ value = result.payload[field]
+
+ next if value.nil?
+
+ log_extra_metadata_on_done(field, value)
+ end
+
+ before_truncate_size = result.payload[:cleanup_tags_service_before_truncate_size]
+ after_truncate_size = result.payload[:cleanup_tags_service_after_truncate_size]
+ truncated = before_truncate_size &&
+ after_truncate_size &&
+ before_truncate_size != after_truncate_size
+ log_extra_metadata_on_done(:cleanup_tags_service_truncated, !!truncated)
+ end
end
end
diff --git a/app/workers/experiments/record_conversion_event_worker.rb b/app/workers/experiments/record_conversion_event_worker.rb
new file mode 100644
index 00000000000..e38ce7b3d01
--- /dev/null
+++ b/app/workers/experiments/record_conversion_event_worker.rb
@@ -0,0 +1,18 @@
+# frozen_string_literal: true
+
+module Experiments
+ class RecordConversionEventWorker
+ include ApplicationWorker
+
+ feature_category :users
+ urgency :low
+
+ idempotent!
+
+ def perform(experiment, user_id)
+ return unless Gitlab::Experimentation.active?(experiment)
+
+ ::Experiment.record_conversion_event(experiment, user_id)
+ end
+ end
+end
diff --git a/app/workers/expire_build_artifacts_worker.rb b/app/workers/expire_build_artifacts_worker.rb
index 12372961250..5db9f0b67e0 100644
--- a/app/workers/expire_build_artifacts_worker.rb
+++ b/app/workers/expire_build_artifacts_worker.rb
@@ -10,6 +10,8 @@ class ExpireBuildArtifactsWorker # rubocop:disable Scalability/IdempotentWorker
feature_category :continuous_integration
def perform
- Ci::DestroyExpiredJobArtifactsService.new.execute
+ service = Ci::DestroyExpiredJobArtifactsService.new
+ artifacts_count = service.execute
+ log_extra_metadata_on_done(:destroyed_job_artifacts_count, artifacts_count)
end
end
diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb
index 0809d0b7c29..790e8b0eccf 100644
--- a/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb
+++ b/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb
@@ -11,18 +11,11 @@ module Gitlab
# client - An instance of Gitlab::GithubImport::Client.
# project - An instance of Project.
def import(client, project)
- waiter =
- if Feature.enabled?(:github_import_pull_request_reviews, project, default_enabled: true)
- waiter = Importer::PullRequestsReviewsImporter
- .new(project, client)
- .execute
+ waiter = Importer::PullRequestsReviewsImporter
+ .new(project, client)
+ .execute
- project.import_state.refresh_jid_expiration
-
- waiter
- else
- JobWaiter.new
- end
+ project.import_state.refresh_jid_expiration
AdvanceStageWorker.perform_async(
project.id,
diff --git a/app/workers/gitlab_performance_bar_stats_worker.rb b/app/workers/gitlab_performance_bar_stats_worker.rb
index d63f8111864..558df0ab7b3 100644
--- a/app/workers/gitlab_performance_bar_stats_worker.rb
+++ b/app/workers/gitlab_performance_bar_stats_worker.rb
@@ -7,12 +7,13 @@ class GitlabPerformanceBarStatsWorker
LEASE_TIMEOUT = 600
WORKER_DELAY = 120
STATS_KEY = 'performance_bar_stats:pending_request_ids'
+ STATS_KEY_EXPIRE = 30.minutes.to_i
feature_category :metrics
idempotent!
def perform(lease_uuid)
- Gitlab::Redis::SharedState.with do |redis|
+ Gitlab::Redis::Cache.with do |redis|
request_ids = fetch_request_ids(redis, lease_uuid)
stats = Gitlab::PerformanceBar::Stats.new(redis)
diff --git a/app/workers/gitlab_usage_ping_worker.rb b/app/workers/gitlab_usage_ping_worker.rb
index 1bb600bbd13..782b089261f 100644
--- a/app/workers/gitlab_usage_ping_worker.rb
+++ b/app/workers/gitlab_usage_ping_worker.rb
@@ -8,7 +8,7 @@ class GitlabUsagePingWorker # rubocop:disable Scalability/IdempotentWorker
include CronjobQueue # rubocop:disable Scalability/CronWorkerContext
include Gitlab::ExclusiveLeaseHelpers
- feature_category :collection
+ feature_category :usage_ping
sidekiq_options retry: 3, dead: false
sidekiq_retry_in { |count| (count + 1) * 8.hours.to_i }
diff --git a/app/workers/issuable_export_csv_worker.rb b/app/workers/issuable_export_csv_worker.rb
index d91ba77287f..33452b14edb 100644
--- a/app/workers/issuable_export_csv_worker.rb
+++ b/app/workers/issuable_export_csv_worker.rb
@@ -7,47 +7,45 @@ class IssuableExportCsvWorker # rubocop:disable Scalability/IdempotentWorker
worker_resource_boundary :cpu
loggable_arguments 2
- PERMITTED_TYPES = [:merge_request, :issue].freeze
-
def perform(type, current_user_id, project_id, params)
- @type = type.to_sym
- check_permitted_type!
- process_params!(params, project_id)
-
- @current_user = User.find(current_user_id)
- @project = Project.find(project_id)
- @service = service(find_objects(params))
+ user = User.find(current_user_id)
+ project = Project.find(project_id)
+ finder_params = map_params(params, project_id)
- @service.email(@current_user)
+ export_service(type.to_sym, user, project, finder_params).email(user)
+ rescue ActiveRecord::RecordNotFound => error
+ logger.error("Failed to export CSV (current_user_id:#{current_user_id}, project_id:#{project_id}): #{error.message}")
end
private
- def find_objects(params)
- case @type
- when :issue
- IssuesFinder.new(@current_user, params).execute
- when :merge_request
- MergeRequestsFinder.new(@current_user, params).execute
- end
+ def map_params(params, project_id)
+ params
+ .symbolize_keys
+ .except(:sort)
+ .merge(project_id: project_id)
end
- def service(issuables)
- case @type
+ def export_service(type, user, project, params)
+ issuable_class = service_classes_for(type)
+ issuables = issuable_class[:finder].new(user, params).execute
+ issuable_class[:service].new(issuables, project)
+ end
+
+ def service_classes_for(type)
+ case type
when :issue
- Issues::ExportCsvService.new(issuables, @project)
+ { finder: IssuesFinder, service: Issues::ExportCsvService }
when :merge_request
- MergeRequests::ExportCsvService.new(issuables, @project)
+ { finder: MergeRequestsFinder, service: MergeRequests::ExportCsvService }
+ else
+ raise ArgumentError, type_error_message(type)
end
end
- def process_params!(params, project_id)
- params.symbolize_keys!
- params[:project_id] = project_id
- params.delete(:sort)
- end
-
- def check_permitted_type!
- raise ArgumentError, "type parameter must be :issue or :merge_request, it was #{@type}" unless PERMITTED_TYPES.include?(@type)
+ def type_error_message(type)
+ "Type parameter must be :issue or :merge_request, it was #{type}"
end
end
+
+IssuableExportCsvWorker.prepend_if_ee('::EE::IssuableExportCsvWorker')
diff --git a/app/workers/jira_connect/sync_branch_worker.rb b/app/workers/jira_connect/sync_branch_worker.rb
index d7e773b0861..1af51c4bb74 100644
--- a/app/workers/jira_connect/sync_branch_worker.rb
+++ b/app/workers/jira_connect/sync_branch_worker.rb
@@ -8,8 +8,9 @@ module JiraConnect
feature_category :integrations
loggable_arguments 1, 2
worker_has_external_dependencies!
+ idempotent!
- def perform(project_id, branch_name, commit_shas, update_sequence_id = nil)
+ def perform(project_id, branch_name, commit_shas, update_sequence_id)
project = Project.find_by_id(project_id)
return unless project
diff --git a/app/workers/jira_connect/sync_deployments_worker.rb b/app/workers/jira_connect/sync_deployments_worker.rb
new file mode 100644
index 00000000000..0f261e29464
--- /dev/null
+++ b/app/workers/jira_connect/sync_deployments_worker.rb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+
+module JiraConnect
+ class SyncDeploymentsWorker
+ include ApplicationWorker
+
+ idempotent!
+ worker_has_external_dependencies!
+
+ queue_namespace :jira_connect
+ feature_category :integrations
+
+ def perform(deployment_id, sequence_id)
+ deployment = Deployment.find_by_id(deployment_id)
+
+ return unless deployment
+ return unless Feature.enabled?(:jira_sync_deployments, deployment.project)
+
+ ::JiraConnect::SyncService
+ .new(deployment.project)
+ .execute(deployments: [deployment], update_sequence_id: sequence_id)
+ end
+
+ def self.perform_async(id)
+ seq_id = ::Atlassian::JiraConnect::Client.generate_update_sequence_id
+ super(id, seq_id)
+ end
+ end
+end
diff --git a/app/workers/jira_connect/sync_feature_flags_worker.rb b/app/workers/jira_connect/sync_feature_flags_worker.rb
new file mode 100644
index 00000000000..7e98d0eada7
--- /dev/null
+++ b/app/workers/jira_connect/sync_feature_flags_worker.rb
@@ -0,0 +1,24 @@
+# frozen_string_literal: true
+
+module JiraConnect
+ class SyncFeatureFlagsWorker
+ include ApplicationWorker
+
+ idempotent!
+ worker_has_external_dependencies!
+
+ queue_namespace :jira_connect
+ feature_category :integrations
+
+ def perform(feature_flag_id, sequence_id)
+ feature_flag = ::Operations::FeatureFlag.find_by_id(feature_flag_id)
+
+ return unless feature_flag
+ return unless Feature.enabled?(:jira_sync_feature_flags, feature_flag.project)
+
+ ::JiraConnect::SyncService
+ .new(feature_flag.project)
+ .execute(feature_flags: [feature_flag], update_sequence_id: sequence_id)
+ end
+ end
+end
diff --git a/app/workers/jira_connect/sync_merge_request_worker.rb b/app/workers/jira_connect/sync_merge_request_worker.rb
index 6ef426790b3..543d8e002fe 100644
--- a/app/workers/jira_connect/sync_merge_request_worker.rb
+++ b/app/workers/jira_connect/sync_merge_request_worker.rb
@@ -6,10 +6,11 @@ module JiraConnect
queue_namespace :jira_connect
feature_category :integrations
+ idempotent!
worker_has_external_dependencies!
- def perform(merge_request_id, update_sequence_id = nil)
+ def perform(merge_request_id, update_sequence_id)
merge_request = MergeRequest.find_by_id(merge_request_id)
return unless merge_request && merge_request.project
diff --git a/app/workers/namespaces/onboarding_pipeline_created_worker.rb b/app/workers/namespaces/onboarding_pipeline_created_worker.rb
new file mode 100644
index 00000000000..e1de6d0046b
--- /dev/null
+++ b/app/workers/namespaces/onboarding_pipeline_created_worker.rb
@@ -0,0 +1,20 @@
+# frozen_string_literal: true
+
+module Namespaces
+ class OnboardingPipelineCreatedWorker
+ include ApplicationWorker
+
+ feature_category :subgroups
+ urgency :low
+
+ deduplicate :until_executing
+ idempotent!
+
+ def perform(namespace_id)
+ namespace = Namespace.find_by_id(namespace_id)
+ return unless namespace
+
+ OnboardingProgressService.new(namespace).execute(action: :pipeline_created)
+ end
+ end
+end
diff --git a/app/workers/object_pool/join_worker.rb b/app/workers/object_pool/join_worker.rb
index f1008d3be83..8103c04b507 100644
--- a/app/workers/object_pool/join_worker.rb
+++ b/app/workers/object_pool/join_worker.rb
@@ -15,7 +15,7 @@ module ObjectPool
project.link_pool_repository
- Projects::HousekeepingService.new(project).execute
+ Repositories::HousekeepingService.new(project).execute
end
end
end
diff --git a/app/workers/project_update_repository_storage_worker.rb b/app/workers/project_update_repository_storage_worker.rb
index 7c0b1ae07fa..5636eec8233 100644
--- a/app/workers/project_update_repository_storage_worker.rb
+++ b/app/workers/project_update_repository_storage_worker.rb
@@ -1,25 +1,23 @@
# frozen_string_literal: true
-class ProjectUpdateRepositoryStorageWorker
- include ApplicationWorker
+class ProjectUpdateRepositoryStorageWorker # rubocop:disable Scalability/IdempotentWorker
+ extend ::Gitlab::Utils::Override
+ include UpdateRepositoryStorageWorker
- idempotent!
- feature_category :gitaly
- urgency :throttled
+ private
- def perform(project_id, new_repository_storage_key, repository_storage_move_id = nil)
- repository_storage_move =
- if repository_storage_move_id
- ProjectRepositoryStorageMove.find(repository_storage_move_id)
- else
- # maintain compatibility with workers queued before release
- project = Project.find(project_id)
- project.repository_storage_moves.create!(
- source_storage_name: project.repository_storage,
- destination_storage_name: new_repository_storage_key
- )
- end
+ override :find_repository_storage_move
+ def find_repository_storage_move(repository_storage_move_id)
+ ProjectRepositoryStorageMove.find(repository_storage_move_id)
+ end
+
+ override :find_container
+ def find_container(container_id)
+ Project.find(container_id)
+ end
+ override :update_repository_storage
+ def update_repository_storage(repository_storage_move)
::Projects::UpdateRepositoryStorageService.new(repository_storage_move).execute
end
end
diff --git a/app/workers/snippet_schedule_bulk_repository_shard_moves_worker.rb b/app/workers/snippet_schedule_bulk_repository_shard_moves_worker.rb
new file mode 100644
index 00000000000..47f24ad3500
--- /dev/null
+++ b/app/workers/snippet_schedule_bulk_repository_shard_moves_worker.rb
@@ -0,0 +1,13 @@
+# frozen_string_literal: true
+
+class SnippetScheduleBulkRepositoryShardMovesWorker
+ include ApplicationWorker
+
+ idempotent!
+ feature_category :gitaly
+ urgency :throttled
+
+ def perform(source_storage_name, destination_storage_name = nil)
+ Snippets::ScheduleBulkRepositoryShardMovesService.new.execute(source_storage_name, destination_storage_name)
+ end
+end
diff --git a/app/workers/snippet_update_repository_storage_worker.rb b/app/workers/snippet_update_repository_storage_worker.rb
new file mode 100644
index 00000000000..a28a02a0298
--- /dev/null
+++ b/app/workers/snippet_update_repository_storage_worker.rb
@@ -0,0 +1,23 @@
+# frozen_string_literal: true
+
+class SnippetUpdateRepositoryStorageWorker # rubocop:disable Scalability/IdempotentWorker
+ extend ::Gitlab::Utils::Override
+ include UpdateRepositoryStorageWorker
+
+ private
+
+ override :find_repository_storage_move
+ def find_repository_storage_move(repository_storage_move_id)
+ SnippetRepositoryStorageMove.find(repository_storage_move_id)
+ end
+
+ override :find_container
+ def find_container(container_id)
+ Snippet.find(container_id)
+ end
+
+ override :update_repository_storage
+ def update_repository_storage(repository_storage_move)
+ ::Snippets::UpdateRepositoryStorageService.new(repository_storage_move).execute
+ end
+end