diff options
author | Robert Speicher <rspeicher@gmail.com> | 2021-01-20 13:34:23 -0600 |
---|---|---|
committer | Robert Speicher <rspeicher@gmail.com> | 2021-01-20 13:34:23 -0600 |
commit | 6438df3a1e0fb944485cebf07976160184697d72 (patch) | |
tree | 00b09bfd170e77ae9391b1a2f5a93ef6839f2597 /app/workers | |
parent | 42bcd54d971da7ef2854b896a7b34f4ef8601067 (diff) | |
download | gitlab-ce-6438df3a1e0fb944485cebf07976160184697d72.tar.gz |
Add latest changes from gitlab-org/gitlab@13-8-stable-ee (tag: v13.8.0-rc42)
Diffstat (limited to 'app/workers')
23 files changed, 427 insertions, 76 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 1f2e8213b64..4c4a314a1e6 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -147,6 +147,14 @@ :weight: 1 :idempotent: :tags: [] +- :name: cronjob:ci_pipeline_artifacts_expire_artifacts + :feature_category: :continuous_integration + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:ci_platform_metrics_update_cron :feature_category: :continuous_integration :has_external_dependencies: @@ -188,7 +196,7 @@ :idempotent: :tags: [] - :name: cronjob:gitlab_usage_ping - :feature_category: :collection + :feature_category: :usage_ping :has_external_dependencies: :urgency: :low :resource_boundary: :unknown @@ -881,7 +889,7 @@ :urgency: :low :resource_boundary: :unknown :weight: 1 - :idempotent: + :idempotent: true :tags: [] - :name: jira_connect:jira_connect_sync_builds :feature_category: :integrations @@ -891,13 +899,29 @@ :weight: 1 :idempotent: true :tags: [] +- :name: jira_connect:jira_connect_sync_deployments + :feature_category: :integrations + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] +- :name: jira_connect:jira_connect_sync_feature_flags + :feature_category: :integrations + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: jira_connect:jira_connect_sync_merge_request :feature_category: :integrations :has_external_dependencies: true :urgency: :low :resource_boundary: :unknown :weight: 1 - :idempotent: + :idempotent: true :tags: [] - :name: jira_connect:jira_connect_sync_project :feature_category: :integrations @@ -1070,22 +1094,22 @@ :idempotent: true :tags: [] - :name: pipeline_background:ci_daily_build_group_report_results - :feature_category: :continuous_integration + :feature_category: :code_testing 
:has_external_dependencies: :urgency: :low :resource_boundary: :unknown :weight: 1 :idempotent: true :tags: [] -- :name: pipeline_background:ci_pipeline_success_unlock_artifacts - :feature_category: :continuous_integration +- :name: pipeline_background:ci_pipeline_artifacts_coverage_report + :feature_category: :code_testing :has_external_dependencies: :urgency: :low :resource_boundary: :unknown :weight: 1 :idempotent: true :tags: [] -- :name: pipeline_background:ci_pipelines_create_artifact +- :name: pipeline_background:ci_pipeline_success_unlock_artifacts :feature_category: :continuous_integration :has_external_dependencies: :urgency: :low @@ -1418,6 +1442,14 @@ :tags: [] - :name: bulk_import :feature_category: :importers + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: + :tags: [] +- :name: bulk_imports_entity + :feature_category: :importers :has_external_dependencies: true :urgency: :low :resource_boundary: :unknown @@ -1577,6 +1609,14 @@ :weight: 1 :idempotent: :tags: [] +- :name: experiments_record_conversion_event + :feature_category: :users + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: expire_build_instance_artifacts :feature_category: :continuous_integration :has_external_dependencies: @@ -1787,6 +1827,14 @@ :weight: 1 :idempotent: :tags: [] +- :name: namespaces_onboarding_pipeline_created + :feature_category: :subgroups + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: namespaces_onboarding_user_added :feature_category: :users :has_external_dependencies: @@ -2095,6 +2143,22 @@ :weight: 1 :idempotent: :tags: [] +- :name: snippet_schedule_bulk_repository_shard_moves + :feature_category: :gitaly + :has_external_dependencies: + :urgency: :throttled + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] +- :name: 
snippet_update_repository_storage + :feature_category: :gitaly + :has_external_dependencies: + :urgency: :throttled + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: system_hook_push :feature_category: :source_code_management :has_external_dependencies: diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb index 7828d046036..81099d4e5f7 100644 --- a/app/workers/bulk_import_worker.rb +++ b/app/workers/bulk_import_worker.rb @@ -7,9 +7,58 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker sidekiq_options retry: false, dead: false - worker_has_external_dependencies! + PERFORM_DELAY = 5.seconds + DEFAULT_BATCH_SIZE = 5 def perform(bulk_import_id) - BulkImports::Importers::GroupsImporter.new(bulk_import_id).execute + @bulk_import = BulkImport.find_by_id(bulk_import_id) + + return unless @bulk_import + return if @bulk_import.finished? + return @bulk_import.finish! if all_entities_processed? && @bulk_import.started? + return re_enqueue if max_batch_size_exceeded? # Do not start more jobs if max allowed are already running + + @bulk_import.start! if @bulk_import.created? + + created_entities.first(next_batch_size).each do |entity| + entity.start! + + BulkImports::EntityWorker.perform_async(entity.id) + end + + re_enqueue + end + + private + + def entities + @entities ||= @bulk_import.entities + end + + def started_entities + entities.with_status(:started) + end + + def created_entities + entities.with_status(:created) + end + + def all_entities_processed? + entities.all? { |entity| entity.finished? || entity.failed? } + end + + def max_batch_size_exceeded? + started_entities.count >= DEFAULT_BATCH_SIZE + end + + def next_batch_size + [DEFAULT_BATCH_SIZE - started_entities.count, 0].max + end + + # A new BulkImportWorker job is enqueued to either + # - Process the new BulkImports::Entity created during import (e.g. 
for the subgroups) + # - Or to mark the `bulk_import` as finished + def re_enqueue + BulkImportWorker.perform_in(PERFORM_DELAY, @bulk_import.id) end end diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb new file mode 100644 index 00000000000..9b29ad8f326 --- /dev/null +++ b/app/workers/bulk_imports/entity_worker.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +module BulkImports + class EntityWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + feature_category :importers + + sidekiq_options retry: false, dead: false + + worker_has_external_dependencies! + + def perform(entity_id) + entity = BulkImports::Entity.with_status(:started).find_by_id(entity_id) + + if entity + entity.update!(jid: jid) + + BulkImports::Importers::GroupImporter.new(entity).execute + end + end + end +end diff --git a/app/workers/ci/daily_build_group_report_results_worker.rb b/app/workers/ci/daily_build_group_report_results_worker.rb index a6d3c485e24..687cadc6366 100644 --- a/app/workers/ci/daily_build_group_report_results_worker.rb +++ b/app/workers/ci/daily_build_group_report_results_worker.rb @@ -5,6 +5,8 @@ module Ci include ApplicationWorker include PipelineBackgroundQueue + feature_category :code_testing + idempotent! def perform(pipeline_id) diff --git a/app/workers/ci/pipelines/create_artifact_worker.rb b/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb index 220df975503..4de56f54f44 100644 --- a/app/workers/ci/pipelines/create_artifact_worker.rb +++ b/app/workers/ci/pipeline_artifacts/coverage_report_worker.rb @@ -1,16 +1,18 @@ # frozen_string_literal: true module Ci - module Pipelines - class CreateArtifactWorker + module PipelineArtifacts + class CoverageReportWorker include ApplicationWorker include PipelineBackgroundQueue + feature_category :code_testing + idempotent! 
def perform(pipeline_id) Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline| - Ci::Pipelines::CreateArtifactService.new.execute(pipeline) + Ci::PipelineArtifacts::CoverageReportService.new.execute(pipeline) end end end diff --git a/app/workers/ci/pipeline_artifacts/expire_artifacts_worker.rb b/app/workers/ci/pipeline_artifacts/expire_artifacts_worker.rb new file mode 100644 index 00000000000..0bb911bc6c8 --- /dev/null +++ b/app/workers/ci/pipeline_artifacts/expire_artifacts_worker.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +module Ci + module PipelineArtifacts + class ExpireArtifactsWorker + include ApplicationWorker + # rubocop:disable Scalability/CronWorkerContext + # This worker does not perform work scoped to a context + include CronjobQueue + # rubocop:enable Scalability/CronWorkerContext + + deduplicate :until_executed, including_scheduled: true + idempotent! + feature_category :continuous_integration + + def perform + service = ::Ci::PipelineArtifacts::DestroyExpiredArtifactsService.new + artifacts_count = service.execute + log_extra_metadata_on_done(:destroyed_pipeline_artifacts_count, artifacts_count) + end + end + end +end diff --git a/app/workers/concerns/update_repository_storage_worker.rb b/app/workers/concerns/update_repository_storage_worker.rb new file mode 100644 index 00000000000..f46b64895a2 --- /dev/null +++ b/app/workers/concerns/update_repository_storage_worker.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +module UpdateRepositoryStorageWorker + extend ActiveSupport::Concern + include ApplicationWorker + + included do + idempotent! 
+ feature_category :gitaly + urgency :throttled + end + + def perform(container_id, new_repository_storage_key, repository_storage_move_id = nil) + repository_storage_move = + if repository_storage_move_id + find_repository_storage_move(repository_storage_move_id) + else + # maintain compatibility with workers queued before release + container = find_container(container_id) + container.repository_storage_moves.create!( + source_storage_name: container.repository_storage, + destination_storage_name: new_repository_storage_key + ) + end + + update_repository_storage(repository_storage_move) + end + + private + + def find_repository_storage_move(repository_storage_move_id) + raise NotImplementedError + end + + def find_container(container_id) + raise NotImplementedError + end + + def update_repository_storage(repository_storage_move) + raise NotImplementedError + end +end diff --git a/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb b/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb index 8c3c2e9e103..7c86b194574 100644 --- a/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb +++ b/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb @@ -12,6 +12,14 @@ module ContainerExpirationPolicies worker_resource_boundary :unknown idempotent! + LOG_ON_DONE_FIELDS = %i[ + cleanup_status + cleanup_tags_service_original_size + cleanup_tags_service_before_truncate_size + cleanup_tags_service_after_truncate_size + cleanup_tags_service_before_delete_size + ].freeze + def perform_work return unless throttling_enabled? 
return unless container_repository @@ -26,7 +34,7 @@ module ContainerExpirationPolicies result = ContainerExpirationPolicies::CleanupService.new(container_repository) .execute - log_extra_metadata_on_done(:cleanup_status, result.payload[:cleanup_status]) + log_on_done(result) end def remaining_work_count @@ -92,5 +100,22 @@ module ContainerExpirationPolicies def log_info(extra_structure) logger.info(structured_payload(extra_structure)) end + + def log_on_done(result) + LOG_ON_DONE_FIELDS.each do |field| + value = result.payload[field] + + next if value.nil? + + log_extra_metadata_on_done(field, value) + end + + before_truncate_size = result.payload[:cleanup_tags_service_before_truncate_size] + after_truncate_size = result.payload[:cleanup_tags_service_after_truncate_size] + truncated = before_truncate_size && + after_truncate_size && + before_truncate_size != after_truncate_size + log_extra_metadata_on_done(:cleanup_tags_service_truncated, !!truncated) + end end end diff --git a/app/workers/experiments/record_conversion_event_worker.rb b/app/workers/experiments/record_conversion_event_worker.rb new file mode 100644 index 00000000000..e38ce7b3d01 --- /dev/null +++ b/app/workers/experiments/record_conversion_event_worker.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module Experiments + class RecordConversionEventWorker + include ApplicationWorker + + feature_category :users + urgency :low + + idempotent! 
+ + def perform(experiment, user_id) + return unless Gitlab::Experimentation.active?(experiment) + + ::Experiment.record_conversion_event(experiment, user_id) + end + end +end diff --git a/app/workers/expire_build_artifacts_worker.rb b/app/workers/expire_build_artifacts_worker.rb index 12372961250..5db9f0b67e0 100644 --- a/app/workers/expire_build_artifacts_worker.rb +++ b/app/workers/expire_build_artifacts_worker.rb @@ -10,6 +10,8 @@ class ExpireBuildArtifactsWorker # rubocop:disable Scalability/IdempotentWorker feature_category :continuous_integration def perform - Ci::DestroyExpiredJobArtifactsService.new.execute + service = Ci::DestroyExpiredJobArtifactsService.new + artifacts_count = service.execute + log_extra_metadata_on_done(:destroyed_job_artifacts_count, artifacts_count) end end diff --git a/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb b/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb index 0809d0b7c29..790e8b0eccf 100644 --- a/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb +++ b/app/workers/gitlab/github_import/stage/import_pull_requests_reviews_worker.rb @@ -11,18 +11,11 @@ module Gitlab # client - An instance of Gitlab::GithubImport::Client. # project - An instance of Project. 
def import(client, project) - waiter = - if Feature.enabled?(:github_import_pull_request_reviews, project, default_enabled: true) - waiter = Importer::PullRequestsReviewsImporter - .new(project, client) - .execute + waiter = Importer::PullRequestsReviewsImporter + .new(project, client) + .execute - project.import_state.refresh_jid_expiration - - waiter - else - JobWaiter.new - end + project.import_state.refresh_jid_expiration AdvanceStageWorker.perform_async( project.id, diff --git a/app/workers/gitlab_performance_bar_stats_worker.rb b/app/workers/gitlab_performance_bar_stats_worker.rb index d63f8111864..558df0ab7b3 100644 --- a/app/workers/gitlab_performance_bar_stats_worker.rb +++ b/app/workers/gitlab_performance_bar_stats_worker.rb @@ -7,12 +7,13 @@ class GitlabPerformanceBarStatsWorker LEASE_TIMEOUT = 600 WORKER_DELAY = 120 STATS_KEY = 'performance_bar_stats:pending_request_ids' + STATS_KEY_EXPIRE = 30.minutes.to_i feature_category :metrics idempotent! def perform(lease_uuid) - Gitlab::Redis::SharedState.with do |redis| + Gitlab::Redis::Cache.with do |redis| request_ids = fetch_request_ids(redis, lease_uuid) stats = Gitlab::PerformanceBar::Stats.new(redis) diff --git a/app/workers/gitlab_usage_ping_worker.rb b/app/workers/gitlab_usage_ping_worker.rb index 1bb600bbd13..782b089261f 100644 --- a/app/workers/gitlab_usage_ping_worker.rb +++ b/app/workers/gitlab_usage_ping_worker.rb @@ -8,7 +8,7 @@ class GitlabUsagePingWorker # rubocop:disable Scalability/IdempotentWorker include CronjobQueue # rubocop:disable Scalability/CronWorkerContext include Gitlab::ExclusiveLeaseHelpers - feature_category :collection + feature_category :usage_ping sidekiq_options retry: 3, dead: false sidekiq_retry_in { |count| (count + 1) * 8.hours.to_i } diff --git a/app/workers/issuable_export_csv_worker.rb b/app/workers/issuable_export_csv_worker.rb index d91ba77287f..33452b14edb 100644 --- a/app/workers/issuable_export_csv_worker.rb +++ b/app/workers/issuable_export_csv_worker.rb @@ -7,47 
+7,45 @@ class IssuableExportCsvWorker # rubocop:disable Scalability/IdempotentWorker worker_resource_boundary :cpu loggable_arguments 2 - PERMITTED_TYPES = [:merge_request, :issue].freeze - def perform(type, current_user_id, project_id, params) - @type = type.to_sym - check_permitted_type! - process_params!(params, project_id) - - @current_user = User.find(current_user_id) - @project = Project.find(project_id) - @service = service(find_objects(params)) + user = User.find(current_user_id) + project = Project.find(project_id) + finder_params = map_params(params, project_id) - @service.email(@current_user) + export_service(type.to_sym, user, project, finder_params).email(user) + rescue ActiveRecord::RecordNotFound => error + logger.error("Failed to export CSV (current_user_id:#{current_user_id}, project_id:#{project_id}): #{error.message}") end private - def find_objects(params) - case @type - when :issue - IssuesFinder.new(@current_user, params).execute - when :merge_request - MergeRequestsFinder.new(@current_user, params).execute - end + def map_params(params, project_id) + params + .symbolize_keys + .except(:sort) + .merge(project_id: project_id) end - def service(issuables) - case @type + def export_service(type, user, project, params) + issuable_class = service_classes_for(type) + issuables = issuable_class[:finder].new(user, params).execute + issuable_class[:service].new(issuables, project) + end + + def service_classes_for(type) + case type when :issue - Issues::ExportCsvService.new(issuables, @project) + { finder: IssuesFinder, service: Issues::ExportCsvService } when :merge_request - MergeRequests::ExportCsvService.new(issuables, @project) + { finder: MergeRequestsFinder, service: MergeRequests::ExportCsvService } + else + raise ArgumentError, type_error_message(type) end end - def process_params!(params, project_id) - params.symbolize_keys! - params[:project_id] = project_id - params.delete(:sort) - end - - def check_permitted_type! 
- raise ArgumentError, "type parameter must be :issue or :merge_request, it was #{@type}" unless PERMITTED_TYPES.include?(@type) + def type_error_message(type) + "Type parameter must be :issue or :merge_request, it was #{type}" end end + +IssuableExportCsvWorker.prepend_if_ee('::EE::IssuableExportCsvWorker') diff --git a/app/workers/jira_connect/sync_branch_worker.rb b/app/workers/jira_connect/sync_branch_worker.rb index d7e773b0861..1af51c4bb74 100644 --- a/app/workers/jira_connect/sync_branch_worker.rb +++ b/app/workers/jira_connect/sync_branch_worker.rb @@ -8,8 +8,9 @@ module JiraConnect feature_category :integrations loggable_arguments 1, 2 worker_has_external_dependencies! + idempotent! - def perform(project_id, branch_name, commit_shas, update_sequence_id = nil) + def perform(project_id, branch_name, commit_shas, update_sequence_id) project = Project.find_by_id(project_id) return unless project diff --git a/app/workers/jira_connect/sync_deployments_worker.rb b/app/workers/jira_connect/sync_deployments_worker.rb new file mode 100644 index 00000000000..0f261e29464 --- /dev/null +++ b/app/workers/jira_connect/sync_deployments_worker.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module JiraConnect + class SyncDeploymentsWorker + include ApplicationWorker + + idempotent! + worker_has_external_dependencies! 
+ + queue_namespace :jira_connect + feature_category :integrations + + def perform(deployment_id, sequence_id) + deployment = Deployment.find_by_id(deployment_id) + + return unless deployment + return unless Feature.enabled?(:jira_sync_deployments, deployment.project) + + ::JiraConnect::SyncService + .new(deployment.project) + .execute(deployments: [deployment], update_sequence_id: sequence_id) + end + + def self.perform_async(id) + seq_id = ::Atlassian::JiraConnect::Client.generate_update_sequence_id + super(id, seq_id) + end + end +end diff --git a/app/workers/jira_connect/sync_feature_flags_worker.rb b/app/workers/jira_connect/sync_feature_flags_worker.rb new file mode 100644 index 00000000000..7e98d0eada7 --- /dev/null +++ b/app/workers/jira_connect/sync_feature_flags_worker.rb @@ -0,0 +1,24 @@ +# frozen_string_literal: true + +module JiraConnect + class SyncFeatureFlagsWorker + include ApplicationWorker + + idempotent! + worker_has_external_dependencies! + + queue_namespace :jira_connect + feature_category :integrations + + def perform(feature_flag_id, sequence_id) + feature_flag = ::Operations::FeatureFlag.find_by_id(feature_flag_id) + + return unless feature_flag + return unless Feature.enabled?(:jira_sync_feature_flags, feature_flag.project) + + ::JiraConnect::SyncService + .new(feature_flag.project) + .execute(feature_flags: [feature_flag], update_sequence_id: sequence_id) + end + end +end diff --git a/app/workers/jira_connect/sync_merge_request_worker.rb b/app/workers/jira_connect/sync_merge_request_worker.rb index 6ef426790b3..543d8e002fe 100644 --- a/app/workers/jira_connect/sync_merge_request_worker.rb +++ b/app/workers/jira_connect/sync_merge_request_worker.rb @@ -6,10 +6,11 @@ module JiraConnect queue_namespace :jira_connect feature_category :integrations + idempotent! worker_has_external_dependencies! 
- def perform(merge_request_id, update_sequence_id = nil) + def perform(merge_request_id, update_sequence_id) merge_request = MergeRequest.find_by_id(merge_request_id) return unless merge_request && merge_request.project diff --git a/app/workers/namespaces/onboarding_pipeline_created_worker.rb b/app/workers/namespaces/onboarding_pipeline_created_worker.rb new file mode 100644 index 00000000000..e1de6d0046b --- /dev/null +++ b/app/workers/namespaces/onboarding_pipeline_created_worker.rb @@ -0,0 +1,20 @@ +# frozen_string_literal: true + +module Namespaces + class OnboardingPipelineCreatedWorker + include ApplicationWorker + + feature_category :subgroups + urgency :low + + deduplicate :until_executing + idempotent! + + def perform(namespace_id) + namespace = Namespace.find_by_id(namespace_id) + return unless namespace + + OnboardingProgressService.new(namespace).execute(action: :pipeline_created) + end + end +end diff --git a/app/workers/object_pool/join_worker.rb b/app/workers/object_pool/join_worker.rb index f1008d3be83..8103c04b507 100644 --- a/app/workers/object_pool/join_worker.rb +++ b/app/workers/object_pool/join_worker.rb @@ -15,7 +15,7 @@ module ObjectPool project.link_pool_repository - Projects::HousekeepingService.new(project).execute + Repositories::HousekeepingService.new(project).execute end end end diff --git a/app/workers/project_update_repository_storage_worker.rb b/app/workers/project_update_repository_storage_worker.rb index 7c0b1ae07fa..5636eec8233 100644 --- a/app/workers/project_update_repository_storage_worker.rb +++ b/app/workers/project_update_repository_storage_worker.rb @@ -1,25 +1,23 @@ # frozen_string_literal: true -class ProjectUpdateRepositoryStorageWorker - include ApplicationWorker +class ProjectUpdateRepositoryStorageWorker # rubocop:disable Scalability/IdempotentWorker + extend ::Gitlab::Utils::Override + include UpdateRepositoryStorageWorker - idempotent! 
- feature_category :gitaly - urgency :throttled + private - def perform(project_id, new_repository_storage_key, repository_storage_move_id = nil) - repository_storage_move = - if repository_storage_move_id - ProjectRepositoryStorageMove.find(repository_storage_move_id) - else - # maintain compatibility with workers queued before release - project = Project.find(project_id) - project.repository_storage_moves.create!( - source_storage_name: project.repository_storage, - destination_storage_name: new_repository_storage_key - ) - end + override :find_repository_storage_move + def find_repository_storage_move(repository_storage_move_id) + ProjectRepositoryStorageMove.find(repository_storage_move_id) + end + + override :find_container + def find_container(container_id) + Project.find(container_id) + end + override :update_repository_storage + def update_repository_storage(repository_storage_move) ::Projects::UpdateRepositoryStorageService.new(repository_storage_move).execute end end diff --git a/app/workers/snippet_schedule_bulk_repository_shard_moves_worker.rb b/app/workers/snippet_schedule_bulk_repository_shard_moves_worker.rb new file mode 100644 index 00000000000..47f24ad3500 --- /dev/null +++ b/app/workers/snippet_schedule_bulk_repository_shard_moves_worker.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +class SnippetScheduleBulkRepositoryShardMovesWorker + include ApplicationWorker + + idempotent! 
+ feature_category :gitaly + urgency :throttled + + def perform(source_storage_name, destination_storage_name = nil) + Snippets::ScheduleBulkRepositoryShardMovesService.new.execute(source_storage_name, destination_storage_name) + end +end diff --git a/app/workers/snippet_update_repository_storage_worker.rb b/app/workers/snippet_update_repository_storage_worker.rb new file mode 100644 index 00000000000..a28a02a0298 --- /dev/null +++ b/app/workers/snippet_update_repository_storage_worker.rb @@ -0,0 +1,23 @@ +# frozen_string_literal: true + +class SnippetUpdateRepositoryStorageWorker # rubocop:disable Scalability/IdempotentWorker + extend ::Gitlab::Utils::Override + include UpdateRepositoryStorageWorker + + private + + override :find_repository_storage_move + def find_repository_storage_move(repository_storage_move_id) + SnippetRepositoryStorageMove.find(repository_storage_move_id) + end + + override :find_container + def find_container(container_id) + Snippet.find(container_id) + end + + override :update_repository_storage + def update_repository_storage(repository_storage_move) + ::Snippets::UpdateRepositoryStorageService.new(repository_storage_move).execute + end +end |