diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2021-04-20 23:50:22 +0000 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2021-04-20 23:50:22 +0000 |
commit | 9dc93a4519d9d5d7be48ff274127136236a3adb3 (patch) | |
tree | 70467ae3692a0e35e5ea56bcb803eb512a10bedb /app/workers | |
parent | 4b0f34b6d759d6299322b3a54453e930c6121ff0 (diff) | |
download | gitlab-ce-9dc93a4519d9d5d7be48ff274127136236a3adb3.tar.gz |
Add latest changes from gitlab-org/gitlab@13-11-stable-eev13.11.0-rc43
Diffstat (limited to 'app/workers')
41 files changed, 763 insertions, 65 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index ff26aa7a4be..fa6ea54e342 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -25,7 +25,7 @@ :urgency: :low :resource_boundary: :unknown :weight: 1 - :idempotent: true + :idempotent: :tags: [] - :name: authorized_project_update:authorized_project_update_user_refresh_with_low_urgency :feature_category: :authentication_and_authorization @@ -187,6 +187,14 @@ :weight: 1 :idempotent: :tags: [] +- :name: cronjob:database_batched_background_migration + :feature_category: :database + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:environments_auto_stop_cron :feature_category: :continuous_delivery :has_external_dependencies: @@ -435,6 +443,22 @@ :weight: 1 :idempotent: :tags: [] +- :name: cronjob:ssh_keys_expired_notification + :feature_category: :compliance_management + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] +- :name: cronjob:ssh_keys_expiring_soon_notification + :feature_category: :compliance_management + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:stuck_ci_jobs :feature_category: :continuous_integration :has_external_dependencies: @@ -1083,6 +1107,14 @@ :weight: 1 :idempotent: :tags: [] +- :name: package_repositories:packages_go_sync_packages + :feature_category: :package_registry + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: package_repositories:packages_maven_metadata_sync :feature_category: :package_registry :has_external_dependencies: @@ -1099,6 +1131,14 @@ :weight: 1 :idempotent: :tags: [] +- :name: package_repositories:packages_rubygems_extraction + :feature_category: :package_registry + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: pipeline_background:archive_trace :feature_category: :continuous_integration :has_external_dependencies: @@ -1187,6 +1227,14 @@ :weight: 4 :idempotent: :tags: [] +- :name: pipeline_creation:merge_requests_create_pipeline + :feature_category: :continuous_integration + :has_external_dependencies: + :urgency: :high + :resource_boundary: :cpu + :weight: 4 + :idempotent: true + :tags: [] - :name: pipeline_creation:run_pipeline_schedule :feature_category: :continuous_integration :has_external_dependencies: @@ -1203,6 +1251,22 @@ :weight: 3 :idempotent: :tags: [] +- :name: pipeline_default:ci_drop_pipeline + :feature_category: :continuous_integration + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 3 + :idempotent: true + :tags: [] +- :name: pipeline_default:ci_merge_requests_add_todo_when_build_fails + :feature_category: :continuous_integration + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 3 + :idempotent: true + :tags: [] - :name: pipeline_default:ci_pipeline_bridge_status :feature_category: :continuous_integration :has_external_dependencies: @@ -1283,6 +1347,14 @@ :weight: 5 :idempotent: :tags: [] +- :name: pipeline_processing:ci_initial_pipeline_process + :feature_category: :continuous_integration + :has_external_dependencies: + :urgency: :high + :resource_boundary: :unknown + :weight: 5 + :idempotent: true + :tags: [] - :name: pipeline_processing:ci_resource_groups_assign_resource_from_resource_group :feature_category: :continuous_delivery :has_external_dependencies: @@ -1355,6 +1427,14 @@ :weight: 1 :idempotent: :tags: [] +- :name: todos_destroyer:todos_destroyer_destroyed_issuable + :feature_category: :issue_tracking + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: todos_destroyer:todos_destroyer_entity_leave :feature_category: :issue_tracking :has_external_dependencies: @@ -1475,6 +1555,14 @@ :weight: 1 :idempotent: :tags: [] +- :name: bulk_imports_pipeline + :feature_category: :importers + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: + :tags: [] - :name: chat_notification :feature_category: :chatops :has_external_dependencies: true @@ -1781,9 +1869,9 @@ :idempotent: true :tags: [] - :name: mailers - :feature_category: + :feature_category: :issue_tracking :has_external_dependencies: - :urgency: + :urgency: low :resource_boundary: :weight: 2 :idempotent: @@ -1812,6 +1900,14 @@ :weight: 1 :idempotent: true :tags: [] +- :name: merge_requests_assignees_change + :feature_category: :source_code_management + :has_external_dependencies: + :urgency: :high + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: merge_requests_delete_source_branch :feature_category: :source_code_management :has_external_dependencies: @@ -1820,6 +1916,22 @@ :weight: 1 :idempotent: true :tags: [] +- :name: merge_requests_handle_assignees_change + :feature_category: :code_review + :has_external_dependencies: + :urgency: :high + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] +- :name: merge_requests_resolve_todos + :feature_category: :code_review + :has_external_dependencies: + :urgency: :high + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: metrics_dashboard_prune_old_annotations :feature_category: :metrics :has_external_dependencies: @@ -2056,6 +2168,14 @@ :weight: 1 :idempotent: :tags: [] +- :name: projects_post_creation + :feature_category: :source_code_management + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: projects_schedule_bulk_repository_shard_moves :feature_category: :gitaly :has_external_dependencies: @@ -2273,7 +2393,7 @@ :idempotent: :tags: [] - :name: update_highest_role - :feature_category: :authentication_and_authorization + :feature_category: :utilization :has_external_dependencies: :urgency: :high :resource_boundary: :unknown diff --git a/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb b/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb index 9bd1ad2ed30..6635c322ab8 100644 --- a/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb +++ b/app/workers/authorized_project_update/user_refresh_over_user_range_worker.rb @@ -1,18 +1,49 @@ # frozen_string_literal: true module AuthorizedProjectUpdate - class UserRefreshOverUserRangeWorker + class UserRefreshOverUserRangeWorker # rubocop:disable Scalability/IdempotentWorker + # When the feature flag named `periodic_project_authorization_update_via_replica` is enabled, + # this worker checks if a specific user requires an update to their project_authorizations records. + # This check is done via the data read from the database replica (and not from the primary). + # If this check returns true, a completely new Sidekiq job is enqueued for this specific user + # so as to update its project_authorizations records. + + # There is a possibility that the data in the replica is lagging behind the primary + # and hence it becomes very important that we check if an update is indeed required for this user + # once again via the primary database, which is the reason why we enqueue a completely new Sidekiq job + # via `UserRefreshWithLowUrgencyWorker` for this user. + include ApplicationWorker feature_category :authentication_and_authorization urgency :low queue_namespace :authorized_project_update + # This job will not be deduplicated since it is marked with + # `data_consistency :delayed` and not `idempotent!` + # See https://gitlab.com/gitlab-org/gitlab/-/issues/325291 deduplicate :until_executing, including_scheduled: true - - idempotent! + data_consistency :delayed, feature_flag: :periodic_project_authorization_update_via_replica def perform(start_user_id, end_user_id) - AuthorizedProjectUpdate::RecalculateForUserRangeService.new(start_user_id, end_user_id).execute + if Feature.enabled?(:periodic_project_authorization_update_via_replica) + User.where(id: start_user_id..end_user_id).find_each do |user| # rubocop: disable CodeReuse/ActiveRecord + enqueue_project_authorizations_refresh(user) if project_authorizations_needs_refresh?(user) + end + else + AuthorizedProjectUpdate::RecalculateForUserRangeService.new(start_user_id, end_user_id).execute + end + end + + private + + def project_authorizations_needs_refresh?(user) + AuthorizedProjectUpdate::FindRecordsDueForRefreshService.new(user).needs_refresh? + end + + def enqueue_project_authorizations_refresh(user) + with_context(user: user) do + AuthorizedProjectUpdate::UserRefreshWithLowUrgencyWorker.perform_async(user.id) + end end end end diff --git a/app/workers/build_finished_worker.rb b/app/workers/build_finished_worker.rb index 3f99b30fdf7..aeda8d113ac 100644 --- a/app/workers/build_finished_worker.rb +++ b/app/workers/build_finished_worker.rb @@ -37,6 +37,10 @@ class BuildFinishedWorker # rubocop:disable Scalability/IdempotentWorker ExpirePipelineCacheWorker.perform_async(build.pipeline_id) ChatNotificationWorker.perform_async(build.id) if build.pipeline.chat? + if build.failed? + ::Ci::MergeRequests::AddTodoWhenBuildFailsWorker.perform_async(build.id) + end + ## # We want to delay sending a build trace to object storage operation to # validate that this fixes a race condition between this and flushing live diff --git a/app/workers/build_hooks_worker.rb b/app/workers/build_hooks_worker.rb index ce4aa7229aa..5e05063f058 100644 --- a/app/workers/build_hooks_worker.rb +++ b/app/workers/build_hooks_worker.rb @@ -7,6 +7,7 @@ class BuildHooksWorker # rubocop:disable Scalability/IdempotentWorker queue_namespace :pipeline_hooks feature_category :continuous_integration urgency :high + data_consistency :delayed, feature_flag: :load_balancing_for_build_hooks_worker # rubocop: disable CodeReuse/ActiveRecord def perform(build_id) diff --git a/app/workers/bulk_import_worker.rb b/app/workers/bulk_import_worker.rb index e6bc54895a7..b4b9d9b05c1 100644 --- a/app/workers/bulk_import_worker.rb +++ b/app/workers/bulk_import_worker.rb @@ -21,9 +21,11 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker @bulk_import.start! if @bulk_import.created? created_entities.first(next_batch_size).each do |entity| - entity.start! + create_pipeline_tracker_for(entity) BulkImports::EntityWorker.perform_async(entity.id) + + entity.start! end re_enqueue @@ -65,4 +67,13 @@ class BulkImportWorker # rubocop:disable Scalability/IdempotentWorker def re_enqueue BulkImportWorker.perform_in(PERFORM_DELAY, @bulk_import.id) end + + def create_pipeline_tracker_for(entity) + BulkImports::Stage.pipelines.each do |stage, pipeline| + entity.trackers.create!( + stage: stage, + pipeline_name: pipeline + ) + end + end end diff --git a/app/workers/bulk_imports/entity_worker.rb b/app/workers/bulk_imports/entity_worker.rb index 5b41ccbdea1..7f173b738cf 100644 --- a/app/workers/bulk_imports/entity_worker.rb +++ b/app/workers/bulk_imports/entity_worker.rb @@ -10,24 +10,47 @@ module BulkImports worker_has_external_dependencies! - def perform(entity_id) - entity = BulkImports::Entity.with_status(:started).find_by_id(entity_id) + def perform(entity_id, current_stage = nil) + return if stage_running?(entity_id, current_stage) + + logger.info( + worker: self.class.name, + entity_id: entity_id, + current_stage: current_stage + ) + + next_pipeline_trackers_for(entity_id).each do |pipeline_tracker| + BulkImports::PipelineWorker.perform_async( + pipeline_tracker.id, + pipeline_tracker.stage, + entity_id + ) + end + rescue => e + logger.error( + worker: self.class.name, + entity_id: entity_id, + current_stage: current_stage, + error_message: e.message + ) + + Gitlab::ErrorTracking.track_exception(e, entity_id: entity_id) + end - if entity - entity.update!(jid: jid) + private - BulkImports::Importers::GroupImporter.new(entity).execute - end + def stage_running?(entity_id, stage) + return unless stage - rescue => e - extra = { - bulk_import_id: entity&.bulk_import&.id, - entity_id: entity&.id - } + BulkImports::Tracker.stage_running?(entity_id, stage) + end - Gitlab::ErrorTracking.track_exception(e, extra) + def next_pipeline_trackers_for(entity_id) + BulkImports::Tracker.next_pipeline_trackers_for(entity_id) + end - entity&.fail_op + def logger + @logger ||= Gitlab::Import::Logger.build end end end diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb new file mode 100644 index 00000000000..a6de3c36205 --- /dev/null +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -0,0 +1,70 @@ +# frozen_string_literal: true + +module BulkImports + class PipelineWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + feature_category :importers + + sidekiq_options retry: false, dead: false + + worker_has_external_dependencies! + + def perform(pipeline_tracker_id, stage, entity_id) + pipeline_tracker = ::BulkImports::Tracker + .with_status(:created) + .find_by_id(pipeline_tracker_id) + + if pipeline_tracker.present? + logger.info( + worker: self.class.name, + entity_id: pipeline_tracker.entity.id, + pipeline_name: pipeline_tracker.pipeline_name + ) + + run(pipeline_tracker) + else + logger.error( + worker: self.class.name, + entity_id: entity_id, + pipeline_tracker_id: pipeline_tracker_id, + message: 'Unstarted pipeline not found' + ) + end + + ensure + ::BulkImports::EntityWorker.perform_async(entity_id, stage) + end + + private + + def run(pipeline_tracker) + pipeline_tracker.update!(status_event: 'start', jid: jid) + + context = ::BulkImports::Pipeline::Context.new(pipeline_tracker) + + pipeline_tracker.pipeline_class.new(context).run + + pipeline_tracker.finish! + rescue => e + pipeline_tracker.fail_op! + + logger.error( + worker: self.class.name, + entity_id: pipeline_tracker.entity.id, + pipeline_name: pipeline_tracker.pipeline_name, + message: e.message + ) + + Gitlab::ErrorTracking.track_exception( + e, + entity_id: pipeline_tracker.entity.id, + pipeline_name: pipeline_tracker.pipeline_name + ) + end + + def logger + @logger ||= Gitlab::Import::Logger.build + end + end +end diff --git a/app/workers/chaos/kill_worker.rb b/app/workers/chaos/kill_worker.rb index 3dedd47a1f9..4148c139d42 100644 --- a/app/workers/chaos/kill_worker.rb +++ b/app/workers/chaos/kill_worker.rb @@ -7,8 +7,8 @@ module Chaos sidekiq_options retry: false - def perform - Gitlab::Chaos.kill + def perform(signal) + Gitlab::Chaos.kill(signal) end end end diff --git a/app/workers/ci/drop_pipeline_worker.rb b/app/workers/ci/drop_pipeline_worker.rb new file mode 100644 index 00000000000..d19157a47e8 --- /dev/null +++ b/app/workers/ci/drop_pipeline_worker.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +module Ci + class DropPipelineWorker + include ApplicationWorker + include PipelineQueue + + idempotent! + + def perform(pipeline_id, failure_reason) + Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline| + Ci::DropPipelineService.new.execute(pipeline, failure_reason.to_sym) + end + end + end +end diff --git a/app/workers/ci/initial_pipeline_process_worker.rb b/app/workers/ci/initial_pipeline_process_worker.rb new file mode 100644 index 00000000000..f59726c87fb --- /dev/null +++ b/app/workers/ci/initial_pipeline_process_worker.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +module Ci + class InitialPipelineProcessWorker + include ApplicationWorker + include PipelineQueue + + queue_namespace :pipeline_processing + feature_category :continuous_integration + urgency :high + loggable_arguments 1 + idempotent! + + def perform(pipeline_id) + Ci::Pipeline.find_by_id(pipeline_id).try do |pipeline| + Ci::ProcessPipelineService + .new(pipeline) + .execute + end + end + end +end diff --git a/app/workers/ci/merge_requests/add_todo_when_build_fails_worker.rb b/app/workers/ci/merge_requests/add_todo_when_build_fails_worker.rb new file mode 100644 index 00000000000..d5e097dc2b5 --- /dev/null +++ b/app/workers/ci/merge_requests/add_todo_when_build_fails_worker.rb @@ -0,0 +1,21 @@ +# frozen_string_literal: true +module Ci + module MergeRequests + class AddTodoWhenBuildFailsWorker + include ApplicationWorker + include PipelineQueue + + urgency :low + idempotent! + + def perform(job_id) + job = ::CommitStatus.with_pipeline.find_by_id(job_id) + project = job&.project + + return unless job && project + + ::MergeRequests::AddTodoWhenBuildFailsService.new(job.project, nil).execute(job) + end + end + end +end diff --git a/app/workers/ci/pipeline_artifacts/expire_artifacts_worker.rb b/app/workers/ci/pipeline_artifacts/expire_artifacts_worker.rb index 0bb911bc6c8..fff979d95a9 100644 --- a/app/workers/ci/pipeline_artifacts/expire_artifacts_worker.rb +++ b/app/workers/ci/pipeline_artifacts/expire_artifacts_worker.rb @@ -14,7 +14,7 @@ module Ci feature_category :continuous_integration def perform - service = ::Ci::PipelineArtifacts::DestroyExpiredArtifactsService.new + service = ::Ci::PipelineArtifacts::DestroyAllExpiredService.new artifacts_count = service.execute log_extra_metadata_on_done(:destroyed_pipeline_artifacts_count, artifacts_count) end diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb index d101ef100d8..0de26e27631 100644 --- a/app/workers/concerns/application_worker.rb +++ b/app/workers/concerns/application_worker.rb @@ -18,7 +18,7 @@ module ApplicationWorker set_queue def structured_payload(payload = {}) - context = Labkit::Context.current.to_h.merge( + context = Gitlab::ApplicationContext.current.merge( 'class' => self.class.name, 'job_status' => 'running', 'queue' => self.class.queue, diff --git a/app/workers/concerns/cronjob_queue.rb b/app/workers/concerns/cronjob_queue.rb index 955387b5ad4..b89d6bba72c 100644 --- a/app/workers/concerns/cronjob_queue.rb +++ b/app/workers/concerns/cronjob_queue.rb @@ -15,7 +15,7 @@ module CronjobQueue # Cronjobs never get scheduled with arguments, so this is safe to # override def context_for_arguments(_args) - return if Gitlab::ApplicationContext.current_context_include?('meta.caller_id') + return if Gitlab::ApplicationContext.current_context_include?(:caller_id) Gitlab::ApplicationContext.new(caller_id: "Cronjob") end diff --git a/app/workers/concerns/each_shard_worker.rb b/app/workers/concerns/each_shard_worker.rb index 00f589f957e..d1d558f55fe 100644 --- a/app/workers/concerns/each_shard_worker.rb +++ b/app/workers/concerns/each_shard_worker.rb @@ -24,7 +24,13 @@ module EachShardWorker end def healthy_ready_shards - ready_shards.select(&:success) + success_checks, failed_checks = ready_shards.partition(&:success) + + if failed_checks.any? + ::Gitlab::AppLogger.error(message: 'Excluding unhealthy shards', failed_checks: failed_checks.map(&:payload), class: self.class.name) + end + + success_checks end def ready_shards diff --git a/app/workers/concerns/reactive_cacheable_worker.rb b/app/workers/concerns/reactive_cacheable_worker.rb index 189b0607605..9e882c8ac7a 100644 --- a/app/workers/concerns/reactive_cacheable_worker.rb +++ b/app/workers/concerns/reactive_cacheable_worker.rb @@ -17,10 +17,10 @@ module ReactiveCacheableWorker def perform(class_name, id, *args) klass = begin - class_name.constantize - rescue NameError - nil - end + class_name.constantize + rescue NameError + nil + end return unless klass diff --git a/app/workers/concerns/worker_attributes.rb b/app/workers/concerns/worker_attributes.rb index 042508d08f2..6f99fd089ac 100644 --- a/app/workers/concerns/worker_attributes.rb +++ b/app/workers/concerns/worker_attributes.rb @@ -11,6 +11,8 @@ module WorkerAttributes # Urgencies that workers can declare through the `urgencies` attribute VALID_URGENCIES = [:high, :low, :throttled].freeze + VALID_DATA_CONSISTENCIES = [:always, :sticky, :delayed].freeze + NAMESPACE_WEIGHTS = { auto_devops: 2, auto_merge: 3, @@ -69,6 +71,35 @@ module WorkerAttributes class_attributes[:urgency] || :low end + def data_consistency(data_consistency, feature_flag: nil) + raise ArgumentError, "Invalid data consistency: #{data_consistency}" unless VALID_DATA_CONSISTENCIES.include?(data_consistency) + raise ArgumentError, 'Data consistency is already set' if class_attributes[:data_consistency] + + class_attributes[:data_consistency_feature_flag] = feature_flag if feature_flag + class_attributes[:data_consistency] = data_consistency + + validate_worker_attributes! + end + + def validate_worker_attributes! + # Since the deduplication should always take into account the latest binary replication pointer into account, + # not the first one, the deduplication will not work with sticky or delayed. + # Follow up issue to improve this: https://gitlab.com/gitlab-org/gitlab/-/issues/325291 + if idempotent? && get_data_consistency != :always + raise ArgumentError, "Class can't be marked as idempotent if data_consistency is not set to :always" + end + end + + def get_data_consistency + class_attributes[:data_consistency] || :always + end + + def get_data_consistency_feature_flag_enabled? + return true unless class_attributes[:data_consistency_feature_flag] + + Feature.enabled?(class_attributes[:data_consistency_feature_flag], default_enabled: :yaml) + end + # Set this attribute on a job when it will call to services outside of the # application, such as 3rd party applications, other k8s clusters etc See # doc/development/sidekiq_style_guide.md#jobs-with-external-dependencies for @@ -96,6 +127,8 @@ module WorkerAttributes def idempotent! class_attributes[:idempotent] = true + + validate_worker_attributes! end def idempotent? diff --git a/app/workers/container_expiration_policy_worker.rb b/app/workers/container_expiration_policy_worker.rb index 43dbea027f2..5ca89179099 100644 --- a/app/workers/container_expiration_policy_worker.rb +++ b/app/workers/container_expiration_policy_worker.rb @@ -9,7 +9,7 @@ class ContainerExpirationPolicyWorker # rubocop:disable Scalability/IdempotentWo InvalidPolicyError = Class.new(StandardError) - BATCH_SIZE = 1000.freeze + BATCH_SIZE = 1000 def perform throttling_enabled? ? perform_throttled : perform_unthrottled @@ -29,13 +29,15 @@ class ContainerExpirationPolicyWorker # rubocop:disable Scalability/IdempotentWo def perform_throttled try_obtain_lease do - with_runnable_policy do |policy| - ContainerExpirationPolicy.transaction do - policy.schedule_next_run! - ContainerRepository.for_project_id(policy.id) - .each_batch do |relation| - relation.update_all(expiration_policy_cleanup_status: :cleanup_scheduled) - end + unless loopless_enabled? + with_runnable_policy do |policy| + ContainerExpirationPolicy.transaction do + policy.schedule_next_run! + ContainerRepository.for_project_id(policy.id) + .each_batch do |relation| + relation.update_all(expiration_policy_cleanup_status: :cleanup_scheduled) + end + end end end @@ -75,6 +77,10 @@ class ContainerExpirationPolicyWorker # rubocop:disable Scalability/IdempotentWo Feature.enabled?(:container_registry_expiration_policies_throttling) end + def loopless_enabled? + Feature.enabled?(:container_registry_expiration_policies_loopless) + end + def lease_timeout 5.hours end diff --git a/app/workers/database/batched_background_migration_worker.rb b/app/workers/database/batched_background_migration_worker.rb new file mode 100644 index 00000000000..de274d58ad7 --- /dev/null +++ b/app/workers/database/batched_background_migration_worker.rb @@ -0,0 +1,57 @@ +# frozen_string_literal: true + +module Database + class BatchedBackgroundMigrationWorker + include ApplicationWorker + include CronjobQueue # rubocop:disable Scalability/CronWorkerContext + + feature_category :database + idempotent! + + LEASE_TIMEOUT_MULTIPLIER = 3 + MINIMUM_LEASE_TIMEOUT = 10.minutes.freeze + INTERVAL_VARIANCE = 5.seconds.freeze + + def perform + return unless Feature.enabled?(:execute_batched_migrations_on_schedule, type: :ops) && active_migration + + with_exclusive_lease(active_migration.interval) do + # Now that we have the exclusive lease, reload migration in case another process has changed it. + # This is a temporary solution until we have better concurrency handling around job execution + # + # We also have to disable this cop, because ApplicationRecord aliases reset to reload, but our database + # models don't inherit from ApplicationRecord + active_migration.reload # rubocop:disable Cop/ActiveRecordAssociationReload + + run_active_migration if active_migration.active? && active_migration.interval_elapsed?(variance: INTERVAL_VARIANCE) + end + end + + private + + def active_migration + @active_migration ||= Gitlab::Database::BackgroundMigration::BatchedMigration.active_migration + end + + def run_active_migration + Gitlab::Database::BackgroundMigration::BatchedMigrationRunner.new.run_migration_job(active_migration) + end + + def with_exclusive_lease(interval) + timeout = max(interval * LEASE_TIMEOUT_MULTIPLIER, MINIMUM_LEASE_TIMEOUT) + lease = Gitlab::ExclusiveLease.new(lease_key, timeout: timeout) + + yield if lease.try_obtain + ensure + lease&.cancel + end + + def max(left, right) + left >= right ? left : right + end + + def lease_key + self.class.name.demodulize.underscore + end + end +end diff --git a/app/workers/delete_stored_files_worker.rb b/app/workers/delete_stored_files_worker.rb index 689ac3dd0ce..9cf5631b7d8 100644 --- a/app/workers/delete_stored_files_worker.rb +++ b/app/workers/delete_stored_files_worker.rb @@ -9,8 +9,8 @@ class DeleteStoredFilesWorker # rubocop:disable Scalability/IdempotentWorker def perform(class_name, keys) klass = begin class_name.constantize - rescue NameError - nil + rescue NameError + nil end unless klass diff --git a/app/workers/emails_on_push_worker.rb b/app/workers/emails_on_push_worker.rb index 1a34bf50d87..978b65802dd 100644 --- a/app/workers/emails_on_push_worker.rb +++ b/app/workers/emails_on_push_worker.rb @@ -56,7 +56,7 @@ class EmailsOnPushWorker # rubocop:disable Scalability/IdempotentWorker end end - valid_recipients(recipients).each do |recipient| + EmailsOnPushService.valid_recipients(recipients).each do |recipient| send_email( recipient, project_id, @@ -92,10 +92,4 @@ class EmailsOnPushWorker # rubocop:disable Scalability/IdempotentWorker email.header[:skip_premailer] = true if skip_premailer email.deliver_now end - - def valid_recipients(recipients) - recipients.split.select do |recipient| - recipient.include?('@') - end.uniq(&:downcase) - end end diff --git a/app/workers/expire_build_artifacts_worker.rb b/app/workers/expire_build_artifacts_worker.rb index 5db9f0b67e0..50fdd046491 100644 --- a/app/workers/expire_build_artifacts_worker.rb +++ b/app/workers/expire_build_artifacts_worker.rb @@ -10,7 +10,7 @@ class ExpireBuildArtifactsWorker # rubocop:disable Scalability/IdempotentWorker feature_category :continuous_integration def perform - service = Ci::DestroyExpiredJobArtifactsService.new + service = Ci::JobArtifacts::DestroyAllExpiredService.new artifacts_count = service.execute log_extra_metadata_on_done(:destroyed_job_artifacts_count, artifacts_count) end diff --git a/app/workers/expire_job_cache_worker.rb b/app/workers/expire_job_cache_worker.rb index 77b0edfd7de..48bb1160ae8 100644 --- a/app/workers/expire_job_cache_worker.rb +++ b/app/workers/expire_job_cache_worker.rb @@ -10,7 +10,7 @@ class ExpireJobCacheWorker # rubocop: disable CodeReuse/ActiveRecord def perform(job_id) - job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id) + job = CommitStatus.eager_load_pipeline.find_by(id: job_id) return unless job pipeline = job.pipeline diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb index 02039e40e15..cbea46cdccd 100644 --- a/app/workers/expire_pipeline_cache_worker.rb +++ b/app/workers/expire_pipeline_cache_worker.rb @@ -12,7 +12,7 @@ class ExpirePipelineCacheWorker # rubocop: disable CodeReuse/ActiveRecord def perform(pipeline_id) - pipeline = Ci::Pipeline.find_by(id: pipeline_id) + pipeline = Ci::Pipeline.eager_load_project.find_by(id: pipeline_id) return unless pipeline Ci::ExpirePipelineCacheService.new.execute(pipeline) diff --git a/app/workers/irker_worker.rb b/app/workers/irker_worker.rb index 687fb1cd02a..c5bdb3e0970 100644 --- a/app/workers/irker_worker.rb +++ b/app/workers/irker_worker.rb @@ -147,8 +147,7 @@ class IrkerWorker # rubocop:disable Scalability/IdempotentWorker def files_count(commit) diff_size = commit.raw_deltas.size - files = "#{diff_size} file".pluralize(diff_size) - files + "#{diff_size} file".pluralize(diff_size) end def colorize_sha(sha) diff --git a/app/workers/merge_requests/assignees_change_worker.rb b/app/workers/merge_requests/assignees_change_worker.rb new file mode 100644 index 00000000000..9865563e357 --- /dev/null +++ b/app/workers/merge_requests/assignees_change_worker.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +class MergeRequests::AssigneesChangeWorker + include ApplicationWorker + + feature_category :source_code_management + urgency :high + deduplicate :until_executed + idempotent! + + def perform(merge_request_id, user_id, old_assignee_ids) + merge_request = MergeRequest.find(merge_request_id) + current_user = User.find(user_id) + + # if a user was added and then removed, or removed and then added + # while waiting for this job to run, assume that nothing happened. + users = User.id_in(old_assignee_ids - merge_request.assignee_ids) + + return if users.blank? + + ::MergeRequests::HandleAssigneesChangeService + .new(merge_request.target_project, current_user) + .execute(merge_request, users, execute_hooks: true) + rescue ActiveRecord::RecordNotFound + end +end diff --git a/app/workers/merge_requests/create_pipeline_worker.rb b/app/workers/merge_requests/create_pipeline_worker.rb new file mode 100644 index 00000000000..244ba1af300 --- /dev/null +++ b/app/workers/merge_requests/create_pipeline_worker.rb @@ -0,0 +1,28 @@ +# frozen_string_literal: true + +module MergeRequests + class CreatePipelineWorker + include ApplicationWorker + include PipelineQueue + + queue_namespace :pipeline_creation + feature_category :continuous_integration + urgency :high + worker_resource_boundary :cpu + idempotent! + + def perform(project_id, user_id, merge_request_id) + project = Project.find_by_id(project_id) + return unless project + + user = User.find_by_id(user_id) + return unless user + + merge_request = MergeRequest.find_by_id(merge_request_id) + return unless merge_request + + MergeRequests::CreatePipelineService.new(project, user).execute(merge_request) + merge_request.update_head_pipeline + end + end +end diff --git a/app/workers/merge_requests/handle_assignees_change_worker.rb b/app/workers/merge_requests/handle_assignees_change_worker.rb new file mode 100644 index 00000000000..e79d8293bae --- /dev/null +++ b/app/workers/merge_requests/handle_assignees_change_worker.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +class MergeRequests::HandleAssigneesChangeWorker + include ApplicationWorker + + feature_category :code_review + urgency :high + deduplicate :until_executed + idempotent! + + def perform(merge_request_id, user_id, old_assignee_ids, options = {}) + merge_request = MergeRequest.find(merge_request_id) + user = User.find(user_id) + + old_assignees = User.id_in(old_assignee_ids) + + ::MergeRequests::HandleAssigneesChangeService + .new(merge_request.target_project, user) + .execute(merge_request, old_assignees, options) + rescue ActiveRecord::RecordNotFound + end +end diff --git a/app/workers/merge_requests/resolve_todos_worker.rb b/app/workers/merge_requests/resolve_todos_worker.rb new file mode 100644 index 00000000000..2a5f742f809 --- /dev/null +++ b/app/workers/merge_requests/resolve_todos_worker.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +class MergeRequests::ResolveTodosWorker + include ApplicationWorker + + feature_category :code_review + urgency :high + deduplicate :until_executed + idempotent! + + def perform(merge_request_id, user_id) + merge_request = MergeRequest.find(merge_request_id) + user = User.find(user_id) + + MergeRequests::ResolveTodosService.new(merge_request, user).execute + rescue ActiveRecord::RecordNotFound + end +end diff --git a/app/workers/namespaces/in_product_marketing_emails_worker.rb b/app/workers/namespaces/in_product_marketing_emails_worker.rb index f8fa393264a..3070afed3d6 100644 --- a/app/workers/namespaces/in_product_marketing_emails_worker.rb +++ b/app/workers/namespaces/in_product_marketing_emails_worker.rb @@ -9,10 +9,27 @@ module Namespaces urgency :low def perform - return unless Gitlab::CurrentSettings.in_product_marketing_emails_enabled - return unless Gitlab::Experimentation.active?(:in_product_marketing_emails) + return if paid_self_managed_instance? + return if setting_disabled? + return if experiment_inactive? Namespaces::InProductMarketingEmailsService.send_for_all_tracks_and_intervals end + + private + + def paid_self_managed_instance? + false + end + + def setting_disabled? + !Gitlab::CurrentSettings.in_product_marketing_emails_enabled + end + + def experiment_inactive? + Gitlab.com? && !Gitlab::Experimentation.active?(:in_product_marketing_emails) + end end end + +Namespaces::InProductMarketingEmailsWorker.prepend_if_ee('EE::Namespaces::InProductMarketingEmailsWorker') diff --git a/app/workers/new_issue_worker.rb b/app/workers/new_issue_worker.rb index be9a168c3f6..c08f4b4cd75 100644 --- a/app/workers/new_issue_worker.rb +++ b/app/workers/new_issue_worker.rb @@ -14,7 +14,12 @@ class NewIssueWorker # rubocop:disable Scalability/IdempotentWorker ::EventCreateService.new.open_issue(issuable, user) ::NotificationService.new.new_issue(issuable, user) + issuable.create_cross_references!(user) + + Issues::AfterCreateService + .new(issuable.project, user) + .execute(issuable) end def issuable_class diff --git a/app/workers/object_storage/migrate_uploads_worker.rb b/app/workers/object_storage/migrate_uploads_worker.rb index f489e428e8d..666bacb0188 100644 --- a/app/workers/object_storage/migrate_uploads_worker.rb +++ b/app/workers/object_storage/migrate_uploads_worker.rb @@ -16,7 +16,8 @@ module ObjectStorage attr_accessor :error def initialize(upload, error = nil) - @upload, @error = upload, error + @upload = upload + @error = error end def success? diff --git a/app/workers/packages/go/sync_packages_worker.rb b/app/workers/packages/go/sync_packages_worker.rb new file mode 100644 index 00000000000..e41f27f2252 --- /dev/null +++ b/app/workers/packages/go/sync_packages_worker.rb @@ -0,0 +1,33 @@ +# frozen_string_literal: true + +module Packages + module Go + class SyncPackagesWorker + include ApplicationWorker + include Gitlab::Golang + + queue_namespace :package_repositories + feature_category :package_registry + + deduplicate :until_executing + idempotent! + + def perform(project_id, ref_name, path) + project = Project.find_by_id(project_id) + return unless project && project.repository.find_tag(ref_name) + + module_name = go_path(project, path) + mod = Packages::Go::ModuleFinder.new(project, module_name).execute + return unless mod + + ver = Packages::Go::VersionFinder.new(mod).find(ref_name) + return unless ver + + Packages::Go::CreatePackageService.new(project, nil, version: ver).execute + + rescue ::Packages::Go::CreatePackageService::GoZipSizeError => ex + Gitlab::ErrorTracking.log_exception(ex) + end + end + end +end diff --git a/app/workers/packages/rubygems/extraction_worker.rb b/app/workers/packages/rubygems/extraction_worker.rb new file mode 100644 index 00000000000..1e5cd0b54ce --- /dev/null +++ b/app/workers/packages/rubygems/extraction_worker.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module Packages + module Rubygems + class ExtractionWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + queue_namespace :package_repositories + feature_category :package_registry + deduplicate :until_executing + + idempotent! + + def perform(package_file_id) + package_file = ::Packages::PackageFile.find_by_id(package_file_id) + + return unless package_file + + ::Packages::Rubygems::ProcessGemService.new(package_file).execute + + rescue ::Packages::Rubygems::ProcessGemService::ExtractionError => e + Gitlab::ErrorTracking.log_exception(e, project_id: package_file.project_id) + package_file.package.destroy! + end + end + end +end diff --git a/app/workers/pages_update_configuration_worker.rb b/app/workers/pages_update_configuration_worker.rb index 6e82e2099c7..ca5016dc782 100644 --- a/app/workers/pages_update_configuration_worker.rb +++ b/app/workers/pages_update_configuration_worker.rb @@ -7,7 +7,7 @@ class PagesUpdateConfigurationWorker feature_category :pages def self.perform_async(*args) - return unless Feature.enabled?(:pages_update_legacy_storage, default_enabled: true) + return unless ::Settings.pages.local_store.enabled super(*args) end diff --git a/app/workers/projects/post_creation_worker.rb b/app/workers/projects/post_creation_worker.rb new file mode 100644 index 00000000000..2ca62e582b6 --- /dev/null +++ b/app/workers/projects/post_creation_worker.rb @@ -0,0 +1,34 @@ +# frozen_string_literal: true + +module Projects + class PostCreationWorker + include ApplicationWorker + + feature_category :source_code_management + idempotent! + + def perform(project_id) + project = Project.find_by_id(project_id) + + return unless project + + create_prometheus_service(project) + end + + private + + def create_prometheus_service(project) + service = project.find_or_initialize_service(::PrometheusService.to_param) + + # If the service has already been inserted in the database, that + # means it came from a template, and there's nothing more to do. + return if service.persisted? + + return unless service.prometheus_available? + + service.save! + rescue ActiveRecord::RecordInvalid => e + Gitlab::ErrorTracking.track_exception(e, extra: { project_id: project.id }) + end + end +end diff --git a/app/workers/remove_expired_members_worker.rb b/app/workers/remove_expired_members_worker.rb index 35844fdf297..fc2ec047e1c 100644 --- a/app/workers/remove_expired_members_worker.rb +++ b/app/workers/remove_expired_members_worker.rb @@ -2,20 +2,29 @@ class RemoveExpiredMembersWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker - include CronjobQueue # rubocop:disable Scalability/CronWorkerContext + include CronjobQueue feature_category :authentication_and_authorization worker_resource_boundary :cpu # rubocop: disable CodeReuse/ActiveRecord def perform - Member.expired.preload(:user).find_each do |member| - Members::DestroyService.new.execute(member, skip_authorization: true) + Member.expired.preload(:user, :source).find_each do |member| + context = { + user: member.user, + # The ApplicationContext will reject type-mismatches. So a GroupMemeber will only populate `namespace`. + # while a `ProjectMember` will populate `project + project: member.source, + namespace: member.source + } + with_context(context) do + Members::DestroyService.new.execute(member, skip_authorization: true) - expired_user = member.user + expired_user = member.user - if expired_user.project_bot? - Users::DestroyService.new(nil).execute(expired_user, skip_authorization: true) + if expired_user.project_bot? + Users::DestroyService.new(nil).execute(expired_user, skip_authorization: true) + end end rescue => ex logger.error("Expired Member ID=#{member.id} cannot be removed - #{ex}") diff --git a/app/workers/ssh_keys/expired_notification_worker.rb b/app/workers/ssh_keys/expired_notification_worker.rb new file mode 100644 index 00000000000..ab6d1998773 --- /dev/null +++ b/app/workers/ssh_keys/expired_notification_worker.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module SshKeys + class ExpiredNotificationWorker + include ApplicationWorker + include CronjobQueue + + feature_category :compliance_management + idempotent! + + def perform + return unless ::Feature.enabled?(:ssh_key_expiration_email_notification, default_enabled: :yaml) + + User.with_ssh_key_expired_today.find_each do |user| + with_context(user: user) do + Gitlab::AppLogger.info "#{self.class}: Notifying User #{user.id} about expired ssh key(s)" + + keys = user.expired_today_and_unnotified_keys + + Keys::ExpiryNotificationService.new(user, { keys: keys, expiring_soon: false }).execute + end + end + end + end +end diff --git a/app/workers/ssh_keys/expiring_soon_notification_worker.rb b/app/workers/ssh_keys/expiring_soon_notification_worker.rb new file mode 100644 index 00000000000..3214cd7a242 --- /dev/null +++ b/app/workers/ssh_keys/expiring_soon_notification_worker.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module SshKeys + class ExpiringSoonNotificationWorker + include ApplicationWorker + include CronjobQueue + + feature_category :compliance_management + idempotent! + + def perform + return unless ::Feature.enabled?(:ssh_key_expiration_email_notification, default_enabled: :yaml) + + User.with_ssh_key_expiring_soon.find_each do |user| + with_context(user: user) do + Gitlab::AppLogger.info "#{self.class}: Notifying User #{user.id} about expiring soon ssh key(s)" + + keys = user.expiring_soon_and_unnotified_keys + + Keys::ExpiryNotificationService.new(user, { keys: keys, expiring_soon: true }).execute + end + end + end + end +end diff --git a/app/workers/todos_destroyer/destroyed_issuable_worker.rb b/app/workers/todos_destroyer/destroyed_issuable_worker.rb new file mode 100644 index 00000000000..6ca1959ff34 --- /dev/null +++ b/app/workers/todos_destroyer/destroyed_issuable_worker.rb @@ -0,0 +1,14 @@ +# frozen_string_literal: true + +module TodosDestroyer + class DestroyedIssuableWorker + include ApplicationWorker + include TodosDestroyerQueue + + idempotent! + + def perform(target_id, target_type) + ::Todos::Destroy::DestroyedIssuableService.new(target_id, target_type).execute + end + end +end diff --git a/app/workers/update_highest_role_worker.rb b/app/workers/update_highest_role_worker.rb index 1e2c974b6e5..952f1e511ea 100644 --- a/app/workers/update_highest_role_worker.rb +++ b/app/workers/update_highest_role_worker.rb @@ -3,7 +3,7 @@ class UpdateHighestRoleWorker include ApplicationWorker - feature_category :authentication_and_authorization + feature_category :utilization urgency :high weight 2 |