Diffstat (limited to 'app/workers')
32 files changed, 426 insertions, 136 deletions
diff --git a/app/workers/authorized_projects_worker.rb b/app/workers/authorized_projects_worker.rb
index 0e20df506a3..13207a8bc71 100644
--- a/app/workers/authorized_projects_worker.rb
+++ b/app/workers/authorized_projects_worker.rb
@@ -10,7 +10,7 @@ class AuthorizedProjectsWorker
   end
 
   def self.bulk_perform_async(args_list)
-    Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
+    Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
   end
 
   def perform(user_id)
diff --git a/app/workers/background_migration_worker.rb b/app/workers/background_migration_worker.rb
new file mode 100644
index 00000000000..45ce49bb5c0
--- /dev/null
+++ b/app/workers/background_migration_worker.rb
@@ -0,0 +1,39 @@
+class BackgroundMigrationWorker
+  include Sidekiq::Worker
+  include DedicatedSidekiqQueue
+
+  # Enqueues a number of jobs in bulk.
+  #
+  # The `jobs` argument should be an Array of Arrays, each sub-array must be in
+  # the form:
+  #
+  # [migration-class, [arg1, arg2, ...]]
+  def self.perform_bulk(jobs)
+    Sidekiq::Client.push_bulk('class' => self,
+                              'queue' => sidekiq_options['queue'],
+                              'args' => jobs)
+  end
+
+  # Schedules multiple jobs in bulk, with a delay.
+  #
+  def self.perform_bulk_in(delay, jobs)
+    now = Time.now.to_i
+    schedule = now + delay.to_i
+
+    if schedule <= now
+      raise ArgumentError, 'The schedule time must be in the future!'
+    end
+
+    Sidekiq::Client.push_bulk('class' => self,
+                              'queue' => sidekiq_options['queue'],
+                              'args' => jobs,
+                              'at' => schedule)
+  end
+
+  # Performs the background migration.
+  #
+  # See Gitlab::BackgroundMigration.perform for more information.
+  def perform(class_name, arguments = [])
+    Gitlab::BackgroundMigration.perform(class_name, arguments)
+  end
+end
diff --git a/app/workers/build_coverage_worker.rb b/app/workers/build_coverage_worker.rb
index def0ab1dde1..f7ae996bb17 100644
--- a/app/workers/build_coverage_worker.rb
+++ b/app/workers/build_coverage_worker.rb
@@ -3,7 +3,6 @@ class BuildCoverageWorker
   include BuildQueue
 
   def perform(build_id)
-    Ci::Build.find_by(id: build_id)
-      .try(:update_coverage)
+    Ci::Build.find_by(id: build_id)&.update_coverage
   end
 end
diff --git a/app/workers/build_email_worker.rb b/app/workers/build_email_worker.rb
deleted file mode 100644
index 5fdb1f2baa0..00000000000
--- a/app/workers/build_email_worker.rb
+++ /dev/null
@@ -1,20 +0,0 @@
-class BuildEmailWorker
-  include Sidekiq::Worker
-  include BuildQueue
-
-  def perform(build_id, recipients, push_data)
-    recipients.each do |recipient|
-      begin
-        case push_data['build_status']
-        when 'success'
-          Notify.build_success_email(build_id, recipient).deliver_now
-        when 'failed'
-          Notify.build_fail_email(build_id, recipient).deliver_now
-        end
-      # These are input errors and won't be corrected even if Sidekiq retries
-      rescue Net::SMTPFatalError, Net::SMTPSyntaxError => e
-        logger.info("Failed to send e-mail for project '#{push_data['project_name']}' to #{recipient}: #{e}")
-      end
-    end
-  end
-end
diff --git a/app/workers/build_success_worker.rb b/app/workers/build_success_worker.rb
index e17add7421f..bf009dfab0f 100644
--- a/app/workers/build_success_worker.rb
+++ b/app/workers/build_success_worker.rb
@@ -11,15 +11,6 @@ class BuildSuccessWorker
   private
 
   def create_deployment(build)
-    service = CreateDeploymentService.new(
-      build.project, build.user,
-      environment: build.environment,
-      sha: build.sha,
-      ref: build.ref,
-      tag: build.tag,
-      options: build.options.to_h[:environment],
-      variables: build.variables)
-
-    service.execute(build)
+    CreateDeploymentService.new(build).execute
  end
end
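Note: passing 'queue' => sidekiq_options['queue'] to Sidekiq::Client.push_bulk is what routes these bulk-pushed jobs onto each worker's dedicated queue instead of the default one. A minimal usage sketch for the new bulk APIs (the MigrateThings class name is illustrative, not part of this diff):

```ruby
# Each sub-array is [migration-class, [arg1, arg2, ...]], per the comment
# in BackgroundMigrationWorker. 'MigrateThings' is a hypothetical
# background-migration class used only for illustration.
jobs = [
  ['MigrateThings', [1, 100]],
  ['MigrateThings', [101, 200]]
]

# Enqueue the whole batch in one Redis round trip.
BackgroundMigrationWorker.perform_bulk(jobs)

# Or schedule the same batch to start 300 seconds (five minutes) from now.
BackgroundMigrationWorker.perform_bulk_in(300, jobs)
```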
diff --git a/app/workers/clear_database_cache_worker.rb b/app/workers/clear_database_cache_worker.rb
deleted file mode 100644
index c4cb4733482..00000000000
--- a/app/workers/clear_database_cache_worker.rb
+++ /dev/null
@@ -1,24 +0,0 @@
-# This worker clears all cache fields in the database, working in batches.
-class ClearDatabaseCacheWorker
-  include Sidekiq::Worker
-  include DedicatedSidekiqQueue
-
-  BATCH_SIZE = 1000
-
-  def perform
-    CacheMarkdownField.caching_classes.each do |kls|
-      fields = kls.cached_markdown_fields.html_fields
-      clear_cache_fields = fields.each_with_object({}) do |field, memo|
-        memo[field] = nil
-      end
-
-      Rails.logger.debug("Clearing Markdown cache for #{kls}: #{fields.inspect}")
-
-      kls.unscoped.in_batches(of: BATCH_SIZE) do |relation|
-        relation.update_all(clear_cache_fields)
-      end
-    end
-
-    nil
-  end
-end
diff --git a/app/workers/expire_build_instance_artifacts_worker.rb b/app/workers/expire_build_instance_artifacts_worker.rb
index eb403c134d1..7b59e976492 100644
--- a/app/workers/expire_build_instance_artifacts_worker.rb
+++ b/app/workers/expire_build_instance_artifacts_worker.rb
@@ -8,7 +8,7 @@ class ExpireBuildInstanceArtifactsWorker
       .reorder(nil)
       .find_by(id: build_id)
 
-    return unless build.try(:project)
+    return unless build&.project && !build.project.pending_delete
 
     Rails.logger.info "Removing artifacts for build #{build.id}..."
     build.erase_artifacts!
diff --git a/app/workers/expire_job_cache_worker.rb b/app/workers/expire_job_cache_worker.rb
new file mode 100644
index 00000000000..e383202260d
--- /dev/null
+++ b/app/workers/expire_job_cache_worker.rb
@@ -0,0 +1,27 @@
+class ExpireJobCacheWorker
+  include Sidekiq::Worker
+  include BuildQueue
+
+  def perform(job_id)
+    job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id)
+    return unless job
+
+    pipeline = job.pipeline
+    project = job.project
+
+    Gitlab::EtagCaching::Store.new.tap do |store|
+      store.touch(project_pipeline_path(project, pipeline))
+      store.touch(project_job_path(project, job))
+    end
+  end
+
+  private
+
+  def project_pipeline_path(project, pipeline)
+    Gitlab::Routing.url_helpers.project_pipeline_path(project, pipeline, format: :json)
+  end
+
+  def project_job_path(project, job)
+    Gitlab::Routing.url_helpers.project_build_path(project, job.id, format: :json)
+  end
+end
diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb
new file mode 100644
index 00000000000..7c02d6cf892
--- /dev/null
+++ b/app/workers/expire_pipeline_cache_worker.rb
@@ -0,0 +1,48 @@
+class ExpirePipelineCacheWorker
+  include Sidekiq::Worker
+  include PipelineQueue
+
+  def perform(pipeline_id)
+    pipeline = Ci::Pipeline.find_by(id: pipeline_id)
+    return unless pipeline
+
+    project = pipeline.project
+    store = Gitlab::EtagCaching::Store.new
+
+    store.touch(project_pipelines_path(project))
+    store.touch(project_pipeline_path(project, pipeline))
+    store.touch(commit_pipelines_path(project, pipeline.commit)) if pipeline.commit
+    store.touch(new_merge_request_pipelines_path(project))
+    each_pipelines_merge_request_path(project, pipeline) do |path|
+      store.touch(path)
+    end
+
+    Gitlab::Cache::Ci::ProjectPipelineStatus.update_for_pipeline(pipeline)
+  end
+
+  private
+
+  def project_pipelines_path(project)
+    Gitlab::Routing.url_helpers.project_pipelines_path(project, format: :json)
+  end
+
+  def project_pipeline_path(project, pipeline)
+    Gitlab::Routing.url_helpers.project_pipeline_path(project, pipeline, format: :json)
+  end
+
+  def commit_pipelines_path(project, commit)
+    Gitlab::Routing.url_helpers.pipelines_project_commit_path(project, commit.id, format: :json)
+  end
+
+  def new_merge_request_pipelines_path(project)
+    Gitlab::Routing.url_helpers.project_new_merge_request_path(project, format: :json)
+  end
+
+  def each_pipelines_merge_request_path(project, pipeline)
+    pipeline.all_merge_requests.each do |merge_request|
+      path = Gitlab::Routing.url_helpers.pipelines_project_merge_request_path(project, merge_request, format: :json)
+
+      yield(path)
+    end
+  end
+end
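Note: both new cache workers invalidate by touching routes in Gitlab::EtagCaching::Store. Below is a simplified sketch of the idea behind such a store, rotating a per-route token so the next conditional GET misses; it assumes nothing about GitLab's actual implementation:

```ruby
require 'securerandom'

# Toy ETag store: each cacheable path maps to a random token, and
# "touching" a path rotates the token, invalidating stale clients.
class SimpleEtagStore
  def initialize
    @etags = {}
  end

  # Current ETag for a path, generated on first access.
  def get(path)
    @etags[path] ||= SecureRandom.hex
  end

  # Invalidate the cached representation by rotating the token.
  def touch(path)
    @etags[path] = SecureRandom.hex
  end
end

store = SimpleEtagStore.new
old_etag = store.get('/foo/bar/pipelines.json')
store.touch('/foo/bar/pipelines.json')
store.get('/foo/bar/pipelines.json') == old_etag # => false
```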
diff --git a/app/workers/gitlab_usage_ping_worker.rb b/app/workers/gitlab_usage_ping_worker.rb
new file mode 100644
index 00000000000..0a55aab63fd
--- /dev/null
+++ b/app/workers/gitlab_usage_ping_worker.rb
@@ -0,0 +1,19 @@
+class GitlabUsagePingWorker
+  LEASE_TIMEOUT = 86400
+
+  include Sidekiq::Worker
+  include CronjobQueue
+
+  def perform
+    # Multiple Sidekiq workers could run this. We should only do this at most once a day.
+    return unless try_obtain_lease
+
+    SubmitUsagePingService.new.execute
+  end
+
+  private
+
+  def try_obtain_lease
+    Gitlab::ExclusiveLease.new('gitlab_usage_ping_worker:ping', timeout: LEASE_TIMEOUT).try_obtain
+  end
+end
diff --git a/app/workers/irker_worker.rb b/app/workers/irker_worker.rb
index c9658b3fe17..22f67fa9e9f 100644
--- a/app/workers/irker_worker.rb
+++ b/app/workers/irker_worker.rb
@@ -142,10 +142,10 @@ class IrkerWorker
   end
 
   def files_count(commit)
-    diffs = commit.raw_diffs(deltas_only: true)
+    diff_size = commit.raw_deltas.size
 
-    files = "#{diffs.real_size} file"
-    files += 's' if diffs.size > 1
+    files = "#{diff_size} file"
+    files += 's' if diff_size > 1
 
     files
   end
diff --git a/app/workers/merge_worker.rb b/app/workers/merge_worker.rb
index 79efca4f2f9..48e2da338f6 100644
--- a/app/workers/merge_worker.rb
+++ b/app/workers/merge_worker.rb
@@ -7,7 +7,7 @@ class MergeWorker
     current_user = User.find(current_user_id)
     merge_request = MergeRequest.find(merge_request_id)
 
-    MergeRequests::MergeService.new(merge_request.target_project, current_user, params).
-      execute(merge_request)
+    MergeRequests::MergeService.new(merge_request.target_project, current_user, params)
+      .execute(merge_request)
   end
 end
diff --git a/app/workers/namespaceless_project_destroy_worker.rb b/app/workers/namespaceless_project_destroy_worker.rb
new file mode 100644
index 00000000000..bfae0c77700
--- /dev/null
+++ b/app/workers/namespaceless_project_destroy_worker.rb
@@ -0,0 +1,43 @@
+# Worker to destroy projects that do not have a namespace
+#
+# It destroys everything it can without having the info about the namespace it
+# used to belong to. Projects in this state should be rare.
+# The worker will reject doing anything for projects that *do* have a
+# namespace. For those use ProjectDestroyWorker instead.
+class NamespacelessProjectDestroyWorker
+  include Sidekiq::Worker
+  include DedicatedSidekiqQueue
+
+  def self.bulk_perform_async(args_list)
+    Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
+  end
+
+  def perform(project_id)
+    begin
+      project = Project.unscoped.find(project_id)
+    rescue ActiveRecord::RecordNotFound
+      return
+    end
+    return unless project.namespace_id.nil? # Reject doing anything for projects that *do* have a namespace
+
+    project.team.truncate
+
+    unlink_fork(project) if project.forked?
+
+    # Override Project#remove_pages for this instance so it doesn't do anything
+    def project.remove_pages
+    end
+
+    project.destroy!
+  end
+
+  private
+
+  def unlink_fork(project)
+    merge_requests = project.forked_from_project.merge_requests.opened.from_project(project)
+
+    merge_requests.update_all(state: 'closed')
+
+    project.forked_project_link.destroy
+  end
+end
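Note: the def project.remove_pages override above defines a singleton method, disabling behaviour for one object without touching its class. A self-contained demonstration of the trick:

```ruby
class Widget
  def cleanup
    puts 'expensive cleanup'
  end
end

a = Widget.new
b = Widget.new

# Redefine cleanup on this one instance so it does nothing.
def a.cleanup
end

a.cleanup # prints nothing; the singleton method shadows the class method
b.cleanup # => "expensive cleanup"
```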
diff --git a/app/workers/pipeline_schedule_worker.rb b/app/workers/pipeline_schedule_worker.rb
new file mode 100644
index 00000000000..7b485b3363c
--- /dev/null
+++ b/app/workers/pipeline_schedule_worker.rb
@@ -0,0 +1,25 @@
+class PipelineScheduleWorker
+  include Sidekiq::Worker
+  include CronjobQueue
+
+  def perform
+    Ci::PipelineSchedule.active.where("next_run_at < ?", Time.now)
+      .preload(:owner, :project).find_each do |schedule|
+      begin
+        unless schedule.runnable_by_owner?
+          schedule.deactivate!
+          next
+        end
+
+        Ci::CreatePipelineService.new(schedule.project,
+                                      schedule.owner,
+                                      ref: schedule.ref)
+          .execute(:schedule, save_on_errors: false, schedule: schedule)
+      rescue => e
+        Rails.logger.error "#{schedule.id}: Failed to create a scheduled pipeline: #{e.message}"
+      ensure
+        schedule.schedule_next_run!
+      end
+    end
+  end
+end
diff --git a/app/workers/post_receive.rb b/app/workers/post_receive.rb
index 2cd87895c55..b8f8d3750d9 100644
--- a/app/workers/post_receive.rb
+++ b/app/workers/post_receive.rb
@@ -2,38 +2,34 @@ class PostReceive
   include Sidekiq::Worker
   include DedicatedSidekiqQueue
 
-  def perform(repo_path, identifier, changes)
-    if repository_storage = Gitlab.config.repositories.storages.find { |p| repo_path.start_with?(p[1]['path'].to_s) }
-      repo_path.gsub!(repository_storage[1]['path'].to_s, "")
-    else
-      log("Check gitlab.yml config for correct repositories.storages values. No repository storage path matches \"#{repo_path}\"")
+  def perform(gl_repository, identifier, changes)
+    project, is_wiki = Gitlab::GlRepository.parse(gl_repository)
+
+    if project.nil?
+      log("Triggered hook for non-existing project with gl_repository \"#{gl_repository}\"")
+      return false
     end
 
     changes = Base64.decode64(changes) unless changes.include?(' ')
 
     # Use Sidekiq.logger so arguments can be correlated with execution
    # time and thread ID's.
     Sidekiq.logger.info "changes: #{changes.inspect}" if ENV['SIDEKIQ_LOG_ARGUMENTS']
 
-    post_received = Gitlab::GitPostReceive.new(repo_path, identifier, changes)
-
-    if post_received.project.nil?
-      log("Triggered hook for non-existing project with full path \"#{repo_path}\"")
-      return false
-    end
+    post_received = Gitlab::GitPostReceive.new(project, identifier, changes)
 
-    if post_received.wiki?
-      # Nothing defined here yet.
-    elsif post_received.regular_project?
-      process_project_changes(post_received)
+    if is_wiki
+      process_wiki_changes(post_received)
     else
-      log("Triggered hook for unidentifiable repository type with full path \"#{repo_path}\"")
-      false
+      process_project_changes(post_received)
     end
   end
 
+  private
+
   def process_project_changes(post_received)
-    post_received.changes.each do |change|
-      oldrev, newrev, ref = change.strip.split(' ')
+    changes = []
+    refs = Set.new
+    post_received.changes_refs do |oldrev, newrev, ref|
       @user ||= post_received.identify(newrev)
 
       unless @user
@@ -46,10 +42,22 @@ class PostReceive
       elsif Gitlab::Git.branch_ref?(ref)
         GitPushService.new(post_received.project, @user, oldrev: oldrev, newrev: newrev, ref: ref).execute
       end
+
+      changes << Gitlab::DataBuilder::Repository.single_change(oldrev, newrev, ref)
+      refs << ref
     end
+
+    after_project_changes_hooks(post_received, @user, refs.to_a, changes)
   end
 
-  private
+  def after_project_changes_hooks(post_received, user, refs, changes)
+    hook_data = Gitlab::DataBuilder::Repository.update(post_received.project, user, changes, refs)
+    SystemHooksService.new.execute_hooks(hook_data, :repository_update_hooks)
+  end
+
+  def process_wiki_changes(post_received)
+    # Nothing defined here yet.
+  end
 
   def log(message)
     Gitlab::GitLogger.error("POST-RECEIVE: #{message}")
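Note: PostReceive now receives an opaque gl_repository identifier and lets Gitlab::GlRepository.parse resolve the project and wiki flag. A rough sketch of what such parsing could involve, assuming a "project-<id>" / "wiki-<id>" format; the format and return shape are assumptions, not taken from this diff:

```ruby
# Hypothetical, simplified parser: returns [project_id, is_wiki].
def parse_gl_repository(gl_repository)
  type, id = gl_repository.split('-', 2)
  unless %w[project wiki].include?(type)
    raise ArgumentError, "Invalid GL Repository \"#{gl_repository}\""
  end

  [Integer(id), type == 'wiki']
end

parse_gl_repository('project-42') # => [42, false]
parse_gl_repository('wiki-42')    # => [42, true]
```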
diff --git a/app/workers/process_commit_worker.rb b/app/workers/process_commit_worker.rb
index e9a5bd7f24e..c0c03848a40 100644
--- a/app/workers/process_commit_worker.rb
+++ b/app/workers/process_commit_worker.rb
@@ -17,12 +17,14 @@ class ProcessCommitWorker
     project = Project.find_by(id: project_id)
 
     return unless project
+    return if commit_exists_in_upstream?(project, commit_hash)
 
     user = User.find_by(id: user_id)
 
     return unless user
 
     commit = build_commit(project, commit_hash)
+
     author = commit.author || user
 
     process_commit_message(project, commit, user, author, default)
@@ -45,16 +47,18 @@ class ProcessCommitWorker
     # therefor we use IssueCollection here and skip the authorization check in
     # Issues::CloseService#execute.
     IssueCollection.new(issues).updatable_by_user(user).each do |issue|
-      Issues::CloseService.new(project, author).
-        close_issue(issue, commit: commit)
+      Issues::CloseService.new(project, author)
+        .close_issue(issue, commit: commit)
     end
   end
 
   def update_issue_metrics(commit, author)
     mentioned_issues = commit.all_references(author).issues
 
-    Issue::Metrics.where(issue_id: mentioned_issues.map(&:id), first_mentioned_in_commit_at: nil).
-      update_all(first_mentioned_in_commit_at: commit.committed_date)
+    return if mentioned_issues.empty?
+
+    Issue::Metrics.where(issue_id: mentioned_issues.map(&:id), first_mentioned_in_commit_at: nil)
+      .update_all(first_mentioned_in_commit_at: commit.committed_date)
   end
 
   def build_commit(project, hash)
@@ -71,4 +75,16 @@ class ProcessCommitWorker
 
     Commit.from_hash(hash, project)
   end
+
+  private
+
+  # Avoid reprocessing commits that already exist in the upstream
+  # when project is forked. This will also prevent duplicated system notes.
+  def commit_exists_in_upstream?(project, commit_hash)
+    return false unless project.forked?
+
+    upstream_project = project.forked_from_project
+    commit_id = commit_hash.with_indifferent_access[:id]
+    upstream_project.commit(commit_id).present?
+  end
 end
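Note: the new commit_exists_in_upstream? guard skips commits a fork shares with its upstream, preventing duplicate processing and duplicate system notes. A toy model of the guard, with OpenStruct standing in for Project:

```ruby
require 'ostruct'

def commit_exists_in_upstream?(project, commit_hash)
  return false unless project.forked

  project.upstream.commit_ids.include?(commit_hash[:id])
end

upstream = OpenStruct.new(commit_ids: %w[abc123])
fork     = OpenStruct.new(forked: true, upstream: upstream)

commit_exists_in_upstream?(fork, id: 'abc123') # => true: skip it
commit_exists_in_upstream?(fork, id: 'def456') # => false: process it
```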
diff --git a/app/workers/project_cache_worker.rb b/app/workers/project_cache_worker.rb
index 8ff9d07860f..505ff9e086e 100644
--- a/app/workers/project_cache_worker.rb
+++ b/app/workers/project_cache_worker.rb
@@ -32,8 +32,8 @@ class ProjectCacheWorker
   private
 
   def try_obtain_lease_for(project_id, section)
-    Gitlab::ExclusiveLease.
-      new("project_cache_worker:#{project_id}:#{section}", timeout: LEASE_TIMEOUT).
-      try_obtain
+    Gitlab::ExclusiveLease
+      .new("project_cache_worker:#{project_id}:#{section}", timeout: LEASE_TIMEOUT)
+      .try_obtain
   end
 end
diff --git a/app/workers/project_service_worker.rb b/app/workers/project_service_worker.rb
index fdfdeab7b41..4883d848c53 100644
--- a/app/workers/project_service_worker.rb
+++ b/app/workers/project_service_worker.rb
@@ -2,6 +2,8 @@ class ProjectServiceWorker
   include Sidekiq::Worker
   include DedicatedSidekiqQueue
 
+  sidekiq_options dead: false
+
   def perform(hook_id, data)
     data = data.with_indifferent_access
     Service.find(hook_id).execute(data)
diff --git a/app/workers/propagate_service_template_worker.rb b/app/workers/propagate_service_template_worker.rb
new file mode 100644
index 00000000000..6b607451c7a
--- /dev/null
+++ b/app/workers/propagate_service_template_worker.rb
@@ -0,0 +1,21 @@
+# Worker for updating any project specific caches.
+class PropagateServiceTemplateWorker
+  include Sidekiq::Worker
+  include DedicatedSidekiqQueue
+
+  LEASE_TIMEOUT = 4.hours.to_i
+
+  def perform(template_id)
+    return unless try_obtain_lease_for(template_id)
+
+    Projects::PropagateServiceTemplate.propagate(Service.find_by(id: template_id))
+  end
+
+  private
+
+  def try_obtain_lease_for(template_id)
+    Gitlab::ExclusiveLease
+      .new("propagate_service_template_worker:#{template_id}", timeout: LEASE_TIMEOUT)
+      .try_obtain
+  end
+end
diff --git a/app/workers/prune_old_events_worker.rb b/app/workers/prune_old_events_worker.rb
index 392abb9c21b..2b43bb19ad1 100644
--- a/app/workers/prune_old_events_worker.rb
+++ b/app/workers/prune_old_events_worker.rb
@@ -10,9 +10,9 @@ class PruneOldEventsWorker
       '(id IN (SELECT id FROM (?) ids_to_remove))',
       Event.unscoped.where(
         'created_at < ?',
-        (12.months + 1.day).ago).
-      select(:id).
-      limit(10_000)).
-      delete_all
+        (12.months + 1.day).ago)
+      .select(:id)
+      .limit(10_000))
+      .delete_all
   end
 end
diff --git a/app/workers/remove_old_web_hook_logs_worker.rb b/app/workers/remove_old_web_hook_logs_worker.rb
new file mode 100644
index 00000000000..555e1bb8691
--- /dev/null
+++ b/app/workers/remove_old_web_hook_logs_worker.rb
@@ -0,0 +1,10 @@
+class RemoveOldWebHookLogsWorker
+  include Sidekiq::Worker
+  include CronjobQueue
+
+  WEB_HOOK_LOG_LIFETIME = 2.days
+
+  def perform
+    WebHookLog.destroy_all(['created_at < ?', Time.now - WEB_HOOK_LOG_LIFETIME])
+  end
+end
diff --git a/app/workers/repository_check/batch_worker.rb b/app/workers/repository_check/batch_worker.rb
index c3e7491ec4e..b94d83bd709 100644
--- a/app/workers/repository_check/batch_worker.rb
+++ b/app/workers/repository_check/batch_worker.rb
@@ -32,10 +32,10 @@ module RepositoryCheck
     # has to sit and wait for this query to finish.
     def project_ids
       limit = 10_000
-      never_checked_projects = Project.where('last_repository_check_at IS NULL AND created_at < ?', 24.hours.ago).
-        limit(limit).pluck(:id)
-      old_check_projects = Project.where('last_repository_check_at < ?', 1.month.ago).
-        reorder('last_repository_check_at ASC').limit(limit).pluck(:id)
+      never_checked_projects = Project.where('last_repository_check_at IS NULL AND created_at < ?', 24.hours.ago)
+        .limit(limit).pluck(:id)
+      old_check_projects = Project.where('last_repository_check_at < ?', 1.month.ago)
+        .reorder('last_repository_check_at ASC').limit(limit).pluck(:id)
       never_checked_projects + old_check_projects
     end
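Note: several of these workers guard against concurrent runs with Gitlab::ExclusiveLease. The underlying pattern is a Redis SET with NX (set only if absent) and EX (a TTL); a minimal sketch, not GitLab's implementation:

```ruby
require 'redis'
require 'securerandom'

# First caller to SET the key wins the lease; others are refused until
# the TTL expires, which also bounds the damage of a crashed holder.
class SimpleExclusiveLease
  def initialize(key, timeout:)
    @key = "lease:#{key}"
    @timeout = timeout
    @uuid = SecureRandom.uuid
    @redis = Redis.new
  end

  # Returns the lease UUID when obtained, false when already held.
  def try_obtain
    @redis.set(@key, @uuid, nx: true, ex: @timeout) && @uuid
  end
end

SimpleExclusiveLease.new('project_cache_worker:1:stats', timeout: 900).try_obtain
```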
diff --git a/app/workers/repository_check/clear_worker.rb b/app/workers/repository_check/clear_worker.rb
index 1f1b38540ee..85bc9103538 100644
--- a/app/workers/repository_check/clear_worker.rb
+++ b/app/workers/repository_check/clear_worker.rb
@@ -8,7 +8,7 @@ module RepositoryCheck
       Project.select(:id).find_in_batches(batch_size: 100) do |batch|
         Project.where(id: batch.map(&:id)).update_all(
           last_repository_check_failed: nil,
-          last_repository_check_at: nil,
+          last_repository_check_at: nil
         )
       end
     end
diff --git a/app/workers/repository_check/single_repository_worker.rb b/app/workers/repository_check/single_repository_worker.rb
index 3d8bfc6fc6c..164586cf0b7 100644
--- a/app/workers/repository_check/single_repository_worker.rb
+++ b/app/workers/repository_check/single_repository_worker.rb
@@ -7,7 +7,7 @@ module RepositoryCheck
       project = Project.find(project_id)
       project.update_columns(
         last_repository_check_failed: !check(project),
-        last_repository_check_at: Time.now,
+        last_repository_check_at: Time.now
       )
     end
diff --git a/app/workers/repository_fork_worker.rb b/app/workers/repository_fork_worker.rb
index efc99ec962a..a338523dc6b 100644
--- a/app/workers/repository_fork_worker.rb
+++ b/app/workers/repository_fork_worker.rb
@@ -1,4 +1,6 @@
 class RepositoryForkWorker
+  ForkError = Class.new(StandardError)
+
   include Sidekiq::Worker
   include Gitlab::ShellAdapter
   include DedicatedSidekiqQueue
@@ -8,29 +10,31 @@ class RepositoryForkWorker
                                source_path: source_path,
                                target_path: target_path)
 
-    project = Project.find_by_id(project_id)
-
-    unless project.present?
-      logger.error("Project #{project_id} no longer exists!")
-      return
-    end
+    project = Project.find(project_id)
+    project.import_start
 
     result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_path,
                                           project.repository_storage_path, target_path)
-
-    unless result
-      logger.error("Unable to fork project #{project_id} for repository #{source_path} -> #{target_path}")
-      project.mark_import_as_failed('The project could not be forked.')
-      return
-    end
+    raise ForkError, "Unable to fork project #{project_id} for repository #{source_path} -> #{target_path}" unless result
 
     project.repository.after_import
-
-    unless project.valid_repo?
-      logger.error("Project #{project_id} had an invalid repository after fork")
-      project.mark_import_as_failed('The forked repository is invalid.')
-      return
-    end
+    raise ForkError, "Project #{project_id} had an invalid repository after fork" unless project.valid_repo?
 
     project.import_finish
+  rescue ForkError => ex
+    fail_fork(project, ex.message)
+    raise
+  rescue => ex
+    return unless project
+
+    fail_fork(project, ex.message)
+    raise ForkError, "#{ex.class} #{ex.message}"
+  end
+
+  private
+
+  def fail_fork(project, message)
+    Rails.logger.error(message)
+    project.mark_import_as_failed(message)
   end
 end
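Note: the rewritten fork worker funnels every failure into a ForkError, marks the import failed, and re-raises so Sidekiq still records and retries the job. The pattern, distilled into a standalone sketch:

```ruby
class ForkJob
  ForkError = Class.new(StandardError)

  def perform(ok)
    raise ForkError, 'fork failed' unless ok

    'forked'
  rescue ForkError => ex
    record_failure(ex.message)
    raise # surface to Sidekiq for retry/metrics
  rescue => ex
    record_failure(ex.message)
    raise ForkError, "#{ex.class} #{ex.message}" # wrap unexpected errors
  end

  private

  def record_failure(message)
    puts "import failed: #{message}" # stand-in for mark_import_as_failed
  end
end
```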
diff --git a/app/workers/repository_import_worker.rb b/app/workers/repository_import_worker.rb
index c8a77e21c12..625476b7e01 100644
--- a/app/workers/repository_import_worker.rb
+++ b/app/workers/repository_import_worker.rb
@@ -1,28 +1,43 @@
 class RepositoryImportWorker
+  ImportError = Class.new(StandardError)
+
   include Sidekiq::Worker
-  include Gitlab::ShellAdapter
   include DedicatedSidekiqQueue
 
+  sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_EXPIRATION
+
   attr_accessor :project, :current_user
 
   def perform(project_id)
     @project = Project.find(project_id)
     @current_user = @project.creator
 
+    project.import_start
+
     Gitlab::Metrics.add_event(:import_repository,
                               import_url: @project.import_url,
                               path: @project.path_with_namespace)
 
-    project.update_column(:import_error, nil)
+    project.update_columns(import_jid: self.jid, import_error: nil)
 
     result = Projects::ImportService.new(project, current_user).execute
-
-    if result[:status] == :error
-      project.mark_import_as_failed(result[:message])
-      return
-    end
+    raise ImportError, result[:message] if result[:status] == :error
 
     project.repository.after_import
     project.import_finish
+  rescue ImportError => ex
+    fail_import(project, ex.message)
+    raise
+  rescue => ex
+    return unless project
+
+    fail_import(project, ex.message)
+    raise ImportError, "#{ex.class} #{ex.message}"
+  end
+
+  private
+
+  def fail_import(project, message)
+    project.mark_import_as_failed(message)
   end
 end
diff --git a/app/workers/schedule_update_user_activity_worker.rb b/app/workers/schedule_update_user_activity_worker.rb
new file mode 100644
index 00000000000..6c2c3e437f3
--- /dev/null
+++ b/app/workers/schedule_update_user_activity_worker.rb
@@ -0,0 +1,10 @@
+class ScheduleUpdateUserActivityWorker
+  include Sidekiq::Worker
+  include CronjobQueue
+
+  def perform(batch_size = 500)
+    Gitlab::UserActivities.new.each_slice(batch_size) do |batch|
+      UpdateUserActivityWorker.perform_async(Hash[batch])
+    end
+  end
+end
diff --git a/app/workers/stuck_ci_jobs_worker.rb b/app/workers/stuck_ci_jobs_worker.rb
index ae8c980c9e4..8b0cfcc8af8 100644
--- a/app/workers/stuck_ci_jobs_worker.rb
+++ b/app/workers/stuck_ci_jobs_worker.rb
@@ -45,7 +45,7 @@ class StuckCiJobsWorker
   def search(status, timeout)
     builds = Ci::Build.where(status: status).where('ci_builds.updated_at < ?', timeout.ago)
 
-    builds.joins(:project).includes(:tags, :runner, project: :namespace).find_each(batch_size: 50).each do |build|
+    builds.joins(:project).merge(Project.without_deleted).includes(:tags, :runner, project: :namespace).find_each(batch_size: 50).each do |build|
       yield(build)
     end
   end
diff --git a/app/workers/stuck_import_jobs_worker.rb b/app/workers/stuck_import_jobs_worker.rb
new file mode 100644
index 00000000000..bfc5e667bb6
--- /dev/null
+++ b/app/workers/stuck_import_jobs_worker.rb
@@ -0,0 +1,37 @@
+class StuckImportJobsWorker
+  include Sidekiq::Worker
+  include CronjobQueue
+
+  IMPORT_EXPIRATION = 15.hours.to_i
+
+  def perform
+    stuck_projects.find_in_batches(batch_size: 500) do |group|
+      jids = group.map(&:import_jid)
+
+      # Find the jobs that aren't currently running or that exceeded the threshold.
+      completed_jids = Gitlab::SidekiqStatus.completed_jids(jids)
+
+      if completed_jids.any?
+        completed_ids = group.select { |project| completed_jids.include?(project.import_jid) }.map(&:id)
+
+        fail_batch!(completed_jids, completed_ids)
+      end
+    end
+  end
+
+  private
+
+  def stuck_projects
+    Project.select('id, import_jid').with_import_status(:started).where.not(import_jid: nil)
+  end
+
+  def fail_batch!(completed_jids, completed_ids)
+    Project.where(id: completed_ids).update_all(import_status: 'failed', import_error: error_message)
+
+    Rails.logger.info("Marked stuck import jobs as failed. JIDs: #{completed_jids.join(', ')}")
+  end
+
+  def error_message
+    "Import timed out. Import took longer than #{IMPORT_EXPIRATION} seconds"
+  end
+end
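Note: stuck-import detection works because each import records its Sidekiq JID (import_jid), letting the cron worker ask which JIDs are no longer alive. A simplified sketch of JID liveness tracking in the spirit of Gitlab::SidekiqStatus; the key scheme is assumed, not its actual code:

```ruby
require 'redis'

module SimpleSidekiqStatus
  EXPIRATION = 15 * 60 * 60 # mirrors IMPORT_EXPIRATION above

  # A running job refreshes a per-JID key with a TTL.
  def self.set(jid, redis: Redis.new)
    redis.set("sidekiq-status:#{jid}", 1, ex: EXPIRATION)
  end

  # JIDs whose key has expired are treated as completed (or dead).
  def self.completed_jids(jids, redis: Redis.new)
    jids.reject { |jid| redis.exists?("sidekiq-status:#{jid}") }
  end
end
```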
diff --git a/app/workers/system_hook_worker.rb b/app/workers/system_hook_worker.rb
deleted file mode 100644
index baf2f12eeac..00000000000
--- a/app/workers/system_hook_worker.rb
+++ /dev/null
@@ -1,8 +0,0 @@
-class SystemHookWorker
-  include Sidekiq::Worker
-  include DedicatedSidekiqQueue
-
-  def perform(hook_id, data, hook_name)
-    SystemHook.find(hook_id).execute(data, hook_name)
-  end
-end
diff --git a/app/workers/update_user_activity_worker.rb b/app/workers/update_user_activity_worker.rb
new file mode 100644
index 00000000000..31bbdb69edb
--- /dev/null
+++ b/app/workers/update_user_activity_worker.rb
@@ -0,0 +1,26 @@
+class UpdateUserActivityWorker
+  include Sidekiq::Worker
+  include DedicatedSidekiqQueue
+
+  def perform(pairs)
+    pairs = cast_data(pairs)
+    ids = pairs.keys
+    conditions = 'WHEN id = ? THEN ? ' * ids.length
+
+    User.where(id: ids)
+        .update_all([
+          "last_activity_on = CASE #{conditions} ELSE last_activity_on END",
+          *pairs.to_a.flatten
+        ])
+
+    Gitlab::UserActivities.new.delete(*ids)
+  end
+
+  private
+
+  def cast_data(pairs)
+    pairs.each_with_object({}) do |(key, value), new_pairs|
+      new_pairs[key.to_i] = Time.at(value.to_i).to_s(:db)
+    end
+  end
+end
diff --git a/app/workers/project_web_hook_worker.rb b/app/workers/web_hook_worker.rb
index d973e662ff2..713c0228040 100644
--- a/app/workers/project_web_hook_worker.rb
+++ b/app/workers/web_hook_worker.rb
@@ -1,11 +1,13 @@
-class ProjectWebHookWorker
+class WebHookWorker
   include Sidekiq::Worker
   include DedicatedSidekiqQueue
 
-  sidekiq_options retry: 4
+  sidekiq_options retry: 4, dead: false
 
   def perform(hook_id, data, hook_name)
+    hook = WebHook.find(hook_id)
     data = data.with_indifferent_access
-    WebHook.find(hook_id).execute(data, hook_name)
+
+    WebHookService.new(hook, data, hook_name).execute
   end
 end
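Note: with retry: 4, dead: false the renamed worker is retried four times and then dropped rather than parked in Sidekiq's dead set, which suits high-volume webhook delivery. An illustrative enqueue; the hook_name value is an example, not taken from this diff:

```ruby
hook_id = 42
data = { object_kind: 'push' }

WebHookWorker.perform_async(hook_id, data, 'push_hooks')
```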