diff options
Diffstat (limited to 'app/workers')
47 files changed, 450 insertions, 221 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 31c590183d1..8d08beb56aa 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -247,6 +247,15 @@ :idempotent: true :tags: - :exclude_from_kubernetes +- :name: cronjob:database_partition_management + :worker_name: Database::PartitionManagementWorker + :feature_category: :database + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: cronjob:environments_auto_stop_cron :worker_name: Environments::AutoStopCronWorker :feature_category: :continuous_delivery @@ -265,9 +274,9 @@ :weight: 1 :idempotent: :tags: [] -- :name: cronjob:gitlab_usage_ping - :worker_name: GitlabUsagePingWorker - :feature_category: :usage_ping +- :name: cronjob:gitlab_service_ping + :worker_name: GitlabServicePingWorker + :feature_category: :service_ping :has_external_dependencies: :urgency: :low :resource_boundary: :unknown @@ -1078,6 +1087,15 @@ :weight: 2 :idempotent: true :tags: [] +- :name: jira_connect:jira_connect_forward_event + :worker_name: JiraConnect::ForwardEventWorker + :feature_category: :integrations + :has_external_dependencies: true + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: + :tags: [] - :name: jira_connect:jira_connect_sync_branch :worker_name: JiraConnect::SyncBranchWorker :feature_category: :integrations @@ -1085,7 +1103,7 @@ :urgency: :low :resource_boundary: :unknown :weight: 1 - :idempotent: true + :idempotent: :tags: [] - :name: jira_connect:jira_connect_sync_builds :worker_name: JiraConnect::SyncBuildsWorker @@ -1094,7 +1112,7 @@ :urgency: :low :resource_boundary: :unknown :weight: 1 - :idempotent: true + :idempotent: :tags: - :exclude_from_kubernetes - :name: jira_connect:jira_connect_sync_deployments @@ -1104,7 +1122,7 @@ :urgency: :low :resource_boundary: :unknown :weight: 1 - :idempotent: true + :idempotent: :tags: - :exclude_from_kubernetes - :name: jira_connect:jira_connect_sync_feature_flags @@ -1114,7 +1132,7 @@ :urgency: :low :resource_boundary: :unknown :weight: 1 - :idempotent: true + :idempotent: :tags: - :exclude_from_kubernetes - :name: jira_connect:jira_connect_sync_merge_request @@ -1124,7 +1142,7 @@ :urgency: :low :resource_boundary: :unknown :weight: 1 - :idempotent: true + :idempotent: :tags: [] - :name: jira_connect:jira_connect_sync_project :worker_name: JiraConnect::SyncProjectWorker @@ -1133,7 +1151,7 @@ :urgency: :low :resource_boundary: :unknown :weight: 1 - :idempotent: true + :idempotent: :tags: - :exclude_from_kubernetes - :name: jira_importer:jira_import_advance_stage @@ -1309,6 +1327,15 @@ :idempotent: true :tags: - :exclude_from_kubernetes +- :name: package_repositories:packages_helm_extraction + :worker_name: Packages::Helm::ExtractionWorker + :feature_category: :package_registry + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] - :name: package_repositories:packages_maven_metadata_sync :worker_name: Packages::Maven::Metadata::SyncWorker :feature_category: :package_registry @@ -1347,6 +1374,15 @@ :weight: 1 :idempotent: :tags: [] +- :name: pipeline_background:ci_archive_trace + :worker_name: Ci::ArchiveTraceWorker + :feature_category: :continuous_integration + :has_external_dependencies: + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: + :tags: [] - :name: pipeline_background:ci_build_trace_chunk_flush :worker_name: Ci::BuildTraceChunkFlushWorker :feature_category: :continuous_integration @@ -1567,6 +1603,15 @@ :weight: 5 :idempotent: :tags: [] +- :name: pipeline_processing:ci_build_finished + :worker_name: Ci::BuildFinishedWorker + :feature_category: :continuous_integration + :has_external_dependencies: + :urgency: :high + :resource_boundary: :cpu + :weight: 5 + :idempotent: + :tags: [] - :name: pipeline_processing:ci_build_prepare :worker_name: Ci::BuildPrepareWorker :feature_category: :continuous_integration @@ -1601,7 +1646,7 @@ :urgency: :low :resource_boundary: :unknown :weight: 5 - :idempotent: + :idempotent: true :tags: [] - :name: pipeline_processing:pipeline_process :worker_name: PipelineProcessWorker diff --git a/app/workers/archive_trace_worker.rb b/app/workers/archive_trace_worker.rb index 629526ec17c..ecde05f94dc 100644 --- a/app/workers/archive_trace_worker.rb +++ b/app/workers/archive_trace_worker.rb @@ -1,16 +1,5 @@ # frozen_string_literal: true -class ArchiveTraceWorker # rubocop:disable Scalability/IdempotentWorker - include ApplicationWorker - - sidekiq_options retry: 3 - include PipelineBackgroundQueue - - # rubocop: disable CodeReuse/ActiveRecord - def perform(job_id) - Ci::Build.without_archived_trace.find_by(id: job_id).try do |job| - Ci::ArchiveTraceService.new.execute(job, worker_name: self.class.name) - end - end - # rubocop: enable CodeReuse/ActiveRecord +class ArchiveTraceWorker < ::Ci::ArchiveTraceWorker # rubocop:disable Scalability/IdempotentWorker + # DEPRECATED: Not triggered since https://gitlab.com/gitlab-org/gitlab/-/merge_requests/64934/ end diff --git a/app/workers/authorized_project_update/user_refresh_from_replica_worker.rb b/app/workers/authorized_project_update/user_refresh_from_replica_worker.rb index 5ca9de63fd7..10f7cb20df0 100644 --- a/app/workers/authorized_project_update/user_refresh_from_replica_worker.rb +++ b/app/workers/authorized_project_update/user_refresh_from_replica_worker.rb @@ -1,15 +1,54 @@ # frozen_string_literal: true module AuthorizedProjectUpdate - class UserRefreshFromReplicaWorker < ::AuthorizedProjectsWorker + class UserRefreshFromReplicaWorker + include ApplicationWorker + + sidekiq_options retry: 3 feature_category :authentication_and_authorization urgency :low queue_namespace :authorized_project_update - deduplicate :until_executing, including_scheduled: true idempotent! + deduplicate :until_executing, including_scheduled: true + + def perform(user_id) + if Feature.enabled?(:user_refresh_from_replica_worker_uses_replica_db) + use_replica_if_available do + user = User.find_by_id(user_id) + + if user && project_authorizations_needs_refresh?(user) + enqueue_project_authorizations_refresh(user) + end + end + else + user = User.find_by_id(user_id) + return unless user + + user.refresh_authorized_projects(source: self.class.name) + end + end + + private + + # We use this approach instead of specifying `data_consistency :delayed` because these jobs + # are enqueued in large numbers, and using `data_consistency :delayed` + # does not allow us to deduplicate these jobs. + # https://gitlab.com/gitlab-org/gitlab/-/issues/325291 + def use_replica_if_available(&block) + return yield unless ::Gitlab::Database::LoadBalancing.enable? + + ::Gitlab::Database::LoadBalancing::Session.current.use_replicas_for_read_queries(&block) + end + + def project_authorizations_needs_refresh?(user) + AuthorizedProjectUpdate::FindRecordsDueForRefreshService.new(user).needs_refresh? + end - # This worker will start reading data from the replica database soon - # Issue: https://gitlab.com/gitlab-org/gitlab/-/issues/333219 + def enqueue_project_authorizations_refresh(user) + with_context(user: user) do + AuthorizedProjectUpdate::UserRefreshWithLowUrgencyWorker.perform_async(user.id) + end + end end end diff --git a/app/workers/build_finished_worker.rb b/app/workers/build_finished_worker.rb index a3eaacec8a2..0d41f7b9438 100644 --- a/app/workers/build_finished_worker.rb +++ b/app/workers/build_finished_worker.rb @@ -1,61 +1,9 @@ # frozen_string_literal: true -class BuildFinishedWorker # rubocop:disable Scalability/IdempotentWorker - include ApplicationWorker +class BuildFinishedWorker < ::Ci::BuildFinishedWorker # rubocop:disable Scalability/IdempotentWorker + # DEPRECATED: Not triggered since https://gitlab.com/gitlab-org/gitlab/-/merge_requests/64934/ - sidekiq_options retry: 3 - include PipelineQueue - - queue_namespace :pipeline_processing + # We need to explicitly specify these settings. They aren't inheriting from the parent class. urgency :high worker_resource_boundary :cpu - - ARCHIVE_TRACES_IN = 2.minutes.freeze - - # rubocop: disable CodeReuse/ActiveRecord - def perform(build_id) - Ci::Build.find_by(id: build_id).try do |build| - process_build(build) - end - end - # rubocop: enable CodeReuse/ActiveRecord - - private - - # Processes a single CI build that has finished. - # - # This logic resides in a separate method so that EE can extend it more - # easily. - # - # @param [Ci::Build] build The build to process. - def process_build(build) - # We execute these in sync to reduce IO. - build.parse_trace_sections! - build.update_coverage - Ci::BuildReportResultService.new.execute(build) - - # We execute these async as these are independent operations. - BuildHooksWorker.perform_async(build.id) - ChatNotificationWorker.perform_async(build.id) if build.pipeline.chat? - - if build.failed? - ::Ci::MergeRequests::AddTodoWhenBuildFailsWorker.perform_async(build.id) - end - - ## - # We want to delay sending a build trace to object storage operation to - # validate that this fixes a race condition between this and flushing live - # trace chunks and chunks being removed after consolidation and putting - # them into object storage archive. - # - # TODO This is temporary fix we should improve later, after we validate - # that this is indeed the culprit. - # - # See https://gitlab.com/gitlab-org/gitlab/-/issues/267112 for more - # details. - # - ArchiveTraceWorker.perform_in(ARCHIVE_TRACES_IN, build.id) - end end - -BuildFinishedWorker.prepend_mod_with('BuildFinishedWorker') diff --git a/app/workers/build_queue_worker.rb b/app/workers/build_queue_worker.rb index aa3c03f773e..4ab08bbd7fe 100644 --- a/app/workers/build_queue_worker.rb +++ b/app/workers/build_queue_worker.rb @@ -10,7 +10,7 @@ class BuildQueueWorker # rubocop:disable Scalability/IdempotentWorker feature_category :continuous_integration urgency :high worker_resource_boundary :cpu - data_consistency :sticky, feature_flag: :load_balancing_for_build_queue_worker + data_consistency :sticky # rubocop: disable CodeReuse/ActiveRecord def perform(build_id) diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb index 24e75ad0f85..d3bb36d830f 100644 --- a/app/workers/bulk_imports/export_request_worker.rb +++ b/app/workers/bulk_imports/export_request_worker.rb @@ -25,7 +25,7 @@ module BulkImports def http_client(configuration) @client ||= Clients::HTTP.new( - uri: configuration.url, + url: configuration.url, token: configuration.access_token ) end diff --git a/app/workers/ci/archive_trace_worker.rb b/app/workers/ci/archive_trace_worker.rb new file mode 100644 index 00000000000..16288faf370 --- /dev/null +++ b/app/workers/ci/archive_trace_worker.rb @@ -0,0 +1,18 @@ +# frozen_string_literal: true + +module Ci + class ArchiveTraceWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + sidekiq_options retry: 3 + include PipelineBackgroundQueue + + # rubocop: disable CodeReuse/ActiveRecord + def perform(job_id) + Ci::Build.without_archived_trace.find_by(id: job_id).try do |job| + Ci::ArchiveTraceService.new.execute(job, worker_name: self.class.name) + end + end + # rubocop: enable CodeReuse/ActiveRecord + end +end diff --git a/app/workers/ci/archive_traces_cron_worker.rb b/app/workers/ci/archive_traces_cron_worker.rb index c748bc33ada..5fe3adf870f 100644 --- a/app/workers/ci/archive_traces_cron_worker.rb +++ b/app/workers/ci/archive_traces_cron_worker.rb @@ -12,7 +12,7 @@ module Ci # rubocop: disable CodeReuse/ActiveRecord def perform # Archive stale live traces which still resides in redis or database - # This could happen when ArchiveTraceWorker sidekiq jobs were lost by receiving SIGKILL + # This could happen when Ci::ArchiveTraceWorker sidekiq jobs were lost by receiving SIGKILL # More details in https://gitlab.com/gitlab-org/gitlab-foss/issues/36791 Ci::Build.with_stale_live_trace.find_each(batch_size: 100) do |build| Ci::ArchiveTraceService.new.execute(build, worker_name: self.class.name) diff --git a/app/workers/ci/build_finished_worker.rb b/app/workers/ci/build_finished_worker.rb new file mode 100644 index 00000000000..1d6e3b1fa3c --- /dev/null +++ b/app/workers/ci/build_finished_worker.rb @@ -0,0 +1,71 @@ +# frozen_string_literal: true + +module Ci + class BuildFinishedWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + sidekiq_options retry: 3 + include PipelineQueue + + queue_namespace :pipeline_processing + urgency :high + worker_resource_boundary :cpu + + ARCHIVE_TRACES_IN = 2.minutes.freeze + + # rubocop: disable CodeReuse/ActiveRecord + def perform(build_id) + Ci::Build.find_by(id: build_id).try do |build| + process_build(build) + end + end + # rubocop: enable CodeReuse/ActiveRecord + + private + + # Processes a single CI build that has finished. + # + # This logic resides in a separate method so that EE can extend it more + # easily. + # + # @param [Ci::Build] build The build to process. + def process_build(build) + # We execute these in sync to reduce IO. + build.parse_trace_sections! + build.update_coverage + Ci::BuildReportResultService.new.execute(build) + + # We execute these async as these are independent operations. + BuildHooksWorker.perform_async(build.id) + ChatNotificationWorker.perform_async(build.id) if build.pipeline.chat? + + if build.failed? + ::Ci::MergeRequests::AddTodoWhenBuildFailsWorker.perform_async(build.id) + end + + ## + # We want to delay sending a build trace to object storage operation to + # validate that this fixes a race condition between this and flushing live + # trace chunks and chunks being removed after consolidation and putting + # them into object storage archive. + # + # TODO This is temporary fix we should improve later, after we validate + # that this is indeed the culprit. + # + # See https://gitlab.com/gitlab-org/gitlab/-/issues/267112 for more + # details. + # + archive_trace_worker_class(build).perform_in(ARCHIVE_TRACES_IN, build.id) + end + + def archive_trace_worker_class(build) + if Feature.enabled?(:ci_build_finished_worker_namespace_changed, build.project, default_enabled: :yaml) + Ci::ArchiveTraceWorker + else + ::ArchiveTraceWorker + end + end + end +end + +Ci::BuildFinishedWorker.prepend_mod_with('Ci::BuildFinishedWorker') diff --git a/app/workers/ci/resource_groups/assign_resource_from_resource_group_worker.rb b/app/workers/ci/resource_groups/assign_resource_from_resource_group_worker.rb index 15ed89fd00e..ad0ed3d16f1 100644 --- a/app/workers/ci/resource_groups/assign_resource_from_resource_group_worker.rb +++ b/app/workers/ci/resource_groups/assign_resource_from_resource_group_worker.rb @@ -2,7 +2,10 @@ module Ci module ResourceGroups - class AssignResourceFromResourceGroupWorker # rubocop:disable Scalability/IdempotentWorker + # This worker is to assign a resource to a pipeline job from a resource group + # and enqueue the job to be executed by a runner. + # See https://docs.gitlab.com/ee/ci/yaml/#resource_group for more information. + class AssignResourceFromResourceGroupWorker include ApplicationWorker sidekiq_options retry: 3 @@ -11,6 +14,13 @@ module Ci queue_namespace :pipeline_processing feature_category :continuous_delivery + # This worker is idempotent that it produces the same result + # as long as the same resource group id is passed as an argument. + # Therefore, we can deduplicate the sidekiq jobs until the on-going + # assignment process has been finished. + idempotent! + deduplicate :until_executed + def perform(resource_group_id) ::Ci::ResourceGroup.find_by_id(resource_group_id).try do |resource_group| Ci::ResourceGroups::AssignResourceFromResourceGroupService.new(resource_group.project, nil) diff --git a/app/workers/clusters/applications/activate_service_worker.rb b/app/workers/clusters/applications/activate_service_worker.rb index d4d0ae96e03..a7073b78a81 100644 --- a/app/workers/clusters/applications/activate_service_worker.rb +++ b/app/workers/clusters/applications/activate_service_worker.rb @@ -15,7 +15,7 @@ module Clusters return unless cluster cluster.all_projects.find_each do |project| - project.find_or_initialize_service(service_name).update!(active: true) + project.find_or_initialize_integration(service_name).update!(active: true) end end end diff --git a/app/workers/clusters/applications/deactivate_service_worker.rb b/app/workers/clusters/applications/deactivate_service_worker.rb index 935b455a4fc..9337af56623 100644 --- a/app/workers/clusters/applications/deactivate_service_worker.rb +++ b/app/workers/clusters/applications/deactivate_service_worker.rb @@ -10,18 +10,18 @@ module Clusters loggable_arguments 1 - def perform(cluster_id, service_name) + def perform(cluster_id, integration_name) cluster = Clusters::Cluster.find_by_id(cluster_id) - raise cluster_missing_error(service_name) unless cluster + raise cluster_missing_error(integration_name) unless cluster - service = "#{service_name}_service".to_sym - cluster.all_projects.with_service(service).find_each do |project| - project.public_send(service).update!(active: false) # rubocop:disable GitlabSecurity/PublicSend + integration = ::Project.integration_association_name(integration_name).to_sym + cluster.all_projects.with_integration(integration).find_each do |project| + project.public_send(integration).update!(active: false) # rubocop:disable GitlabSecurity/PublicSend end end - def cluster_missing_error(service) - ActiveRecord::RecordNotFound.new("Can't deactivate #{service} services, host cluster not found! Some inconsistent records may be left in database.") + def cluster_missing_error(integration_name) + ActiveRecord::RecordNotFound.new("Can't deactivate #{integration_name} integrations, host cluster not found! Some inconsistent records may be left in database.") end end end diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb index 3cba1eb31c5..e158ae0c298 100644 --- a/app/workers/concerns/application_worker.rb +++ b/app/workers/concerns/application_worker.rb @@ -47,11 +47,36 @@ module ApplicationWorker end class_methods do + extend ::Gitlab::Utils::Override + def inherited(subclass) subclass.set_queue subclass.after_set_class_attribute { subclass.set_queue } end + override :validate_worker_attributes! + def validate_worker_attributes! + super + + # Since the delayed data_consistency will use sidekiq built in retry mechanism, it is required that this mechanism + # is not disabled. + if retry_disabled? && get_data_consistency == :delayed + raise ArgumentError, "Retry support cannot be disabled if data_consistency is set to :delayed" + end + end + + # Checks if sidekiq retry support is disabled + def retry_disabled? + get_sidekiq_options['retry'] == 0 || get_sidekiq_options['retry'] == false + end + + override :sidekiq_options + def sidekiq_options(opts = {}) + super.tap do + validate_worker_attributes! + end + end + def perform_async(*args) # Worker execution for workers with data_consistency set to :delayed or :sticky # will be delayed to give replication enough time to complete diff --git a/app/workers/concerns/gitlab/github_import/object_importer.rb b/app/workers/concerns/gitlab/github_import/object_importer.rb index 6ebf7c7c263..1eff53cea01 100644 --- a/app/workers/concerns/gitlab/github_import/object_importer.rb +++ b/app/workers/concerns/gitlab/github_import/object_importer.rb @@ -36,14 +36,15 @@ module Gitlab importer_class.new(object, project, client).execute - counter.increment + Gitlab::GithubImport::ObjectCounter.increment(project, object_type, :imported) + info(project.id, message: 'importer finished') rescue StandardError => e error(project.id, e, hash) end - def counter - @counter ||= Gitlab::Metrics.counter(counter_name, counter_description) + def object_type + raise NotImplementedError end # Returns the representation class to use for the object. This class must @@ -57,16 +58,6 @@ module Gitlab raise NotImplementedError end - # Returns the name (as a Symbol) of the Prometheus counter. - def counter_name - raise NotImplementedError - end - - # Returns the description (as a String) of the Prometheus counter. - def counter_description - raise NotImplementedError - end - private attr_accessor :github_id diff --git a/app/workers/concerns/waitable_worker.rb b/app/workers/concerns/waitable_worker.rb index e62bd8d9885..f8b945b8892 100644 --- a/app/workers/concerns/waitable_worker.rb +++ b/app/workers/concerns/waitable_worker.rb @@ -32,7 +32,9 @@ module WaitableWorker failed = [] args_list.each do |args| - new.perform(*args) + worker = new + Gitlab::AppJsonLogger.info(worker.structured_payload(message: 'running inline')) + worker.perform(*args) rescue StandardError failed << args end diff --git a/app/workers/concerns/worker_attributes.rb b/app/workers/concerns/worker_attributes.rb index 096be808787..806fce38636 100644 --- a/app/workers/concerns/worker_attributes.rb +++ b/app/workers/concerns/worker_attributes.rb @@ -12,6 +12,7 @@ module WorkerAttributes VALID_URGENCIES = [:high, :low, :throttled].freeze VALID_DATA_CONSISTENCIES = [:always, :sticky, :delayed].freeze + DEFAULT_DATA_CONSISTENCY = :always NAMESPACE_WEIGHTS = { auto_devops: 2, @@ -110,7 +111,7 @@ module WorkerAttributes end def get_data_consistency - class_attributes[:data_consistency] || :always + class_attributes[:data_consistency] || DEFAULT_DATA_CONSISTENCY end def get_data_consistency_feature_flag_enabled? diff --git a/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb b/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb index 3027d46b8b1..33dda6a8f0c 100644 --- a/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb +++ b/app/workers/container_expiration_policies/cleanup_container_repository_worker.rb @@ -49,15 +49,11 @@ module ContainerExpirationPolicies end def remaining_work_count - total_count = cleanup_scheduled_count + cleanup_unfinished_count + count = cleanup_scheduled_count - log_info( - cleanup_scheduled_count: cleanup_scheduled_count, - cleanup_unfinished_count: cleanup_unfinished_count, - cleanup_total_count: total_count - ) + return count if count > max_running_jobs - total_count + count + cleanup_unfinished_count end private diff --git a/app/workers/container_expiration_policy_worker.rb b/app/workers/container_expiration_policy_worker.rb index 8fc139ac87c..a35ca5d184e 100644 --- a/app/workers/container_expiration_policy_worker.rb +++ b/app/workers/container_expiration_policy_worker.rb @@ -17,6 +17,7 @@ class ContainerExpirationPolicyWorker # rubocop:disable Scalability/IdempotentWo process_stale_ongoing_cleanups disable_policies_without_container_repositories throttling_enabled? ? perform_throttled : perform_unthrottled + log_counts end private @@ -28,6 +29,26 @@ class ContainerExpirationPolicyWorker # rubocop:disable Scalability/IdempotentWo end end + def log_counts + use_replica_if_available do + required_count = ContainerRepository.requiring_cleanup.count + unfinished_count = ContainerRepository.with_unfinished_cleanup.count + + log_extra_metadata_on_done(:cleanup_required_count, required_count) + log_extra_metadata_on_done(:cleanup_unfinished_count, unfinished_count) + log_extra_metadata_on_done(:cleanup_total_count, required_count + unfinished_count) + end + end + + # data_consistency :delayed not used as this is a cron job and those jobs are + # not perfomed with a delay + # https://gitlab.com/gitlab-org/gitlab/-/merge_requests/63635#note_603771207 + def use_replica_if_available(&blk) + return yield unless ::Gitlab::Database::LoadBalancing.enable? + + ::Gitlab::Database::LoadBalancing::Session.current.use_replicas_for_read_queries(&blk) + end + def process_stale_ongoing_cleanups threshold = delete_tags_service_timeout.seconds + 30.minutes ContainerRepository.with_stale_ongoing_cleanup(threshold.ago) diff --git a/app/workers/database/partition_management_worker.rb b/app/workers/database/partition_management_worker.rb new file mode 100644 index 00000000000..c9b1cd6d261 --- /dev/null +++ b/app/workers/database/partition_management_worker.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module Database + class PartitionManagementWorker + include ApplicationWorker + + sidekiq_options retry: 3 + include CronjobQueue # rubocop:disable Scalability/CronWorkerContext + + feature_category :database + idempotent! + + def perform + Gitlab::Database::Partitioning::PartitionManager.new.sync_partitions + ensure + Gitlab::Database::Partitioning::PartitionMonitoring.new.report_metrics + end + end +end diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb index 9702fac39ba..64f73d1fba1 100644 --- a/app/workers/expire_pipeline_cache_worker.rb +++ b/app/workers/expire_pipeline_cache_worker.rb @@ -10,7 +10,7 @@ class ExpirePipelineCacheWorker queue_namespace :pipeline_cache urgency :high worker_resource_boundary :cpu - data_consistency :delayed, feature_flag: :load_balancing_for_expire_pipeline_cache_worker + data_consistency :delayed # This worker _should_ be idempotent, but due to us moving this to data_consistency :delayed # and an ongoing incompatibility between the two switches, we need to disable this. diff --git a/app/workers/gitlab/github_import/import_diff_note_worker.rb b/app/workers/gitlab/github_import/import_diff_note_worker.rb index 25fb0375692..85b7d6c76bd 100644 --- a/app/workers/gitlab/github_import/import_diff_note_worker.rb +++ b/app/workers/gitlab/github_import/import_diff_note_worker.rb @@ -13,12 +13,8 @@ module Gitlab Importer::DiffNoteImporter end - def counter_name - :github_importer_imported_diff_notes - end - - def counter_description - 'The number of imported GitHub pull request review comments' + def object_type + :diff_note end end end diff --git a/app/workers/gitlab/github_import/import_issue_worker.rb b/app/workers/gitlab/github_import/import_issue_worker.rb index d9c496e3eb3..8fdc0219ffd 100644 --- a/app/workers/gitlab/github_import/import_issue_worker.rb +++ b/app/workers/gitlab/github_import/import_issue_worker.rb @@ -13,12 +13,8 @@ module Gitlab Importer::IssueAndLabelLinksImporter end - def counter_name - :github_importer_imported_issues - end - - def counter_description - 'The number of imported GitHub issues' + def object_type + :issue end end end diff --git a/app/workers/gitlab/github_import/import_lfs_object_worker.rb b/app/workers/gitlab/github_import/import_lfs_object_worker.rb index 78f78fdb160..2a95366bac7 100644 --- a/app/workers/gitlab/github_import/import_lfs_object_worker.rb +++ b/app/workers/gitlab/github_import/import_lfs_object_worker.rb @@ -13,12 +13,8 @@ module Gitlab Importer::LfsObjectImporter end - def counter_name - :github_importer_imported_lfs_objects - end - - def counter_description - 'The number of imported GitHub Lfs Objects' + def object_type + :lfs_object end end end diff --git a/app/workers/gitlab/github_import/import_note_worker.rb b/app/workers/gitlab/github_import/import_note_worker.rb index d0f97a15afd..2125c953778 100644 --- a/app/workers/gitlab/github_import/import_note_worker.rb +++ b/app/workers/gitlab/github_import/import_note_worker.rb @@ -13,12 +13,8 @@ module Gitlab Importer::NoteImporter end - def counter_name - :github_importer_imported_notes - end - - def counter_description - 'The number of imported GitHub comments' + def object_type + :note end end end diff --git a/app/workers/gitlab/github_import/import_pull_request_merged_by_worker.rb b/app/workers/gitlab/github_import/import_pull_request_merged_by_worker.rb index a8b79cf9b3a..91dab3470d9 100644 --- a/app/workers/gitlab/github_import/import_pull_request_merged_by_worker.rb +++ b/app/workers/gitlab/github_import/import_pull_request_merged_by_worker.rb @@ -15,12 +15,8 @@ module Gitlab Importer::PullRequestMergedByImporter end - def counter_name - :github_importer_imported_pull_requests_merged_by - end - - def counter_description - 'The number of imported GitHub pull requests merged by' + def object_type + :pull_request_merged_by end end end diff --git a/app/workers/gitlab/github_import/import_pull_request_review_worker.rb b/app/workers/gitlab/github_import/import_pull_request_review_worker.rb index 5ee88d5d32b..de10fe40589 100644 --- a/app/workers/gitlab/github_import/import_pull_request_review_worker.rb +++ b/app/workers/gitlab/github_import/import_pull_request_review_worker.rb @@ -15,12 +15,8 @@ module Gitlab Importer::PullRequestReviewImporter end - def counter_name - :github_importer_imported_pull_request_reviews - end - - def counter_description - 'The number of imported GitHub pull request reviews' + def object_type + :pull_request_review end end end diff --git a/app/workers/gitlab/github_import/import_pull_request_worker.rb b/app/workers/gitlab/github_import/import_pull_request_worker.rb index 9560874f247..79938a157d7 100644 --- a/app/workers/gitlab/github_import/import_pull_request_worker.rb +++ b/app/workers/gitlab/github_import/import_pull_request_worker.rb @@ -13,12 +13,8 @@ module Gitlab Importer::PullRequestImporter end - def counter_name - :github_importer_imported_pull_requests - end - - def counter_description - 'The number of imported GitHub pull requests' + def object_type + :pull_request end end end diff --git a/app/workers/gitlab/github_import/stage/finish_import_worker.rb b/app/workers/gitlab/github_import/stage/finish_import_worker.rb index f5980cc248e..f909d7e2f34 100644 --- a/app/workers/gitlab/github_import/stage/finish_import_worker.rb +++ b/app/workers/gitlab/github_import/stage/finish_import_worker.rb @@ -29,7 +29,8 @@ module Gitlab info( project.id, message: "GitHub project import finished", - duration_s: duration.round(2) + duration_s: duration.round(2), + object_counts: ::Gitlab::GithubImport::ObjectCounter.summary(project) ) end diff --git a/app/workers/gitlab/import/stuck_import_job.rb b/app/workers/gitlab/import/stuck_import_job.rb index ac789ce1188..57fb3baf2b5 100644 --- a/app/workers/gitlab/import/stuck_import_job.rb +++ b/app/workers/gitlab/import/stuck_import_job.rb @@ -5,7 +5,7 @@ module Gitlab module StuckImportJob extend ActiveSupport::Concern - IMPORT_JOBS_EXPIRATION = 15.hours.seconds.to_i + IMPORT_JOBS_EXPIRATION = 24.hours.seconds.to_i included do include ApplicationWorker diff --git a/app/workers/gitlab_usage_ping_worker.rb b/app/workers/gitlab_service_ping_worker.rb index 782b089261f..a27629eac0a 100644 --- a/app/workers/gitlab_usage_ping_worker.rb +++ b/app/workers/gitlab_service_ping_worker.rb @@ -1,19 +1,19 @@ # frozen_string_literal: true -class GitlabUsagePingWorker # rubocop:disable Scalability/IdempotentWorker - LEASE_KEY = 'gitlab_usage_ping_worker:ping' +class GitlabServicePingWorker # rubocop:disable Scalability/IdempotentWorker + LEASE_KEY = 'gitlab_service_ping_worker:ping' LEASE_TIMEOUT = 86400 include ApplicationWorker include CronjobQueue # rubocop:disable Scalability/CronWorkerContext include Gitlab::ExclusiveLeaseHelpers - feature_category :usage_ping + feature_category :service_ping sidekiq_options retry: 3, dead: false sidekiq_retry_in { |count| (count + 1) * 8.hours.to_i } def perform - # Disable usage ping for GitLab.com + # Disable service ping for GitLab.com # See https://gitlab.com/gitlab-org/gitlab/-/issues/292929 for details return if Gitlab.com? @@ -22,7 +22,7 @@ class GitlabUsagePingWorker # rubocop:disable Scalability/IdempotentWorker # Splay the request over a minute to avoid thundering herd problems. sleep(rand(0.0..60.0).round(3)) - SubmitUsagePingService.new.execute + ServicePing::SubmitService.new.execute end end end diff --git a/app/workers/jira_connect/forward_event_worker.rb b/app/workers/jira_connect/forward_event_worker.rb new file mode 100644 index 00000000000..877ab46cfe5 --- /dev/null +++ b/app/workers/jira_connect/forward_event_worker.rb @@ -0,0 +1,25 @@ +# frozen_string_literal: true + +module JiraConnect + class ForwardEventWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + queue_namespace :jira_connect + feature_category :integrations + worker_has_external_dependencies! + + def perform(installation_id, base_path, event_path) + installation = JiraConnectInstallation.find_by_id(installation_id) + + return if installation&.instance_url.nil? + + proxy_url = installation.instance_url + event_path + qsh = Atlassian::Jwt.create_query_string_hash(proxy_url, 'POST', installation.instance_url + base_path) + jwt = Atlassian::Jwt.encode({ iss: installation.client_key, qsh: qsh }, installation.shared_secret) + + Gitlab::HTTP.post(proxy_url, headers: { 'Authorization' => "JWT #{jwt}" }) + ensure + installation.destroy if installation + end + end +end diff --git a/app/workers/jira_connect/sync_branch_worker.rb b/app/workers/jira_connect/sync_branch_worker.rb index 4e8566d86c9..2723287b77b 100644 --- a/app/workers/jira_connect/sync_branch_worker.rb +++ b/app/workers/jira_connect/sync_branch_worker.rb @@ -1,16 +1,17 @@ # frozen_string_literal: true module JiraConnect - class SyncBranchWorker + class SyncBranchWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker sidekiq_options retry: 3 queue_namespace :jira_connect feature_category :integrations + data_consistency :delayed loggable_arguments 1, 2 + worker_has_external_dependencies! - idempotent! def perform(project_id, branch_name, commit_shas, update_sequence_id) project = Project.find_by_id(project_id) diff --git a/app/workers/jira_connect/sync_builds_worker.rb b/app/workers/jira_connect/sync_builds_worker.rb index 11a3b598035..4c4daba3314 100644 --- a/app/workers/jira_connect/sync_builds_worker.rb +++ b/app/workers/jira_connect/sync_builds_worker.rb @@ -1,18 +1,18 @@ # frozen_string_literal: true module JiraConnect - class SyncBuildsWorker + class SyncBuildsWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker sidekiq_options retry: 3 - idempotent! - worker_has_external_dependencies! - queue_namespace :jira_connect feature_category :integrations + data_consistency :delayed tags :exclude_from_kubernetes + worker_has_external_dependencies! + def perform(pipeline_id, sequence_id) pipeline = Ci::Pipeline.find_by_id(pipeline_id) diff --git a/app/workers/jira_connect/sync_deployments_worker.rb b/app/workers/jira_connect/sync_deployments_worker.rb index 9f75b1161f0..0dc34b5999f 100644 --- a/app/workers/jira_connect/sync_deployments_worker.rb +++ b/app/workers/jira_connect/sync_deployments_worker.rb @@ -1,18 +1,18 @@ # frozen_string_literal: true module JiraConnect - class SyncDeploymentsWorker + class SyncDeploymentsWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker sidekiq_options retry: 3 - idempotent! - worker_has_external_dependencies! - queue_namespace :jira_connect feature_category :integrations + data_consistency :delayed tags :exclude_from_kubernetes + worker_has_external_dependencies! + def perform(deployment_id, sequence_id) deployment = Deployment.find_by_id(deployment_id) diff --git a/app/workers/jira_connect/sync_feature_flags_worker.rb b/app/workers/jira_connect/sync_feature_flags_worker.rb index 0d8d3d3142e..c484cabbe6b 100644 --- a/app/workers/jira_connect/sync_feature_flags_worker.rb +++ b/app/workers/jira_connect/sync_feature_flags_worker.rb @@ -1,18 +1,18 @@ # frozen_string_literal: true module JiraConnect - class SyncFeatureFlagsWorker + class SyncFeatureFlagsWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker sidekiq_options retry: 3 - idempotent! - worker_has_external_dependencies! - queue_namespace :jira_connect feature_category :integrations + data_consistency :delayed tags :exclude_from_kubernetes + worker_has_external_dependencies! + def perform(feature_flag_id, sequence_id) feature_flag = ::Operations::FeatureFlag.find_by_id(feature_flag_id) diff --git a/app/workers/jira_connect/sync_merge_request_worker.rb b/app/workers/jira_connect/sync_merge_request_worker.rb index bf31df2271f..bb0d24667e9 100644 --- a/app/workers/jira_connect/sync_merge_request_worker.rb +++ b/app/workers/jira_connect/sync_merge_request_worker.rb @@ -1,14 +1,14 @@ # frozen_string_literal: true module JiraConnect - class SyncMergeRequestWorker + class SyncMergeRequestWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker sidekiq_options retry: 3 queue_namespace :jira_connect feature_category :integrations - idempotent! + data_consistency :delayed worker_has_external_dependencies! diff --git a/app/workers/jira_connect/sync_project_worker.rb b/app/workers/jira_connect/sync_project_worker.rb index dfff0c4b3b6..317bace89b4 100644 --- a/app/workers/jira_connect/sync_project_worker.rb +++ b/app/workers/jira_connect/sync_project_worker.rb @@ -1,15 +1,16 @@ # frozen_string_literal: true module JiraConnect - class SyncProjectWorker + class SyncProjectWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker sidekiq_options retry: 3 queue_namespace :jira_connect feature_category :integrations + data_consistency :delayed tags :exclude_from_kubernetes - idempotent! + worker_has_external_dependencies! MERGE_REQUEST_LIMIT = 400 diff --git a/app/workers/merge_request_cleanup_refs_worker.rb b/app/workers/merge_request_cleanup_refs_worker.rb index 162c6dc2a88..408d070d56f 100644 --- a/app/workers/merge_request_cleanup_refs_worker.rb +++ b/app/workers/merge_request_cleanup_refs_worker.rb @@ -2,6 +2,8 @@ class MergeRequestCleanupRefsWorker include ApplicationWorker + include LimitedCapacity::Worker + include Gitlab::Utils::StrongMemoize sidekiq_options retry: 3 @@ -9,20 +11,60 @@ class MergeRequestCleanupRefsWorker tags :exclude_from_kubernetes idempotent! - def perform(merge_request_id) - return unless Feature.enabled?(:merge_request_refs_cleanup, default_enabled: false) + # Hard-coded to 4 for now. Will be configurable later on via application settings. + # This means, there can only be 4 jobs running at the same time at maximum. + MAX_RUNNING_JOBS = 4 + FAILURE_THRESHOLD = 3 - merge_request = MergeRequest.find_by_id(merge_request_id) + def perform_work + return unless Feature.enabled?(:merge_request_refs_cleanup, default_enabled: false) unless merge_request - logger.error("Failed to find merge request with ID: #{merge_request_id}") + logger.error('No existing merge request to be cleaned up.') return end - result = ::MergeRequests::CleanupRefsService.new(merge_request).execute + log_extra_metadata_on_done(:merge_request_id, merge_request.id) + + result = MergeRequests::CleanupRefsService.new(merge_request).execute + + if result[:status] == :success + merge_request_cleanup_schedule.complete! + else + if merge_request_cleanup_schedule.failed_count < FAILURE_THRESHOLD + merge_request_cleanup_schedule.retry! + else + merge_request_cleanup_schedule.mark_as_failed! + end + + log_extra_metadata_on_done(:message, result[:message]) + end + + log_extra_metadata_on_done(:status, merge_request_cleanup_schedule.status) + end + + def remaining_work_count + MergeRequest::CleanupSchedule + .scheduled_and_unstarted + .limit(max_running_jobs) + .count + end + + def max_running_jobs + MAX_RUNNING_JOBS + end + + private - return if result[:status] == :success + def merge_request + strong_memoize(:merge_request) do + merge_request_cleanup_schedule&.merge_request + end + end - logger.error("Failed cleanup refs of merge request (#{merge_request_id}): #{result[:message]}") + def merge_request_cleanup_schedule + strong_memoize(:merge_request_cleanup_schedule) do + MergeRequest::CleanupSchedule.start_next + end end end diff --git a/app/workers/namespaces/in_product_marketing_emails_worker.rb b/app/workers/namespaces/in_product_marketing_emails_worker.rb index 7985325d1ad..1f46be29553 100644 --- a/app/workers/namespaces/in_product_marketing_emails_worker.rb +++ b/app/workers/namespaces/in_product_marketing_emails_worker.rb @@ -14,7 +14,6 @@ module Namespaces def perform return if paid_self_managed_instance? return if setting_disabled? - return if experiment_inactive? Namespaces::InProductMarketingEmailsService.send_for_all_tracks_and_intervals end @@ -28,10 +27,6 @@ module Namespaces def setting_disabled? !Gitlab::CurrentSettings.in_product_marketing_emails_enabled end - - def experiment_inactive? - Gitlab.com? && !Gitlab::Experimentation.active?(:in_product_marketing_emails) - end end end diff --git a/app/workers/packages/helm/extraction_worker.rb b/app/workers/packages/helm/extraction_worker.rb new file mode 100644 index 00000000000..fd4e720da94 --- /dev/null +++ b/app/workers/packages/helm/extraction_worker.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module Packages + module Helm + class ExtractionWorker + include ApplicationWorker + + queue_namespace :package_repositories + feature_category :package_registry + deduplicate :until_executing + + idempotent! + + def perform(channel, package_file_id) + package_file = ::Packages::PackageFile.find_by_id(package_file_id) + + return unless package_file && !package_file.package.default? + + ::Packages::Helm::ProcessFileService.new(channel, package_file).execute + + rescue ::Packages::Helm::ExtractFileMetadataService::ExtractionError, + ::Packages::Helm::ProcessFileService::ExtractionError, + ::ActiveModel::ValidationError => e + Gitlab::ErrorTracking.log_exception(e, project_id: package_file.project_id) + package_file.package.update_column(:status, :error) + end + end + end +end diff --git a/app/workers/partition_creation_worker.rb b/app/workers/partition_creation_worker.rb index 2b21741d6c2..bb4834ab2dd 100644 --- a/app/workers/partition_creation_worker.rb +++ b/app/workers/partition_creation_worker.rb @@ -10,8 +10,7 @@ class PartitionCreationWorker idempotent! def perform - Gitlab::Database::Partitioning::PartitionCreator.new.create_partitions - ensure - Gitlab::Database::Partitioning::PartitionMonitoring.new.report_metrics + # This worker has been removed in favor of Database::PartitionManagementWorker + Database::PartitionManagementWorker.new.perform end end diff --git a/app/workers/pipeline_hooks_worker.rb b/app/workers/pipeline_hooks_worker.rb index 97e6adbbf18..40d138752b4 100644 --- a/app/workers/pipeline_hooks_worker.rb +++ b/app/workers/pipeline_hooks_worker.rb @@ -8,7 +8,7 @@ class PipelineHooksWorker # rubocop:disable Scalability/IdempotentWorker queue_namespace :pipeline_hooks worker_resource_boundary :cpu - data_consistency :delayed, feature_flag: :load_balancing_for_pipeline_hooks_worker + data_consistency :delayed # rubocop: disable CodeReuse/ActiveRecord def perform(pipeline_id) diff --git a/app/workers/project_service_worker.rb b/app/workers/project_service_worker.rb index 967be3b3e81..da38d2fc0cd 100644 --- a/app/workers/project_service_worker.rb +++ b/app/workers/project_service_worker.rb @@ -15,6 +15,6 @@ class ProjectServiceWorker # rubocop:disable Scalability/IdempotentWorker integration.execute(data) rescue StandardError => error integration_class = integration&.class&.name || "Not Found" - logger.error class: self.class.name, service_class: integration_class, message: error.message + Gitlab::ErrorTracking.log_exception(error, integration_class: integration_class) end end diff --git a/app/workers/projects/post_creation_worker.rb b/app/workers/projects/post_creation_worker.rb index 1970f79729f..389e987e81a 100644 --- a/app/workers/projects/post_creation_worker.rb +++ b/app/workers/projects/post_creation_worker.rb @@ -15,21 +15,21 @@ module Projects return unless project - create_prometheus_service(project) + create_prometheus_integration(project) end private - def create_prometheus_service(project) - service = project.find_or_initialize_service(::PrometheusService.to_param) + def create_prometheus_integration(project) + integration = project.find_or_initialize_integration(::Integrations::Prometheus.to_param) # If the service has already been inserted in the database, that # means it came from a template, and there's nothing more to do. - return if service.persisted? + return if integration.persisted? - return unless service.prometheus_available? + return unless integration.prometheus_available? - service.save! + integration.save! rescue ActiveRecord::RecordInvalid => e Gitlab::ErrorTracking.track_exception(e, extra: { project_id: project.id }) end diff --git a/app/workers/prometheus/create_default_alerts_worker.rb b/app/workers/prometheus/create_default_alerts_worker.rb index 0dba752ced1..9d163cd828e 100644 --- a/app/workers/prometheus/create_default_alerts_worker.rb +++ b/app/workers/prometheus/create_default_alerts_worker.rb @@ -15,7 +15,7 @@ module Prometheus return unless project - result = Prometheus::CreateDefaultAlertsService.new(project: project).execute + result = ::Prometheus::CreateDefaultAlertsService.new(project: project).execute log_info(result.message) if result.error? end diff --git a/app/workers/repository_check/single_repository_worker.rb b/app/workers/repository_check/single_repository_worker.rb index a9a8201205e..31d68e65b23 100644 --- a/app/workers/repository_check/single_repository_worker.rb +++ b/app/workers/repository_check/single_repository_worker.rb @@ -46,7 +46,7 @@ module RepositoryCheck true rescue Gitlab::Git::Repository::GitError => e - Gitlab::RepositoryCheckLogger.error(e.message) + Gitlab::RepositoryCheckLogger.error("#{repository.full_path}: #{e.message}") false end diff --git a/app/workers/schedule_merge_request_cleanup_refs_worker.rb b/app/workers/schedule_merge_request_cleanup_refs_worker.rb index b5ea5298879..40a773ca58f 100644 --- a/app/workers/schedule_merge_request_cleanup_refs_worker.rb +++ b/app/workers/schedule_merge_request_cleanup_refs_worker.rb @@ -10,21 +10,10 @@ class ScheduleMergeRequestCleanupRefsWorker tags :exclude_from_kubernetes idempotent! - # Based on existing data, MergeRequestCleanupRefsWorker can run 3 jobs per - # second. This means that 180 jobs can be performed but since there are some - # spikes from time time, it's better to give it some allowance. - LIMIT = 180 - DELAY = 10.seconds - BATCH_SIZE = 30 - def perform return if Gitlab::Database.read_only? return unless Feature.enabled?(:merge_request_refs_cleanup, default_enabled: false) - ids = MergeRequest::CleanupSchedule.scheduled_merge_request_ids(LIMIT).map { |id| [id] } - - MergeRequestCleanupRefsWorker.bulk_perform_in(DELAY, ids, batch_size: BATCH_SIZE) # rubocop:disable Scalability/BulkPerformWithContext - - log_extra_metadata_on_done(:merge_requests_count, ids.size) + MergeRequestCleanupRefsWorker.perform_with_capacity end end |