From 7e9c479f7de77702622631cff2628a9c8dcbc627 Mon Sep 17 00:00:00 2001 From: GitLab Bot Date: Thu, 19 Nov 2020 08:27:35 +0000 Subject: Add latest changes from gitlab-org/gitlab@13-6-stable-ee --- app/workers/all_queues.yml | 58 ++++++++++++- .../instance_statistics/counter_job_worker.rb | 10 ++- app/workers/background_migration_worker.rb | 57 ++++++++----- app/workers/build_finished_worker.rb | 5 ++ app/workers/bulk_import_worker.rb | 15 ++++ app/workers/ci/build_trace_chunk_flush_worker.rb | 2 + app/workers/ci/delete_objects_worker.rb | 8 +- app/workers/cleanup_container_repository_worker.rb | 5 +- app/workers/concerns/application_worker.rb | 2 +- app/workers/concerns/limited_capacity/worker.rb | 7 +- app/workers/concerns/reenqueuer.rb | 2 +- .../cleanup_container_repository_worker.rb | 96 ++++++++++++++++++++++ app/workers/container_expiration_policy_worker.rb | 75 +++++++++++++++-- app/workers/destroy_pages_deployments_worker.rb | 19 +++++ app/workers/git_garbage_collect_worker.rb | 19 +++-- app/workers/jira_connect/sync_branch_worker.rb | 4 +- .../jira_connect/sync_merge_request_worker.rb | 4 +- app/workers/jira_connect/sync_project_worker.rb | 30 +++++++ app/workers/post_receive.rb | 2 +- ...pagate_integration_inherit_descendant_worker.rb | 19 +++++ .../propagate_integration_inherit_worker.rb | 4 +- app/workers/purge_dependency_proxy_cache_worker.rb | 27 ++++++ app/workers/remove_expired_members_worker.rb | 10 ++- app/workers/repository_cleanup_worker.rb | 5 +- .../schedule_merge_request_cleanup_refs_worker.rb | 26 ++++++ 25 files changed, 452 insertions(+), 59 deletions(-) create mode 100644 app/workers/bulk_import_worker.rb create mode 100644 app/workers/container_expiration_policies/cleanup_container_repository_worker.rb create mode 100644 app/workers/destroy_pages_deployments_worker.rb create mode 100644 app/workers/jira_connect/sync_project_worker.rb create mode 100644 app/workers/propagate_integration_inherit_descendant_worker.rb create mode 100644 app/workers/purge_dependency_proxy_cache_worker.rb create mode 100644 app/workers/schedule_merge_request_cleanup_refs_worker.rb (limited to 'app/workers') diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 30b89f37562..6f080a97f7a 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -97,7 +97,15 @@ :urgency: :low :resource_boundary: :unknown :weight: 1 - :idempotent: + :idempotent: true + :tags: [] +- :name: container_repository:container_expiration_policies_cleanup_container_repository + :feature_category: :container_registry + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true :tags: [] - :name: container_repository:delete_container_repository :feature_category: :container_registry @@ -371,6 +379,14 @@ :weight: 1 :idempotent: :tags: [] +- :name: cronjob:schedule_merge_request_cleanup_refs + :feature_category: :source_code_management + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:schedule_migrate_external_diffs :feature_category: :source_code_management :has_external_dependencies: @@ -435,6 +451,14 @@ :weight: 1 :idempotent: true :tags: [] +- :name: dependency_proxy:purge_dependency_proxy_cache + :feature_category: :dependency_proxy + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: deployment:deployments_drop_older_deployments :feature_category: :continuous_delivery :has_external_dependencies: @@ -819,6 +843,14 @@ :weight: 1 :idempotent: :tags: [] +- :name: jira_connect:jira_connect_sync_project + :feature_category: :integrations + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: jira_importer:jira_import_advance_stage :feature_category: :importers :has_external_dependencies: @@ -1312,6 +1344,14 @@ :weight: 1 :idempotent: :tags: [] +- :name: bulk_import + :feature_category: :importers + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: + :tags: [] - :name: chat_notification :feature_category: :chatops :has_external_dependencies: true @@ -1409,6 +1449,14 @@ :weight: 1 :idempotent: :tags: [] +- :name: destroy_pages_deployments + :feature_category: :pages + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: detect_repository_languages :feature_category: :source_code_management :has_external_dependencies: @@ -1839,6 +1887,14 @@ :weight: 1 :idempotent: true :tags: [] +- :name: propagate_integration_inherit_descendant + :feature_category: :integrations + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: propagate_integration_project :feature_category: :integrations :has_external_dependencies: diff --git a/app/workers/analytics/instance_statistics/counter_job_worker.rb b/app/workers/analytics/instance_statistics/counter_job_worker.rb index 062b5ccc207..7fc715419b8 100644 --- a/app/workers/analytics/instance_statistics/counter_job_worker.rb +++ b/app/workers/analytics/instance_statistics/counter_job_worker.rb @@ -11,18 +11,24 @@ module Analytics idempotent! def perform(measurement_identifier, min_id, max_id, recorded_at) - query_scope = ::Analytics::InstanceStatistics::Measurement::IDENTIFIER_QUERY_MAPPING[measurement_identifier].call + query_scope = ::Analytics::InstanceStatistics::Measurement.identifier_query_mapping[measurement_identifier].call count = if min_id.nil? || max_id.nil? # table is empty 0 else - Gitlab::Database::BatchCount.batch_count(query_scope, start: min_id, finish: max_id) + counter(query_scope, min_id, max_id) end return if count == Gitlab::Database::BatchCounter::FALLBACK InstanceStatistics::Measurement.insert_all([{ recorded_at: recorded_at, count: count, identifier: measurement_identifier }]) end + + private + + def counter(query_scope, min_id, max_id) + Gitlab::Database::BatchCount.batch_count(query_scope, start: min_id, finish: max_id) + end end end end diff --git a/app/workers/background_migration_worker.rb b/app/workers/background_migration_worker.rb index 74a12dbff77..70c4ad53726 100644 --- a/app/workers/background_migration_worker.rb +++ b/app/workers/background_migration_worker.rb @@ -24,10 +24,14 @@ class BackgroundMigrationWorker # rubocop:disable Scalability/IdempotentWorker # class_name - The class name of the background migration to run. # arguments - The arguments to pass to the migration class. # lease_attempts - The number of times we will try to obtain an exclusive - # lease on the class before running anyway. Pass 0 to always run. + # lease on the class before giving up. See MR for more discussion. + # https://gitlab.com/gitlab-org/gitlab/-/merge_requests/45298#note_434304956 def perform(class_name, arguments = [], lease_attempts = 5) with_context(caller_id: class_name.to_s) do - should_perform, ttl = perform_and_ttl(class_name) + attempts_left = lease_attempts - 1 + should_perform, ttl = perform_and_ttl(class_name, attempts_left) + + break if should_perform.nil? if should_perform Gitlab::BackgroundMigration.perform(class_name, arguments) @@ -37,32 +41,41 @@ class BackgroundMigrationWorker # rubocop:disable Scalability/IdempotentWorker # we'll reschedule the job in such a way that it is picked up again around # the time the lease expires. self.class - .perform_in(ttl || self.class.minimum_interval, class_name, arguments) + .perform_in(ttl || self.class.minimum_interval, class_name, arguments, attempts_left) end end end - def perform_and_ttl(class_name) - if always_perform? - # In test environments `perform_in` will run right away. This can then - # lead to stack level errors in the above `#perform`. To work around this - # we'll just perform the migration right away in the test environment. - [true, nil] - else - lease = lease_for(class_name) - perform = !!lease.try_obtain - - # If we managed to acquire the lease but the DB is not healthy, then we - # want to simply reschedule our job and try again _after_ the lease - # expires. - if perform && !healthy_database? - database_unhealthy_counter.increment - - perform = false - end + def perform_and_ttl(class_name, attempts_left) + # In test environments `perform_in` will run right away. This can then + # lead to stack level errors in the above `#perform`. To work around this + # we'll just perform the migration right away in the test environment. + return [true, nil] if always_perform? + + lease = lease_for(class_name) + lease_obtained = !!lease.try_obtain + healthy_db = healthy_database? + perform = lease_obtained && healthy_db + + database_unhealthy_counter.increment if lease_obtained && !healthy_db - [perform, lease.ttl] + # When the DB is unhealthy or the lease can't be obtained after several tries, + # then give up on the job and log a warning. Otherwise we could end up in + # an infinite rescheduling loop. Jobs can be tracked in the database with the + # use of Gitlab::Database::BackgroundMigrationJob + if !perform && attempts_left < 0 + msg = if !lease_obtained + 'Job could not get an exclusive lease after several tries. Giving up.' + else + 'Database was unhealthy after several tries. Giving up.' + end + + Sidekiq.logger.warn(class: class_name, message: msg, job_id: jid) + + return [nil, nil] end + + [perform, lease.ttl] end def lease_for(class_name) diff --git a/app/workers/build_finished_worker.rb b/app/workers/build_finished_worker.rb index d7a5fcf4f18..af2305528ce 100644 --- a/app/workers/build_finished_worker.rb +++ b/app/workers/build_finished_worker.rb @@ -33,6 +33,11 @@ class BuildFinishedWorker # rubocop:disable Scalability/IdempotentWorker BuildCoverageWorker.new.perform(build.id) Ci::BuildReportResultWorker.new.perform(build.id) + # TODO: As per https://gitlab.com/groups/gitlab-com/gl-infra/-/epics/194, it may be + # best to avoid creating more workers that we have no intention of calling async. + # Change the previous worker calls on top to also just call the service directly. + Ci::TestCasesService.new.execute(build) + # We execute these async as these are independent operations. BuildHooksWorker.perform_async(build.id) ExpirePipelineCacheWorker.perform_async(build.pipeline_id) if build.pipeline.cacheable? diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb new file mode 100644 index 00000000000..7828d046036 --- /dev/null +++ b/app/workers/bulk_import_worker.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + feature_category :importers + + sidekiq_options retry: false, dead: false + + worker_has_external_dependencies! + + def perform(bulk_import_id) + BulkImports::Importers::GroupsImporter.new(bulk_import_id).execute + end +end diff --git a/app/workers/ci/build_trace_chunk_flush_worker.rb b/app/workers/ci/build_trace_chunk_flush_worker.rb index 89400247a7b..a63b12c0d03 100644 --- a/app/workers/ci/build_trace_chunk_flush_worker.rb +++ b/app/workers/ci/build_trace_chunk_flush_worker.rb @@ -5,6 +5,8 @@ module Ci include ApplicationWorker include PipelineBackgroundQueue + deduplicate :until_executed + idempotent! # rubocop: disable CodeReuse/ActiveRecord diff --git a/app/workers/ci/delete_objects_worker.rb b/app/workers/ci/delete_objects_worker.rb index e34be33b438..d845ad61358 100644 --- a/app/workers/ci/delete_objects_worker.rb +++ b/app/workers/ci/delete_objects_worker.rb @@ -14,18 +14,16 @@ module Ci def remaining_work_count(*args) @remaining_work_count ||= service - .remaining_batches_count(max_batch_count: remaining_capacity) + .remaining_batches_count(max_batch_count: max_running_jobs) end def max_running_jobs - if ::Feature.enabled?(:ci_delete_objects_low_concurrency) - 2 - elsif ::Feature.enabled?(:ci_delete_objects_medium_concurrency) + if ::Feature.enabled?(:ci_delete_objects_medium_concurrency) 20 elsif ::Feature.enabled?(:ci_delete_objects_high_concurrency) 50 else - 0 + 2 end end diff --git a/app/workers/cleanup_container_repository_worker.rb b/app/workers/cleanup_container_repository_worker.rb index 80cc296fff5..1cac2858156 100644 --- a/app/workers/cleanup_container_repository_worker.rb +++ b/app/workers/cleanup_container_repository_worker.rb @@ -1,10 +1,13 @@ # frozen_string_literal: true -class CleanupContainerRepositoryWorker # rubocop:disable Scalability/IdempotentWorker +class CleanupContainerRepositoryWorker include ApplicationWorker queue_namespace :container_repository feature_category :container_registry + urgency :low + worker_resource_boundary :unknown + idempotent! loggable_arguments 2 attr_reader :container_repository, :current_user diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb index 30dec5159a2..d101ef100d8 100644 --- a/app/workers/concerns/application_worker.rb +++ b/app/workers/concerns/application_worker.rb @@ -19,7 +19,7 @@ module ApplicationWorker def structured_payload(payload = {}) context = Labkit::Context.current.to_h.merge( - 'class' => self.class, + 'class' => self.class.name, 'job_status' => 'running', 'queue' => self.class.queue, 'jid' => jid diff --git a/app/workers/concerns/limited_capacity/worker.rb b/app/workers/concerns/limited_capacity/worker.rb index c0d6bfff2f5..b5a97e49300 100644 --- a/app/workers/concerns/limited_capacity/worker.rb +++ b/app/workers/concerns/limited_capacity/worker.rb @@ -67,6 +67,7 @@ module LimitedCapacity return unless has_capacity? job_tracker.register(jid) + report_running_jobs_metrics perform_work(*args) rescue => exception raise @@ -108,11 +109,15 @@ module LimitedCapacity end def report_prometheus_metrics(*args) - running_jobs_gauge.set(prometheus_labels, running_jobs_count) + report_running_jobs_metrics remaining_work_gauge.set(prometheus_labels, remaining_work_count(*args)) max_running_jobs_gauge.set(prometheus_labels, max_running_jobs) end + def report_running_jobs_metrics + running_jobs_gauge.set(prometheus_labels, running_jobs_count) + end + def required_jobs_count(*args) [ remaining_work_count(*args), diff --git a/app/workers/concerns/reenqueuer.rb b/app/workers/concerns/reenqueuer.rb index bf6f6546c03..6f399b6d90b 100644 --- a/app/workers/concerns/reenqueuer.rb +++ b/app/workers/concerns/reenqueuer.rb @@ -13,7 +13,7 @@ # - `#lease_timeout` # # The worker spec should include `it_behaves_like 'reenqueuer'` and -# `it_behaves_like 'it is rate limited to 1 call per'`. +# `it_behaves_like '#perform is rate limited to 1 call per'`. # # Optionally override `#minimum_duration` to adjust the rate limit. # diff --git a/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb b/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb new file mode 100644 index 00000000000..8c3c2e9e103 --- /dev/null +++ b/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +module ContainerExpirationPolicies + class CleanupContainerRepositoryWorker + include ApplicationWorker + include LimitedCapacity::Worker + include Gitlab::Utils::StrongMemoize + + queue_namespace :container_repository + feature_category :container_registry + urgency :low + worker_resource_boundary :unknown + idempotent! + + def perform_work + return unless throttling_enabled? + return unless container_repository + + log_extra_metadata_on_done(:container_repository_id, container_repository.id) + + unless allowed_to_run?(container_repository) + container_repository.cleanup_unscheduled! + log_extra_metadata_on_done(:cleanup_status, :skipped) + return + end + + result = ContainerExpirationPolicies::CleanupService.new(container_repository) + .execute + log_extra_metadata_on_done(:cleanup_status, result.payload[:cleanup_status]) + end + + def remaining_work_count + cleanup_scheduled_count = ContainerRepository.cleanup_scheduled.count + cleanup_unfinished_count = ContainerRepository.cleanup_unfinished.count + total_count = cleanup_scheduled_count + cleanup_unfinished_count + + log_info( + cleanup_scheduled_count: cleanup_scheduled_count, + cleanup_unfinished_count: cleanup_unfinished_count, + cleanup_total_count: total_count + ) + + total_count + end + + def max_running_jobs + return 0 unless throttling_enabled? + + ::Gitlab::CurrentSettings.current_application_settings.container_registry_expiration_policies_worker_capacity + end + + private + + def allowed_to_run?(container_repository) + return false unless policy&.enabled && policy&.next_run_at + + Time.zone.now + max_cleanup_execution_time.seconds < policy.next_run_at + end + + def throttling_enabled? + Feature.enabled?(:container_registry_expiration_policies_throttling) + end + + def max_cleanup_execution_time + ::Gitlab::CurrentSettings.current_application_settings.container_registry_delete_tags_service_timeout + end + + def policy + project.container_expiration_policy + end + + def project + container_repository&.project + end + + def container_repository + strong_memoize(:container_repository) do + ContainerRepository.transaction do + # rubocop: disable CodeReuse/ActiveRecord + # We need a lock to prevent two workers from picking up the same row + container_repository = ContainerRepository.waiting_for_cleanup + .order(:expiration_policy_cleanup_status, :expiration_policy_started_at) + .limit(1) + .lock('FOR UPDATE SKIP LOCKED') + .first + # rubocop: enable CodeReuse/ActiveRecord + container_repository&.tap(&:cleanup_ongoing!) + end + end + end + + def log_info(extra_structure) + logger.info(structured_payload(extra_structure)) + end + end +end diff --git a/app/workers/container_expiration_policy_worker.rb b/app/workers/container_expiration_policy_worker.rb index 61ba27f00d2..43dbea027f2 100644 --- a/app/workers/container_expiration_policy_worker.rb +++ b/app/workers/container_expiration_policy_worker.rb @@ -3,20 +3,79 @@ class ContainerExpirationPolicyWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker include CronjobQueue + include ExclusiveLeaseGuard feature_category :container_registry + InvalidPolicyError = Class.new(StandardError) + + BATCH_SIZE = 1000.freeze + def perform - ContainerExpirationPolicy.executable.preloaded.each_batch do |relation| - relation.each do |container_expiration_policy| - with_context(project: container_expiration_policy.project, - user: container_expiration_policy.project.owner) do |project:, user:| - ContainerExpirationPolicyService.new(project, user) - .execute(container_expiration_policy) - rescue ContainerExpirationPolicyService::InvalidPolicyError => e - Gitlab::ErrorTracking.log_exception(e, container_expiration_policy_id: container_expiration_policy.id) + throttling_enabled? ? perform_throttled : perform_unthrottled + end + + private + + def perform_unthrottled + with_runnable_policy(preloaded: true) do |policy| + with_context(project: policy.project, + user: policy.project.owner) do |project:, user:| + ContainerExpirationPolicyService.new(project, user) + .execute(policy) + end + end + end + + def perform_throttled + try_obtain_lease do + with_runnable_policy do |policy| + ContainerExpirationPolicy.transaction do + policy.schedule_next_run! + ContainerRepository.for_project_id(policy.id) + .each_batch do |relation| + relation.update_all(expiration_policy_cleanup_status: :cleanup_scheduled) + end end end + + ContainerExpirationPolicies::CleanupContainerRepositoryWorker.perform_with_capacity end end + + # TODO : remove the preload option when cleaning FF container_registry_expiration_policies_throttling + def with_runnable_policy(preloaded: false) + ContainerExpirationPolicy.runnable_schedules.each_batch(of: BATCH_SIZE) do |policies| + # rubocop: disable CodeReuse/ActiveRecord + cte = Gitlab::SQL::CTE.new(:batched_policies, policies.limit(BATCH_SIZE)) + # rubocop: enable CodeReuse/ActiveRecord + scope = cte.apply_to(ContainerExpirationPolicy.all).with_container_repositories + + scope = scope.preloaded if preloaded + + scope.each do |policy| + if policy.valid? + yield policy + else + disable_invalid_policy!(policy) + end + end + end + end + + def disable_invalid_policy!(policy) + policy.disable! + Gitlab::ErrorTracking.log_exception( + ::ContainerExpirationPolicyWorker::InvalidPolicyError.new, + container_expiration_policy_id: policy.id + ) + end + + def throttling_enabled? + Feature.enabled?(:container_registry_expiration_policies_throttling) + end + + def lease_timeout + 5.hours + end end diff --git a/app/workers/destroy_pages_deployments_worker.rb b/app/workers/destroy_pages_deployments_worker.rb new file mode 100644 index 00000000000..32b539325c9 --- /dev/null +++ b/app/workers/destroy_pages_deployments_worker.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +class DestroyPagesDeploymentsWorker + include ApplicationWorker + + idempotent! + + loggable_arguments 0, 1 + sidekiq_options retry: 3 + feature_category :pages + + def perform(project_id, last_deployment_id = nil) + project = Project.find_by_id(project_id) + + return unless project + + ::Pages::DestroyDeploymentsService.new(project, last_deployment_id).execute + end +end diff --git a/app/workers/git_garbage_collect_worker.rb b/app/workers/git_garbage_collect_worker.rb index 9071e4b8a1b..e1dcb16bafb 100644 --- a/app/workers/git_garbage_collect_worker.rb +++ b/app/workers/git_garbage_collect_worker.rb @@ -27,15 +27,15 @@ class GitGarbageCollectWorker # rubocop:disable Scalability/IdempotentWorker task = task.to_sym - if task == :gc + if gc?(task) ::Projects::GitDeduplicationService.new(project).execute cleanup_orphan_lfs_file_references(project) end - gitaly_call(task, project.repository.raw_repository) + gitaly_call(task, project) # Refresh the branch cache in case garbage collection caused a ref lookup to fail - flush_ref_caches(project) if task == :gc + flush_ref_caches(project) if gc?(task) update_repository_statistics(project) if task != :pack_refs @@ -48,6 +48,10 @@ class GitGarbageCollectWorker # rubocop:disable Scalability/IdempotentWorker private + def gc?(task) + task == :gc || task == :prune + end + def try_obtain_lease(key) ::Gitlab::ExclusiveLease.new(key, timeout: LEASE_TIMEOUT).try_obtain end @@ -64,8 +68,9 @@ class GitGarbageCollectWorker # rubocop:disable Scalability/IdempotentWorker ::Gitlab::ExclusiveLease.get_uuid(key) end - ## `repository` has to be a Gitlab::Git::Repository - def gitaly_call(task, repository) + def gitaly_call(task, project) + repository = project.repository.raw_repository + client = if task == :pack_refs Gitlab::GitalyClient::RefService.new(repository) else @@ -73,8 +78,8 @@ class GitGarbageCollectWorker # rubocop:disable Scalability/IdempotentWorker end case task - when :gc - client.garbage_collect(bitmaps_enabled?) + when :prune, :gc + client.garbage_collect(bitmaps_enabled?, prune: task == :prune) when :full_repack client.repack_full(bitmaps_enabled?) when :incremental_repack diff --git a/app/workers/jira_connect/sync_branch_worker.rb b/app/workers/jira_connect/sync_branch_worker.rb index 8c3416478fd..4c1c987353d 100644 --- a/app/workers/jira_connect/sync_branch_worker.rb +++ b/app/workers/jira_connect/sync_branch_worker.rb @@ -8,7 +8,7 @@ module JiraConnect feature_category :integrations loggable_arguments 1, 2 - def perform(project_id, branch_name, commit_shas) + def perform(project_id, branch_name, commit_shas, update_sequence_id = nil) project = Project.find_by_id(project_id) return unless project @@ -16,7 +16,7 @@ module JiraConnect branches = [project.repository.find_branch(branch_name)] if branch_name.present? commits = project.commits_by(oids: commit_shas) if commit_shas.present? - JiraConnect::SyncService.new(project).execute(commits: commits, branches: branches) + JiraConnect::SyncService.new(project).execute(commits: commits, branches: branches, update_sequence_id: update_sequence_id) end end end diff --git a/app/workers/jira_connect/sync_merge_request_worker.rb b/app/workers/jira_connect/sync_merge_request_worker.rb index b78bb8dfe16..f45ab38f35d 100644 --- a/app/workers/jira_connect/sync_merge_request_worker.rb +++ b/app/workers/jira_connect/sync_merge_request_worker.rb @@ -7,12 +7,12 @@ module JiraConnect queue_namespace :jira_connect feature_category :integrations - def perform(merge_request_id) + def perform(merge_request_id, update_sequence_id = nil) merge_request = MergeRequest.find_by_id(merge_request_id) return unless merge_request && merge_request.project - JiraConnect::SyncService.new(merge_request.project).execute(merge_requests: [merge_request]) + JiraConnect::SyncService.new(merge_request.project).execute(merge_requests: [merge_request], update_sequence_id: update_sequence_id) end end end diff --git a/app/workers/jira_connect/sync_project_worker.rb b/app/workers/jira_connect/sync_project_worker.rb new file mode 100644 index 00000000000..4d52705f207 --- /dev/null +++ b/app/workers/jira_connect/sync_project_worker.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +module JiraConnect + class SyncProjectWorker + include ApplicationWorker + + queue_namespace :jira_connect + feature_category :integrations + idempotent! + worker_has_external_dependencies! + + MERGE_REQUEST_LIMIT = 400 + + def perform(project_id, update_sequence_id) + project = Project.find_by_id(project_id) + + return if project.nil? + + JiraConnect::SyncService.new(project).execute(merge_requests: merge_requests_to_sync(project), update_sequence_id: update_sequence_id) + end + + private + + # rubocop: disable CodeReuse/ActiveRecord + def merge_requests_to_sync(project) + project.merge_requests.with_jira_issue_keys.preload(:author).limit(MERGE_REQUEST_LIMIT).order(id: :desc) + end + # rubocop: enable CodeReuse/ActiveRecord + end +end diff --git a/app/workers/post_receive.rb b/app/workers/post_receive.rb index 0b224b88e4d..9fe7dd31e68 100644 --- a/app/workers/post_receive.rb +++ b/app/workers/post_receive.rb @@ -20,7 +20,7 @@ class PostReceive # rubocop:disable Scalability/IdempotentWorker changes = Base64.decode64(changes) unless changes.include?(' ') # Use Sidekiq.logger so arguments can be correlated with execution # time and thread ID's. - Sidekiq.logger.info "changes: #{changes.inspect}" if ENV['SIDEKIQ_LOG_ARGUMENTS'] + Sidekiq.logger.info "changes: #{changes.inspect}" if SidekiqLogArguments.enabled? post_received = Gitlab::GitPostReceive.new(container, identifier, changes, push_options) if repo_type.wiki? diff --git a/app/workers/propagate_integration_inherit_descendant_worker.rb b/app/workers/propagate_integration_inherit_descendant_worker.rb new file mode 100644 index 00000000000..d589619818c --- /dev/null +++ b/app/workers/propagate_integration_inherit_descendant_worker.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +class PropagateIntegrationInheritDescendantWorker + include ApplicationWorker + + feature_category :integrations + idempotent! + + # rubocop: disable CodeReuse/ActiveRecord + def perform(integration_id, min_id, max_id) + integration = Service.find_by_id(integration_id) + return unless integration + + batch = Service.inherited_descendants_from_self_or_ancestors_from(integration).where(id: min_id..max_id) + + BulkUpdateIntegrationService.new(integration, batch).execute + end + # rubocop: enable CodeReuse/ActiveRecord +end diff --git a/app/workers/propagate_integration_inherit_worker.rb b/app/workers/propagate_integration_inherit_worker.rb index ef3132202f6..40d67c6d3bf 100644 --- a/app/workers/propagate_integration_inherit_worker.rb +++ b/app/workers/propagate_integration_inherit_worker.rb @@ -11,9 +11,9 @@ class PropagateIntegrationInheritWorker integration = Service.find_by_id(integration_id) return unless integration - services = Service.where(id: min_id..max_id).by_type(integration.type).inherit_from_id(integration.id) + batch = Service.where(id: min_id..max_id).by_type(integration.type).inherit_from_id(integration.id) - BulkUpdateIntegrationService.new(integration, services).execute + BulkUpdateIntegrationService.new(integration, batch).execute end # rubocop: enable CodeReuse/ActiveRecord end diff --git a/app/workers/purge_dependency_proxy_cache_worker.rb b/app/workers/purge_dependency_proxy_cache_worker.rb new file mode 100644 index 00000000000..594cdd3ed11 --- /dev/null +++ b/app/workers/purge_dependency_proxy_cache_worker.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +class PurgeDependencyProxyCacheWorker + include ApplicationWorker + include Gitlab::Allowable + idempotent! + + queue_namespace :dependency_proxy + feature_category :dependency_proxy + + def perform(current_user_id, group_id) + @current_user = User.find_by_id(current_user_id) + @group = Group.find_by_id(group_id) + + return unless valid? + + @group.dependency_proxy_blobs.destroy_all # rubocop:disable Cop/DestroyAll + end + + private + + def valid? + return unless @group + + can?(@current_user, :admin_group, @group) && @group.dependency_proxy_feature_available? + end +end diff --git a/app/workers/remove_expired_members_worker.rb b/app/workers/remove_expired_members_worker.rb index f56a6cd9fa2..35844fdf297 100644 --- a/app/workers/remove_expired_members_worker.rb +++ b/app/workers/remove_expired_members_worker.rb @@ -7,11 +7,19 @@ class RemoveExpiredMembersWorker # rubocop:disable Scalability/IdempotentWorker feature_category :authentication_and_authorization worker_resource_boundary :cpu + # rubocop: disable CodeReuse/ActiveRecord def perform - Member.expired.find_each do |member| + Member.expired.preload(:user).find_each do |member| Members::DestroyService.new.execute(member, skip_authorization: true) + + expired_user = member.user + + if expired_user.project_bot? + Users::DestroyService.new(nil).execute(expired_user, skip_authorization: true) + end rescue => ex logger.error("Expired Member ID=#{member.id} cannot be removed - #{ex}") end end + # rubocop: enable CodeReuse/ActiveRecord end diff --git a/app/workers/repository_cleanup_worker.rb b/app/workers/repository_cleanup_worker.rb index 33b7223dd95..03c9add6afb 100644 --- a/app/workers/repository_cleanup_worker.rb +++ b/app/workers/repository_cleanup_worker.rb @@ -27,8 +27,9 @@ class RepositoryCleanupWorker # rubocop:disable Scalability/IdempotentWorker project = Project.find(project_id) user = User.find(user_id) - # Ensure the file is removed - project.bfg_object_map.remove! + # Ensure the file is removed and the repository is made read-write again + Projects::CleanupService.cleanup_after(project) + notification_service.repository_cleanup_failure(project, user, error) end diff --git a/app/workers/schedule_merge_request_cleanup_refs_worker.rb b/app/workers/schedule_merge_request_cleanup_refs_worker.rb new file mode 100644 index 00000000000..17cabba4278 --- /dev/null +++ b/app/workers/schedule_merge_request_cleanup_refs_worker.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +class ScheduleMergeRequestCleanupRefsWorker + include ApplicationWorker + include CronjobQueue # rubocop:disable Scalability/CronWorkerContext + + feature_category :source_code_management + idempotent! + + # Based on existing data, MergeRequestCleanupRefsWorker can run 3 jobs per + # second. This means that 180 jobs can be performed but since there are some + # spikes from time time, it's better to give it some allowance. + LIMIT = 180 + DELAY = 10.seconds + BATCH_SIZE = 30 + + def perform + return if Gitlab::Database.read_only? + + ids = MergeRequest::CleanupSchedule.scheduled_merge_request_ids(LIMIT).map { |id| [id] } + + MergeRequestCleanupRefsWorker.bulk_perform_in(DELAY, ids, batch_size: BATCH_SIZE) # rubocop:disable Scalability/BulkPerformWithContext + + log_extra_metadata_on_done(:merge_requests_count, ids.size) + end +end -- cgit v1.2.1