diff options
author | Matija Čupić <matteeyah@gmail.com> | 2017-12-21 18:30:34 +0100 |
---|---|---|
committer | Matija Čupić <matteeyah@gmail.com> | 2017-12-21 18:30:34 +0100 |
commit | 305bce8d246d2c6e88b5f22439c0ce0833eba1a3 (patch) | |
tree | e043cb4041c121957610f81d6a65790e91f84fb9 /app/workers | |
parent | 614c0e0bf9c404ba43f835166183a2f1883071d1 (diff) | |
parent | b8d79cc479200ff714f89dc43a3bbec18af3c5b5 (diff) | |
download | gitlab-ce-305bce8d246d2c6e88b5f22439c0ce0833eba1a3.tar.gz |
Merge branch 'master' into 39957-redirect-to-gpc-page-if-users-try-to-create-a-cluster-but-the-account-is-not-enabled
Diffstat (limited to 'app/workers')
29 files changed, 220 insertions, 74 deletions
diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml new file mode 100644 index 00000000000..268b7028fd9 --- /dev/null +++ b/app/workers/all_queues.yml @@ -0,0 +1,99 @@ +--- +- cronjob:admin_email +- cronjob:expire_build_artifacts +- cronjob:gitlab_usage_ping +- cronjob:import_export_project_cleanup +- cronjob:pipeline_schedule +- cronjob:prune_old_events +- cronjob:remove_expired_group_links +- cronjob:remove_expired_members +- cronjob:remove_old_web_hook_logs +- cronjob:remove_unreferenced_lfs_objects +- cronjob:repository_archive_cache +- cronjob:repository_check_batch +- cronjob:requests_profiles +- cronjob:schedule_update_user_activity +- cronjob:stuck_ci_jobs +- cronjob:stuck_import_jobs +- cronjob:stuck_merge_jobs +- cronjob:trending_projects + +- gcp_cluster:cluster_install_app +- gcp_cluster:cluster_provision +- gcp_cluster:cluster_wait_for_app_installation +- gcp_cluster:wait_for_cluster_creation + +- github_import_advance_stage +- github_importer:github_import_import_diff_note +- github_importer:github_import_import_issue +- github_importer:github_import_import_note +- github_importer:github_import_import_pull_request +- github_importer:github_import_refresh_import_jid +- github_importer:github_import_stage_finish_import +- github_importer:github_import_stage_import_base_data +- github_importer:github_import_stage_import_issues_and_diff_notes +- github_importer:github_import_stage_import_notes +- github_importer:github_import_stage_import_pull_requests +- github_importer:github_import_stage_import_repository + +- pipeline_cache:expire_job_cache +- pipeline_cache:expire_pipeline_cache +- pipeline_creation:create_pipeline +- pipeline_creation:run_pipeline_schedule +- pipeline_default:build_coverage +- pipeline_default:build_trace_sections +- pipeline_default:pipeline_metrics +- pipeline_default:pipeline_notification +- pipeline_default:update_head_pipeline_for_merge_request +- pipeline_hooks:build_hooks +- pipeline_hooks:pipeline_hooks +- pipeline_processing:build_finished +- pipeline_processing:build_queue +- pipeline_processing:build_success +- pipeline_processing:pipeline_process +- pipeline_processing:pipeline_success +- pipeline_processing:pipeline_update +- pipeline_processing:stage_update + +- repository_check:repository_check_clear +- repository_check:repository_check_single_repository + +- default +- mailers # ActionMailer::DeliveryJob.queue_name + +- authorized_projects +- background_migration +- create_gpg_signature +- delete_merged_branches +- delete_user +- email_receiver +- emails_on_push +- expire_build_instance_artifacts +- git_garbage_collect +- gitlab_shell +- group_destroy +- invalid_gpg_signature_update +- irker +- merge +- namespaceless_project_destroy +- new_issue +- new_merge_request +- new_note +- pages +- post_receive +- process_commit +- project_cache +- project_destroy +- project_export +- project_migrate_hashed_storage +- project_service +- propagate_service_template +- reactive_caching +- repository_fork +- repository_import +- storage_migrator +- system_hook_push +- update_merge_requests +- update_user_activity +- upload_checksum +- web_hook diff --git a/app/workers/build_finished_worker.rb b/app/workers/build_finished_worker.rb index 5efa9180f5e..97d80305bec 100644 --- a/app/workers/build_finished_worker.rb +++ b/app/workers/build_finished_worker.rb @@ -2,7 +2,7 @@ class BuildFinishedWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(build_id) Ci::Build.find_by(id: build_id).try do |build| diff --git a/app/workers/build_hooks_worker.rb b/app/workers/build_hooks_worker.rb index 6705a1c2709..cbfca8c342c 100644 --- a/app/workers/build_hooks_worker.rb +++ b/app/workers/build_hooks_worker.rb @@ -2,7 +2,7 @@ class BuildHooksWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :hooks + queue_namespace :pipeline_hooks def perform(build_id) Ci::Build.find_by(id: build_id) diff --git a/app/workers/build_queue_worker.rb b/app/workers/build_queue_worker.rb index fc775a84dc0..e4f4e6c1d9e 100644 --- a/app/workers/build_queue_worker.rb +++ b/app/workers/build_queue_worker.rb @@ -2,7 +2,7 @@ class BuildQueueWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(build_id) Ci::Build.find_by(id: build_id).try do |build| diff --git a/app/workers/build_success_worker.rb b/app/workers/build_success_worker.rb index ec049821ad7..4b9097bc5e4 100644 --- a/app/workers/build_success_worker.rb +++ b/app/workers/build_success_worker.rb @@ -2,7 +2,7 @@ class BuildSuccessWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(build_id) Ci::Build.find_by(id: build_id).try do |build| diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb index 9c3bdabc49e..37586e161c9 100644 --- a/app/workers/concerns/application_worker.rb +++ b/app/workers/concerns/application_worker.rb @@ -3,13 +3,23 @@ Sidekiq::Worker.extend ActiveSupport::Concern module ApplicationWorker extend ActiveSupport::Concern - include Sidekiq::Worker + include Sidekiq::Worker # rubocop:disable Cop/IncludeSidekiqWorker included do - sidekiq_options queue: base_queue_name + set_queue end module ClassMethods + def inherited(subclass) + subclass.set_queue + end + + def set_queue + queue_name = [queue_namespace, base_queue_name].compact.join(':') + + sidekiq_options queue: queue_name # rubocop:disable Cop/SidekiqOptionsQueue + end + def base_queue_name name .sub(/\AGitlab::/, '') @@ -18,6 +28,16 @@ module ApplicationWorker .tr('/', '_') end + def queue_namespace(new_namespace = nil) + if new_namespace + sidekiq_options queue_namespace: new_namespace + + set_queue + else + get_sidekiq_options['queue_namespace']&.to_s + end + end + def queue get_sidekiq_options['queue'].to_s end diff --git a/app/workers/concerns/cluster_queue.rb b/app/workers/concerns/cluster_queue.rb index a5074d13220..24b9f145220 100644 --- a/app/workers/concerns/cluster_queue.rb +++ b/app/workers/concerns/cluster_queue.rb @@ -5,6 +5,6 @@ module ClusterQueue extend ActiveSupport::Concern included do - sidekiq_options queue: :gcp_cluster + queue_namespace :gcp_cluster end end diff --git a/app/workers/concerns/cronjob_queue.rb b/app/workers/concerns/cronjob_queue.rb index e918bb011e0..b6581779f6a 100644 --- a/app/workers/concerns/cronjob_queue.rb +++ b/app/workers/concerns/cronjob_queue.rb @@ -4,6 +4,7 @@ module CronjobQueue extend ActiveSupport::Concern included do - sidekiq_options queue: :cronjob, retry: false + queue_namespace :cronjob + sidekiq_options retry: false end end diff --git a/app/workers/concerns/gitlab/github_import/queue.rb b/app/workers/concerns/gitlab/github_import/queue.rb index a2bee361b86..22c2ce458e8 100644 --- a/app/workers/concerns/gitlab/github_import/queue.rb +++ b/app/workers/concerns/gitlab/github_import/queue.rb @@ -4,12 +4,14 @@ module Gitlab extend ActiveSupport::Concern included do + queue_namespace :github_importer + # If a job produces an error it may block a stage from advancing # forever. To prevent this from happening we prevent jobs from going to # the dead queue. This does mean some resources may not be imported, but # this is better than a project being stuck in the "import" state # forever. - sidekiq_options queue: 'github_importer', dead: false, retry: 5 + sidekiq_options dead: false, retry: 5 end end end diff --git a/app/workers/concerns/new_issuable.rb b/app/workers/concerns/new_issuable.rb index eb0d6c9c36c..526ed0bad07 100644 --- a/app/workers/concerns/new_issuable.rb +++ b/app/workers/concerns/new_issuable.rb @@ -9,15 +9,15 @@ module NewIssuable end def set_user(user_id) - @user = User.find_by(id: user_id) + @user = User.find_by(id: user_id) # rubocop:disable Gitlab/ModuleWithInstanceVariables - log_error(User, user_id) unless @user + log_error(User, user_id) unless @user # rubocop:disable Gitlab/ModuleWithInstanceVariables end def set_issuable(issuable_id) - @issuable = issuable_class.find_by(id: issuable_id) + @issuable = issuable_class.find_by(id: issuable_id) # rubocop:disable Gitlab/ModuleWithInstanceVariables - log_error(issuable_class, issuable_id) unless @issuable + log_error(issuable_class, issuable_id) unless @issuable # rubocop:disable Gitlab/ModuleWithInstanceVariables end def log_error(record_class, record_id) diff --git a/app/workers/concerns/pipeline_queue.rb b/app/workers/concerns/pipeline_queue.rb index ddf45b91345..e77093a6902 100644 --- a/app/workers/concerns/pipeline_queue.rb +++ b/app/workers/concerns/pipeline_queue.rb @@ -5,14 +5,6 @@ module PipelineQueue extend ActiveSupport::Concern included do - sidekiq_options queue: 'pipeline_default' - end - - class_methods do - def enqueue_in(group:) - raise ArgumentError, 'Unspecified queue group!' if group.empty? - - sidekiq_options queue: "pipeline_#{group}" - end + queue_namespace :pipeline_default end end diff --git a/app/workers/concerns/project_import_options.rb b/app/workers/concerns/project_import_options.rb new file mode 100644 index 00000000000..10b971344f7 --- /dev/null +++ b/app/workers/concerns/project_import_options.rb @@ -0,0 +1,23 @@ +module ProjectImportOptions + extend ActiveSupport::Concern + + included do + IMPORT_RETRY_COUNT = 5 + + sidekiq_options retry: IMPORT_RETRY_COUNT, status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION + + # We only want to mark the project as failed once we exhausted all retries + sidekiq_retries_exhausted do |job| + project = Project.find(job['args'].first) + + action = if project.forked? + "fork" + else + "import" + end + + project.mark_import_as_failed("Every #{action} attempt has failed: #{job['error_message']}. Please try again.") + Sidekiq.logger.warn "Failed #{job['class']} with #{job['args']}: #{job['error_message']}" + end + end +end diff --git a/app/workers/concerns/project_start_import.rb b/app/workers/concerns/project_start_import.rb index 0704ebbb0fd..4e55a1ee3d6 100644 --- a/app/workers/concerns/project_start_import.rb +++ b/app/workers/concerns/project_start_import.rb @@ -1,3 +1,4 @@ +# Used in EE by mirroring module ProjectStartImport def start(project) if project.import_started? && project.import_jid == self.jid diff --git a/app/workers/concerns/repository_check_queue.rb b/app/workers/concerns/repository_check_queue.rb index a597321ccf4..43fb66c31b0 100644 --- a/app/workers/concerns/repository_check_queue.rb +++ b/app/workers/concerns/repository_check_queue.rb @@ -3,6 +3,8 @@ module RepositoryCheckQueue extend ActiveSupport::Concern included do - sidekiq_options queue: :repository_check, retry: false + queue_namespace :repository_check + + sidekiq_options retry: false end end diff --git a/app/workers/create_pipeline_worker.rb b/app/workers/create_pipeline_worker.rb index 00cd7b85b9f..c3ac35e54f5 100644 --- a/app/workers/create_pipeline_worker.rb +++ b/app/workers/create_pipeline_worker.rb @@ -2,7 +2,7 @@ class CreatePipelineWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :creation + queue_namespace :pipeline_creation def perform(project_id, user_id, ref, source, params = {}) project = Project.find(project_id) diff --git a/app/workers/expire_job_cache_worker.rb b/app/workers/expire_job_cache_worker.rb index a591e2da519..7217364a9f2 100644 --- a/app/workers/expire_job_cache_worker.rb +++ b/app/workers/expire_job_cache_worker.rb @@ -2,7 +2,7 @@ class ExpireJobCacheWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :cache + queue_namespace :pipeline_cache def perform(job_id) job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id) diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb index a3ac32b437d..db73d37868a 100644 --- a/app/workers/expire_pipeline_cache_worker.rb +++ b/app/workers/expire_pipeline_cache_worker.rb @@ -2,7 +2,7 @@ class ExpirePipelineCacheWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :cache + queue_namespace :pipeline_cache def perform(pipeline_id) pipeline = Ci::Pipeline.find_by(id: pipeline_id) @@ -13,7 +13,7 @@ class ExpirePipelineCacheWorker store.touch(project_pipelines_path(project)) store.touch(project_pipeline_path(project, pipeline)) - store.touch(commit_pipelines_path(project, pipeline.commit)) if pipeline.commit + store.touch(commit_pipelines_path(project, pipeline.commit)) unless pipeline.commit.nil? store.touch(new_merge_request_pipelines_path(project)) each_pipelines_merge_request_path(project, pipeline) do |path| store.touch(path) diff --git a/app/workers/gitlab/github_import/advance_stage_worker.rb b/app/workers/gitlab/github_import/advance_stage_worker.rb index 400396d5755..f7f498af840 100644 --- a/app/workers/gitlab/github_import/advance_stage_worker.rb +++ b/app/workers/gitlab/github_import/advance_stage_worker.rb @@ -9,7 +9,7 @@ module Gitlab class AdvanceStageWorker include ApplicationWorker - sidekiq_options queue: 'github_importer_advance_stage', dead: false + sidekiq_options dead: false INTERVAL = 30.seconds.to_i diff --git a/app/workers/pages_worker.rb b/app/workers/pages_worker.rb index 62f733c02fc..3ec81d040b4 100644 --- a/app/workers/pages_worker.rb +++ b/app/workers/pages_worker.rb @@ -1,7 +1,7 @@ class PagesWorker include ApplicationWorker - sidekiq_options queue: :pages, retry: false + sidekiq_options retry: false def perform(action, *arg) send(action, *arg) # rubocop:disable GitlabSecurity/PublicSend diff --git a/app/workers/pipeline_hooks_worker.rb b/app/workers/pipeline_hooks_worker.rb index 661c29efe88..c94918ff4ee 100644 --- a/app/workers/pipeline_hooks_worker.rb +++ b/app/workers/pipeline_hooks_worker.rb @@ -2,7 +2,7 @@ class PipelineHooksWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :hooks + queue_namespace :pipeline_hooks def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id) diff --git a/app/workers/pipeline_process_worker.rb b/app/workers/pipeline_process_worker.rb index 07dbf6a971e..24424b3f472 100644 --- a/app/workers/pipeline_process_worker.rb +++ b/app/workers/pipeline_process_worker.rb @@ -2,7 +2,7 @@ class PipelineProcessWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id) diff --git a/app/workers/pipeline_success_worker.rb b/app/workers/pipeline_success_worker.rb index 68c40a259e1..2ab0739a17f 100644 --- a/app/workers/pipeline_success_worker.rb +++ b/app/workers/pipeline_success_worker.rb @@ -2,7 +2,7 @@ class PipelineSuccessWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id).try do |pipeline| diff --git a/app/workers/pipeline_update_worker.rb b/app/workers/pipeline_update_worker.rb index 24a8a9fbed5..fc9da2d45b1 100644 --- a/app/workers/pipeline_update_worker.rb +++ b/app/workers/pipeline_update_worker.rb @@ -2,7 +2,7 @@ class PipelineUpdateWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(pipeline_id) Ci::Pipeline.find_by(id: pipeline_id) diff --git a/app/workers/repository_fork_worker.rb b/app/workers/repository_fork_worker.rb index a07ef1705a1..d1c57b82681 100644 --- a/app/workers/repository_fork_worker.rb +++ b/app/workers/repository_fork_worker.rb @@ -1,11 +1,8 @@ class RepositoryForkWorker - ForkError = Class.new(StandardError) - include ApplicationWorker include Gitlab::ShellAdapter include ProjectStartImport - - sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION + include ProjectImportOptions def perform(project_id, forked_from_repository_storage_path, source_disk_path) project = Project.find(project_id) @@ -18,20 +15,12 @@ class RepositoryForkWorker result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_disk_path, project.repository_storage_path, project.disk_path) - raise ForkError, "Unable to fork project #{project_id} for repository #{source_disk_path} -> #{project.disk_path}" unless result + raise "Unable to fork project #{project_id} for repository #{source_disk_path} -> #{project.disk_path}" unless result project.repository.after_import - raise ForkError, "Project #{project_id} had an invalid repository after fork" unless project.valid_repo? + raise "Project #{project_id} had an invalid repository after fork" unless project.valid_repo? project.import_finish - rescue ForkError => ex - fail_fork(project, ex.message) - raise - rescue => ex - return unless project - - fail_fork(project, ex.message) - raise ForkError, "#{ex.class} #{ex.message}" end private @@ -42,9 +31,4 @@ class RepositoryForkWorker Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while forking.") false end - - def fail_fork(project, message) - Rails.logger.error(message) - project.mark_import_as_failed(message) - end end diff --git a/app/workers/repository_import_worker.rb b/app/workers/repository_import_worker.rb index 55715c83cb1..31e2798c36b 100644 --- a/app/workers/repository_import_worker.rb +++ b/app/workers/repository_import_worker.rb @@ -1,11 +1,8 @@ class RepositoryImportWorker - ImportError = Class.new(StandardError) - include ApplicationWorker include ExceptionBacktrace include ProjectStartImport - - sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION + include ProjectImportOptions def perform(project_id) project = Project.find(project_id) @@ -23,17 +20,9 @@ class RepositoryImportWorker # to those importers to mark the import process as complete. return if service.async? - raise ImportError, result[:message] if result[:status] == :error + raise result[:message] if result[:status] == :error project.after_import - rescue ImportError => ex - fail_import(project, ex.message) - raise - rescue => ex - return unless project - - fail_import(project, ex.message) - raise ImportError, "#{ex.class} #{ex.message}" end private @@ -44,8 +33,4 @@ class RepositoryImportWorker Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while importing.") false end - - def fail_import(project, message) - project.mark_import_as_failed(message) - end end diff --git a/app/workers/run_pipeline_schedule_worker.rb b/app/workers/run_pipeline_schedule_worker.rb new file mode 100644 index 00000000000..8f5138fc873 --- /dev/null +++ b/app/workers/run_pipeline_schedule_worker.rb @@ -0,0 +1,22 @@ +class RunPipelineScheduleWorker + include ApplicationWorker + include PipelineQueue + + queue_namespace :pipeline_creation + + def perform(schedule_id, user_id) + schedule = Ci::PipelineSchedule.find_by(id: schedule_id) + user = User.find_by(id: user_id) + + return unless schedule && user + + run_pipeline_schedule(schedule, user) + end + + def run_pipeline_schedule(schedule, user) + Ci::CreatePipelineService.new(schedule.project, + user, + ref: schedule.ref) + .execute(:schedule, ignore_skip_ci: true, save_on_errors: false, schedule: schedule) + end +end diff --git a/app/workers/stage_update_worker.rb b/app/workers/stage_update_worker.rb index 69f2318d83b..e4b683fca33 100644 --- a/app/workers/stage_update_worker.rb +++ b/app/workers/stage_update_worker.rb @@ -2,7 +2,7 @@ class StageUpdateWorker include ApplicationWorker include PipelineQueue - enqueue_in group: :processing + queue_namespace :pipeline_processing def perform(stage_id) Ci::Stage.find_by(id: stage_id).try do |stage| diff --git a/app/workers/stuck_merge_jobs_worker.rb b/app/workers/stuck_merge_jobs_worker.rb index 36d2a2e6466..16394293c79 100644 --- a/app/workers/stuck_merge_jobs_worker.rb +++ b/app/workers/stuck_merge_jobs_worker.rb @@ -23,7 +23,12 @@ class StuckMergeJobsWorker merge_requests = MergeRequest.where(id: completed_ids) merge_requests.where.not(merge_commit_sha: nil).update_all(state: :merged) - merge_requests.where(merge_commit_sha: nil).update_all(state: :opened, merge_jid: nil) + + merge_requests_to_reopen = merge_requests.where(merge_commit_sha: nil) + + # Do not reopen merge requests using direct queries. + # We rely on state machine callbacks to update head_pipeline_id + merge_requests_to_reopen.each(&:unlock_mr) Rails.logger.info("Updated state of locked merge jobs. JIDs: #{completed_jids.join(', ')}") end diff --git a/app/workers/update_head_pipeline_for_merge_request_worker.rb b/app/workers/update_head_pipeline_for_merge_request_worker.rb index 0a2e9b63578..f09d89aa170 100644 --- a/app/workers/update_head_pipeline_for_merge_request_worker.rb +++ b/app/workers/update_head_pipeline_for_merge_request_worker.rb @@ -1,15 +1,25 @@ class UpdateHeadPipelineForMergeRequestWorker include ApplicationWorker - - sidekiq_options queue: 'pipeline_default' + include PipelineQueue def perform(merge_request_id) merge_request = MergeRequest.find(merge_request_id) pipeline = Ci::Pipeline.where(project: merge_request.source_project, ref: merge_request.source_branch).last return unless pipeline && pipeline.latest? - raise ArgumentError, 'merge request sha does not equal pipeline sha' if merge_request.diff_head_sha != pipeline.sha + + if merge_request.diff_head_sha != pipeline.sha + log_error_message_for(merge_request) + + return + end merge_request.update_attribute(:head_pipeline_id, pipeline.id) end + + def log_error_message_for(merge_request) + Rails.logger.error( + "Outdated head pipeline for active merge request: id=#{merge_request.id}, source_branch=#{merge_request.source_branch}, diff_head_sha=#{merge_request.diff_head_sha}" + ) + end end |