diff options
Diffstat (limited to 'app/workers')
39 files changed, 511 insertions, 177 deletions
diff --git a/app/workers/background_migration_worker.rb b/app/workers/background_migration_worker.rb new file mode 100644 index 00000000000..45ce49bb5c0 --- /dev/null +++ b/app/workers/background_migration_worker.rb @@ -0,0 +1,39 @@ +class BackgroundMigrationWorker + include Sidekiq::Worker + include DedicatedSidekiqQueue + + # Enqueues a number of jobs in bulk. + # + # The `jobs` argument should be an Array of Arrays, each sub-array must be in + # the form: + # + # [migration-class, [arg1, arg2, ...]] + def self.perform_bulk(jobs) + Sidekiq::Client.push_bulk('class' => self, + 'queue' => sidekiq_options['queue'], + 'args' => jobs) + end + + # Schedules multiple jobs in bulk, with a delay. + # + def self.perform_bulk_in(delay, jobs) + now = Time.now.to_i + schedule = now + delay.to_i + + if schedule <= now + raise ArgumentError, 'The schedule time must be in the future!' + end + + Sidekiq::Client.push_bulk('class' => self, + 'queue' => sidekiq_options['queue'], + 'args' => jobs, + 'at' => schedule) + end + + # Performs the background migration. + # + # See Gitlab::BackgroundMigration.perform for more information. + def perform(class_name, arguments = []) + Gitlab::BackgroundMigration.perform(class_name, arguments) + end +end diff --git a/app/workers/build_success_worker.rb b/app/workers/build_success_worker.rb index e17add7421f..bf009dfab0f 100644 --- a/app/workers/build_success_worker.rb +++ b/app/workers/build_success_worker.rb @@ -11,15 +11,6 @@ class BuildSuccessWorker private def create_deployment(build) - service = CreateDeploymentService.new( - build.project, build.user, - environment: build.environment, - sha: build.sha, - ref: build.ref, - tag: build.tag, - options: build.options.to_h[:environment], - variables: build.variables) - - service.execute(build) + CreateDeploymentService.new(build).execute end end diff --git a/app/workers/concerns/new_issuable.rb b/app/workers/concerns/new_issuable.rb new file mode 100644 index 00000000000..eb0d6c9c36c --- /dev/null +++ b/app/workers/concerns/new_issuable.rb @@ -0,0 +1,26 @@ +module NewIssuable + attr_reader :issuable, :user + + def objects_found?(issuable_id, user_id) + set_user(user_id) + set_issuable(issuable_id) + + user && issuable + end + + def set_user(user_id) + @user = User.find_by(id: user_id) + + log_error(User, user_id) unless @user + end + + def set_issuable(issuable_id) + @issuable = issuable_class.find_by(id: issuable_id) + + log_error(issuable_class, issuable_id) unless @issuable + end + + def log_error(record_class, record_id) + Rails.logger.error("#{self.class}: couldn't find #{record_class} with ID=#{record_id}, skipping job") + end +end diff --git a/app/workers/create_gpg_signature_worker.rb b/app/workers/create_gpg_signature_worker.rb new file mode 100644 index 00000000000..f34dff2d656 --- /dev/null +++ b/app/workers/create_gpg_signature_worker.rb @@ -0,0 +1,12 @@ +class CreateGpgSignatureWorker + include Sidekiq::Worker + include DedicatedSidekiqQueue + + def perform(commit_sha, project_id) + project = Project.find_by(id: project_id) + return unless project + + # This calculates and caches the signature in the database + Gitlab::Gpg::Commit.new(project, commit_sha).signature + end +end diff --git a/app/workers/email_receiver_worker.rb b/app/workers/email_receiver_worker.rb index d3f7e479a8d..1afa24c8e2a 100644 --- a/app/workers/email_receiver_worker.rb +++ b/app/workers/email_receiver_worker.rb @@ -31,8 +31,6 @@ class EmailReceiverWorker when Gitlab::Email::EmptyEmailError can_retry = true "It appears that the email is blank. Make sure your reply is at the top of the email, we can't process inline replies." - when Gitlab::Email::AutoGeneratedEmailError - "The email was marked as 'auto generated', which we can't accept. Please create your comment through the web interface." when Gitlab::Email::UserNotFoundError "We couldn't figure out what user corresponds to the email. Please create your comment through the web interface." when Gitlab::Email::UserBlockedError diff --git a/app/workers/expire_build_instance_artifacts_worker.rb b/app/workers/expire_build_instance_artifacts_worker.rb index eb403c134d1..7b59e976492 100644 --- a/app/workers/expire_build_instance_artifacts_worker.rb +++ b/app/workers/expire_build_instance_artifacts_worker.rb @@ -8,7 +8,7 @@ class ExpireBuildInstanceArtifactsWorker .reorder(nil) .find_by(id: build_id) - return unless build.try(:project) + return unless build&.project && !build.project.pending_delete Rails.logger.info "Removing artifacts for build #{build.id}..." build.erase_artifacts! diff --git a/app/workers/expire_job_cache_worker.rb b/app/workers/expire_job_cache_worker.rb new file mode 100644 index 00000000000..e383202260d --- /dev/null +++ b/app/workers/expire_job_cache_worker.rb @@ -0,0 +1,27 @@ +class ExpireJobCacheWorker + include Sidekiq::Worker + include BuildQueue + + def perform(job_id) + job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id) + return unless job + + pipeline = job.pipeline + project = job.project + + Gitlab::EtagCaching::Store.new.tap do |store| + store.touch(project_pipeline_path(project, pipeline)) + store.touch(project_job_path(project, job)) + end + end + + private + + def project_pipeline_path(project, pipeline) + Gitlab::Routing.url_helpers.project_pipeline_path(project, pipeline, format: :json) + end + + def project_job_path(project, job) + Gitlab::Routing.url_helpers.project_build_path(project, job.id, format: :json) + end +end diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb index 603e2f1aaea..7c02d6cf892 100644 --- a/app/workers/expire_pipeline_cache_worker.rb +++ b/app/workers/expire_pipeline_cache_worker.rb @@ -10,6 +10,7 @@ class ExpirePipelineCacheWorker store = Gitlab::EtagCaching::Store.new store.touch(project_pipelines_path(project)) + store.touch(project_pipeline_path(project, pipeline)) store.touch(commit_pipelines_path(project, pipeline.commit)) if pipeline.commit store.touch(new_merge_request_pipelines_path(project)) each_pipelines_merge_request_path(project, pipeline) do |path| @@ -22,34 +23,24 @@ class ExpirePipelineCacheWorker private def project_pipelines_path(project) - Gitlab::Routing.url_helpers.namespace_project_pipelines_path( - project.namespace, - project, - format: :json) + Gitlab::Routing.url_helpers.project_pipelines_path(project, format: :json) + end + + def project_pipeline_path(project, pipeline) + Gitlab::Routing.url_helpers.project_pipeline_path(project, pipeline, format: :json) end def commit_pipelines_path(project, commit) - Gitlab::Routing.url_helpers.pipelines_namespace_project_commit_path( - project.namespace, - project, - commit.id, - format: :json) + Gitlab::Routing.url_helpers.pipelines_project_commit_path(project, commit.id, format: :json) end def new_merge_request_pipelines_path(project) - Gitlab::Routing.url_helpers.new_namespace_project_merge_request_path( - project.namespace, - project, - format: :json) + Gitlab::Routing.url_helpers.project_new_merge_request_path(project, format: :json) end def each_pipelines_merge_request_path(project, pipeline) pipeline.all_merge_requests.each do |merge_request| - path = Gitlab::Routing.url_helpers.pipelines_namespace_project_merge_request_path( - project.namespace, - project, - merge_request, - format: :json) + path = Gitlab::Routing.url_helpers.pipelines_project_merge_request_path(project, merge_request, format: :json) yield(path) end diff --git a/app/workers/git_garbage_collect_worker.rb b/app/workers/git_garbage_collect_worker.rb index d369b639ae9..c95497dfaba 100644 --- a/app/workers/git_garbage_collect_worker.rb +++ b/app/workers/git_garbage_collect_worker.rb @@ -5,6 +5,12 @@ class GitGarbageCollectWorker sidekiq_options retry: false + GITALY_MIGRATED_TASKS = { + gc: :garbage_collect, + full_repack: :repack_full, + incremental_repack: :repack_incremental + }.freeze + def perform(project_id, task = :gc, lease_key = nil, lease_uuid = nil) project = Project.find(project_id) task = task.to_sym @@ -15,8 +21,14 @@ class GitGarbageCollectWorker Gitlab::GitLogger.info(description) - output, status = Gitlab::Popen.popen(cmd, repo_path) - Gitlab::GitLogger.error("#{description} failed:\n#{output}") unless status.zero? + gitaly_migrate(GITALY_MIGRATED_TASKS[task]) do |is_enabled| + if is_enabled + gitaly_call(task, project.repository.raw_repository) + else + output, status = Gitlab::Popen.popen(cmd, repo_path) + Gitlab::GitLogger.error("#{description} failed:\n#{output}") unless status.zero? + end + end # Refresh the branch cache in case garbage collection caused a ref lookup to fail flush_ref_caches(project) if task == :gc @@ -26,6 +38,19 @@ class GitGarbageCollectWorker private + ## `repository` has to be a Gitlab::Git::Repository + def gitaly_call(task, repository) + client = Gitlab::GitalyClient::RepositoryService.new(repository) + case task + when :gc + client.garbage_collect(bitmaps_enabled?) + when :full_repack + client.repack_full(bitmaps_enabled?) + when :incremental_repack + client.repack_incremental + end + end + def command(task) case task when :gc @@ -55,4 +80,14 @@ class GitGarbageCollectWorker config_value = write_bitmaps ? 'true' : 'false' %W[git -c repack.writeBitmaps=#{config_value}] end + + def gitaly_migrate(method, &block) + Gitlab::GitalyClient.migrate(method, &block) + rescue GRPC::NotFound => e + Gitlab::GitLogger.error("#{method} failed:\nRepository not found") + raise Gitlab::Git::Repository::NoRepository.new(e) + rescue GRPC::BadStatus => e + Gitlab::GitLogger.error("#{method} failed:\n#{e}") + raise Gitlab::Git::CommandError.new(e) + end end diff --git a/app/workers/gitlab_shell_worker.rb b/app/workers/gitlab_shell_worker.rb index 964287a1793..0ec871e00e1 100644 --- a/app/workers/gitlab_shell_worker.rb +++ b/app/workers/gitlab_shell_worker.rb @@ -4,6 +4,6 @@ class GitlabShellWorker include DedicatedSidekiqQueue def perform(action, *arg) - gitlab_shell.send(action, *arg) + gitlab_shell.__send__(action, *arg) # rubocop:disable GitlabSecurity/PublicSend end end diff --git a/app/workers/gitlab_usage_ping_worker.rb b/app/workers/gitlab_usage_ping_worker.rb index 2f02235b0ac..0a55aab63fd 100644 --- a/app/workers/gitlab_usage_ping_worker.rb +++ b/app/workers/gitlab_usage_ping_worker.rb @@ -3,29 +3,17 @@ class GitlabUsagePingWorker include Sidekiq::Worker include CronjobQueue - include HTTParty def perform - return unless current_application_settings.usage_ping_enabled - # Multiple Sidekiq workers could run this. We should only do this at most once a day. return unless try_obtain_lease - begin - HTTParty.post(url, - body: Gitlab::UsageData.to_json(force_refresh: true), - headers: { 'Content-type' => 'application/json' } - ) - rescue HTTParty::Error => e - Rails.logger.info "Unable to contact GitLab, Inc.: #{e}" - end + SubmitUsagePingService.new.execute end + private + def try_obtain_lease Gitlab::ExclusiveLease.new('gitlab_usage_ping_worker:ping', timeout: LEASE_TIMEOUT).try_obtain end - - def url - 'https://version.gitlab.com/usage_data' - end end diff --git a/app/workers/invalid_gpg_signature_update_worker.rb b/app/workers/invalid_gpg_signature_update_worker.rb new file mode 100644 index 00000000000..db6b1ea8e8d --- /dev/null +++ b/app/workers/invalid_gpg_signature_update_worker.rb @@ -0,0 +1,12 @@ +class InvalidGpgSignatureUpdateWorker + include Sidekiq::Worker + include DedicatedSidekiqQueue + + def perform(gpg_key_id) + gpg_key = GpgKey.find_by(id: gpg_key_id) + + return unless gpg_key + + Gitlab::Gpg::InvalidGpgSignatureUpdater.new(gpg_key).run + end +end diff --git a/app/workers/irker_worker.rb b/app/workers/irker_worker.rb index c9658b3fe17..3dd14466994 100644 --- a/app/workers/irker_worker.rb +++ b/app/workers/irker_worker.rb @@ -66,7 +66,7 @@ class IrkerWorker end def send_new_branch(project, repo_name, committer, branch) - repo_path = project.path_with_namespace + repo_path = project.full_path newbranch = "#{Gitlab.config.gitlab.url}/#{repo_path}/branches" newbranch = "\x0302\x1f#{newbranch}\x0f" if @colors @@ -109,7 +109,7 @@ class IrkerWorker end def send_commits_count(data, project, repo, committer, branch) - url = compare_url data, project.path_with_namespace + url = compare_url data, project.full_path commits = colorize_commits data['total_commits_count'] new_commits = 'new commit' @@ -142,10 +142,10 @@ class IrkerWorker end def files_count(commit) - diffs = commit.raw_diffs(deltas_only: true) + diff_size = commit.raw_deltas.size - files = "#{diffs.real_size} file" - files += 's' if diffs.size > 1 + files = "#{diff_size} file" + files += 's' if diff_size > 1 files end diff --git a/app/workers/merge_worker.rb b/app/workers/merge_worker.rb index 79efca4f2f9..c3b58df92c1 100644 --- a/app/workers/merge_worker.rb +++ b/app/workers/merge_worker.rb @@ -7,7 +7,9 @@ class MergeWorker current_user = User.find(current_user_id) merge_request = MergeRequest.find(merge_request_id) - MergeRequests::MergeService.new(merge_request.target_project, current_user, params). - execute(merge_request) + merge_request.update_column(:merge_jid, jid) + + MergeRequests::MergeService.new(merge_request.target_project, current_user, params) + .execute(merge_request) end end diff --git a/app/workers/namespaceless_project_destroy_worker.rb b/app/workers/namespaceless_project_destroy_worker.rb new file mode 100644 index 00000000000..a9073742ff7 --- /dev/null +++ b/app/workers/namespaceless_project_destroy_worker.rb @@ -0,0 +1,39 @@ +# Worker to destroy projects that do not have a namespace +# +# It destroys everything it can without having the info about the namespace it +# used to belong to. Projects in this state should be rare. +# The worker will reject doing anything for projects that *do* have a +# namespace. For those use ProjectDestroyWorker instead. +class NamespacelessProjectDestroyWorker + include Sidekiq::Worker + include DedicatedSidekiqQueue + + def self.bulk_perform_async(args_list) + Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list) + end + + def perform(project_id) + begin + project = Project.unscoped.find(project_id) + rescue ActiveRecord::RecordNotFound + return + end + return unless project.namespace_id.nil? # Reject doing anything for projects that *do* have a namespace + + project.team.truncate + + unlink_fork(project) if project.forked? + + project.destroy! + end + + private + + def unlink_fork(project) + merge_requests = project.forked_from_project.merge_requests.opened.from_project(project) + + merge_requests.update_all(state: 'closed') + + project.forked_project_link.destroy + end +end diff --git a/app/workers/new_issue_worker.rb b/app/workers/new_issue_worker.rb new file mode 100644 index 00000000000..d9a8e892e90 --- /dev/null +++ b/app/workers/new_issue_worker.rb @@ -0,0 +1,17 @@ +class NewIssueWorker + include Sidekiq::Worker + include DedicatedSidekiqQueue + include NewIssuable + + def perform(issue_id, user_id) + return unless objects_found?(issue_id, user_id) + + EventCreateService.new.open_issue(issuable, user) + NotificationService.new.new_issue(issuable, user) + issuable.create_cross_references!(user) + end + + def issuable_class + Issue + end +end diff --git a/app/workers/new_merge_request_worker.rb b/app/workers/new_merge_request_worker.rb new file mode 100644 index 00000000000..1910c490159 --- /dev/null +++ b/app/workers/new_merge_request_worker.rb @@ -0,0 +1,17 @@ +class NewMergeRequestWorker + include Sidekiq::Worker + include DedicatedSidekiqQueue + include NewIssuable + + def perform(merge_request_id, user_id) + return unless objects_found?(merge_request_id, user_id) + + EventCreateService.new.open_mr(issuable, user) + NotificationService.new.new_merge_request(issuable, user) + issuable.create_cross_references!(user) + end + + def issuable_class + MergeRequest + end +end diff --git a/app/workers/pages_worker.rb b/app/workers/pages_worker.rb index 4eeb9666bb0..64788da7299 100644 --- a/app/workers/pages_worker.rb +++ b/app/workers/pages_worker.rb @@ -4,7 +4,7 @@ class PagesWorker sidekiq_options queue: :pages, retry: false def perform(action, *arg) - send(action, *arg) + send(action, *arg) # rubocop:disable GitlabSecurity/PublicSend end def deploy(build_id) diff --git a/app/workers/pipeline_schedule_worker.rb b/app/workers/pipeline_schedule_worker.rb new file mode 100644 index 00000000000..d7087f20dfc --- /dev/null +++ b/app/workers/pipeline_schedule_worker.rb @@ -0,0 +1,22 @@ +class PipelineScheduleWorker + include Sidekiq::Worker + include CronjobQueue + + def perform + Ci::PipelineSchedule.active.where("next_run_at < ?", Time.now) + .preload(:owner, :project).find_each do |schedule| + begin + pipeline = Ci::CreatePipelineService.new(schedule.project, + schedule.owner, + ref: schedule.ref) + .execute(:schedule, save_on_errors: false, schedule: schedule) + + schedule.deactivate! unless pipeline.persisted? + rescue => e + Rails.logger.error "#{schedule.id}: Failed to create a scheduled pipeline: #{e.message}" + ensure + schedule.schedule_next_run! + end + end + end +end diff --git a/app/workers/post_receive.rb b/app/workers/post_receive.rb index 015a41b6e82..b8f8d3750d9 100644 --- a/app/workers/post_receive.rb +++ b/app/workers/post_receive.rb @@ -2,34 +2,34 @@ class PostReceive include Sidekiq::Worker include DedicatedSidekiqQueue - def perform(repo_path, identifier, changes) - repo_relative_path = Gitlab::RepoPath.strip_storage_path(repo_path) + def perform(gl_repository, identifier, changes) + project, is_wiki = Gitlab::GlRepository.parse(gl_repository) + + if project.nil? + log("Triggered hook for non-existing project with gl_repository \"#{gl_repository}\"") + return false + end changes = Base64.decode64(changes) unless changes.include?(' ') # Use Sidekiq.logger so arguments can be correlated with execution # time and thread ID's. Sidekiq.logger.info "changes: #{changes.inspect}" if ENV['SIDEKIQ_LOG_ARGUMENTS'] - post_received = Gitlab::GitPostReceive.new(repo_relative_path, identifier, changes) - - if post_received.project.nil? - log("Triggered hook for non-existing project with full path \"#{repo_relative_path}\"") - return false - end + post_received = Gitlab::GitPostReceive.new(project, identifier, changes) - if post_received.wiki? - # Nothing defined here yet. - elsif post_received.regular_project? - process_project_changes(post_received) + if is_wiki + process_wiki_changes(post_received) else - log("Triggered hook for unidentifiable repository type with full path \"#{repo_relative_path}\"") - false + process_project_changes(post_received) end end + private + def process_project_changes(post_received) - post_received.changes.each do |change| - oldrev, newrev, ref = change.strip.split(' ') + changes = [] + refs = Set.new + post_received.changes_refs do |oldrev, newrev, ref| @user ||= post_received.identify(newrev) unless @user @@ -42,10 +42,22 @@ class PostReceive elsif Gitlab::Git.branch_ref?(ref) GitPushService.new(post_received.project, @user, oldrev: oldrev, newrev: newrev, ref: ref).execute end + + changes << Gitlab::DataBuilder::Repository.single_change(oldrev, newrev, ref) + refs << ref end + + after_project_changes_hooks(post_received, @user, refs.to_a, changes) end - private + def after_project_changes_hooks(post_received, user, refs, changes) + hook_data = Gitlab::DataBuilder::Repository.update(post_received.project, user, changes, refs) + SystemHooksService.new.execute_hooks(hook_data, :repository_update_hooks) + end + + def process_wiki_changes(post_received) + # Nothing defined here yet. + end def log(message) Gitlab::GitLogger.error("POST-RECEIVE: #{message}") diff --git a/app/workers/process_commit_worker.rb b/app/workers/process_commit_worker.rb index 2f7967cf531..c0c03848a40 100644 --- a/app/workers/process_commit_worker.rb +++ b/app/workers/process_commit_worker.rb @@ -17,12 +17,14 @@ class ProcessCommitWorker project = Project.find_by(id: project_id) return unless project + return if commit_exists_in_upstream?(project, commit_hash) user = User.find_by(id: user_id) return unless user commit = build_commit(project, commit_hash) + author = commit.author || user process_commit_message(project, commit, user, author, default) @@ -45,8 +47,8 @@ class ProcessCommitWorker # therefor we use IssueCollection here and skip the authorization check in # Issues::CloseService#execute. IssueCollection.new(issues).updatable_by_user(user).each do |issue| - Issues::CloseService.new(project, author). - close_issue(issue, commit: commit) + Issues::CloseService.new(project, author) + .close_issue(issue, commit: commit) end end @@ -55,8 +57,8 @@ class ProcessCommitWorker return if mentioned_issues.empty? - Issue::Metrics.where(issue_id: mentioned_issues.map(&:id), first_mentioned_in_commit_at: nil). - update_all(first_mentioned_in_commit_at: commit.committed_date) + Issue::Metrics.where(issue_id: mentioned_issues.map(&:id), first_mentioned_in_commit_at: nil) + .update_all(first_mentioned_in_commit_at: commit.committed_date) end def build_commit(project, hash) @@ -73,4 +75,16 @@ class ProcessCommitWorker Commit.from_hash(hash, project) end + + private + + # Avoid reprocessing commits that already exist in the upstream + # when project is forked. This will also prevent duplicated system notes. + def commit_exists_in_upstream?(project, commit_hash) + return false unless project.forked? + + upstream_project = project.forked_from_project + commit_id = commit_hash.with_indifferent_access[:id] + upstream_project.commit(commit_id).present? + end end diff --git a/app/workers/project_cache_worker.rb b/app/workers/project_cache_worker.rb index 8ff9d07860f..505ff9e086e 100644 --- a/app/workers/project_cache_worker.rb +++ b/app/workers/project_cache_worker.rb @@ -32,8 +32,8 @@ class ProjectCacheWorker private def try_obtain_lease_for(project_id, section) - Gitlab::ExclusiveLease. - new("project_cache_worker:#{project_id}:#{section}", timeout: LEASE_TIMEOUT). - try_obtain + Gitlab::ExclusiveLease + .new("project_cache_worker:#{project_id}:#{section}", timeout: LEASE_TIMEOUT) + .try_obtain end end diff --git a/app/workers/project_destroy_worker.rb b/app/workers/project_destroy_worker.rb index b462327490e..a9188b78460 100644 --- a/app/workers/project_destroy_worker.rb +++ b/app/workers/project_destroy_worker.rb @@ -3,14 +3,11 @@ class ProjectDestroyWorker include DedicatedSidekiqQueue def perform(project_id, user_id, params) - begin - project = Project.unscoped.find(project_id) - rescue ActiveRecord::RecordNotFound - return - end - + project = Project.find(project_id) user = User.find(user_id) ::Projects::DestroyService.new(project, user, params.symbolize_keys).execute + rescue ActiveRecord::RecordNotFound => error + logger.error("Failed to delete project (#{project_id}): #{error.message}") end end diff --git a/app/workers/project_service_worker.rb b/app/workers/project_service_worker.rb index fdfdeab7b41..4883d848c53 100644 --- a/app/workers/project_service_worker.rb +++ b/app/workers/project_service_worker.rb @@ -2,6 +2,8 @@ class ProjectServiceWorker include Sidekiq::Worker include DedicatedSidekiqQueue + sidekiq_options dead: false + def perform(hook_id, data) data = data.with_indifferent_access Service.find(hook_id).execute(data) diff --git a/app/workers/propagate_service_template_worker.rb b/app/workers/propagate_service_template_worker.rb new file mode 100644 index 00000000000..6b607451c7a --- /dev/null +++ b/app/workers/propagate_service_template_worker.rb @@ -0,0 +1,21 @@ +# Worker for updating any project specific caches. +class PropagateServiceTemplateWorker + include Sidekiq::Worker + include DedicatedSidekiqQueue + + LEASE_TIMEOUT = 4.hours.to_i + + def perform(template_id) + return unless try_obtain_lease_for(template_id) + + Projects::PropagateServiceTemplate.propagate(Service.find_by(id: template_id)) + end + + private + + def try_obtain_lease_for(template_id) + Gitlab::ExclusiveLease + .new("propagate_service_template_worker:#{template_id}", timeout: LEASE_TIMEOUT) + .try_obtain + end +end diff --git a/app/workers/prune_old_events_worker.rb b/app/workers/prune_old_events_worker.rb index 392abb9c21b..2b43bb19ad1 100644 --- a/app/workers/prune_old_events_worker.rb +++ b/app/workers/prune_old_events_worker.rb @@ -10,9 +10,9 @@ class PruneOldEventsWorker '(id IN (SELECT id FROM (?) ids_to_remove))', Event.unscoped.where( 'created_at < ?', - (12.months + 1.day).ago). - select(:id). - limit(10_000)). - delete_all + (12.months + 1.day).ago) + .select(:id) + .limit(10_000)) + .delete_all end end diff --git a/app/workers/remove_old_web_hook_logs_worker.rb b/app/workers/remove_old_web_hook_logs_worker.rb new file mode 100644 index 00000000000..555e1bb8691 --- /dev/null +++ b/app/workers/remove_old_web_hook_logs_worker.rb @@ -0,0 +1,10 @@ +class RemoveOldWebHookLogsWorker + include Sidekiq::Worker + include CronjobQueue + + WEB_HOOK_LOG_LIFETIME = 2.days + + def perform + WebHookLog.destroy_all(['created_at < ?', Time.now - WEB_HOOK_LOG_LIFETIME]) + end +end diff --git a/app/workers/repository_check/batch_worker.rb b/app/workers/repository_check/batch_worker.rb index c3e7491ec4e..b94d83bd709 100644 --- a/app/workers/repository_check/batch_worker.rb +++ b/app/workers/repository_check/batch_worker.rb @@ -32,10 +32,10 @@ module RepositoryCheck # has to sit and wait for this query to finish. def project_ids limit = 10_000 - never_checked_projects = Project.where('last_repository_check_at IS NULL AND created_at < ?', 24.hours.ago). - limit(limit).pluck(:id) - old_check_projects = Project.where('last_repository_check_at < ?', 1.month.ago). - reorder('last_repository_check_at ASC').limit(limit).pluck(:id) + never_checked_projects = Project.where('last_repository_check_at IS NULL AND created_at < ?', 24.hours.ago) + .limit(limit).pluck(:id) + old_check_projects = Project.where('last_repository_check_at < ?', 1.month.ago) + .reorder('last_repository_check_at ASC').limit(limit).pluck(:id) never_checked_projects + old_check_projects end diff --git a/app/workers/repository_check/clear_worker.rb b/app/workers/repository_check/clear_worker.rb index 1f1b38540ee..85bc9103538 100644 --- a/app/workers/repository_check/clear_worker.rb +++ b/app/workers/repository_check/clear_worker.rb @@ -8,7 +8,7 @@ module RepositoryCheck Project.select(:id).find_in_batches(batch_size: 100) do |batch| Project.where(id: batch.map(&:id)).update_all( last_repository_check_failed: nil, - last_repository_check_at: nil, + last_repository_check_at: nil ) end end diff --git a/app/workers/repository_check/single_repository_worker.rb b/app/workers/repository_check/single_repository_worker.rb index 3d8bfc6fc6c..164586cf0b7 100644 --- a/app/workers/repository_check/single_repository_worker.rb +++ b/app/workers/repository_check/single_repository_worker.rb @@ -7,7 +7,7 @@ module RepositoryCheck project = Project.find(project_id) project.update_columns( last_repository_check_failed: !check(project), - last_repository_check_at: Time.now, + last_repository_check_at: Time.now ) end diff --git a/app/workers/repository_fork_worker.rb b/app/workers/repository_fork_worker.rb index efc99ec962a..cde5b45ad41 100644 --- a/app/workers/repository_fork_worker.rb +++ b/app/workers/repository_fork_worker.rb @@ -1,36 +1,50 @@ class RepositoryForkWorker + ForkError = Class.new(StandardError) + include Sidekiq::Worker include Gitlab::ShellAdapter include DedicatedSidekiqQueue + sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION + def perform(project_id, forked_from_repository_storage_path, source_path, target_path) + project = Project.find(project_id) + + return unless start_fork(project) + Gitlab::Metrics.add_event(:fork_repository, source_path: source_path, target_path: target_path) - project = Project.find_by_id(project_id) - - unless project.present? - logger.error("Project #{project_id} no longer exists!") - return - end - result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_path, project.repository_storage_path, target_path) - unless result - logger.error("Unable to fork project #{project_id} for repository #{source_path} -> #{target_path}") - project.mark_import_as_failed('The project could not be forked.') - return - end + raise ForkError, "Unable to fork project #{project_id} for repository #{source_path} -> #{target_path}" unless result project.repository.after_import - - unless project.valid_repo? - logger.error("Project #{project_id} had an invalid repository after fork") - project.mark_import_as_failed('The forked repository is invalid.') - return - end + raise ForkError, "Project #{project_id} had an invalid repository after fork" unless project.valid_repo? project.import_finish + rescue ForkError => ex + fail_fork(project, ex.message) + raise + rescue => ex + return unless project + + fail_fork(project, ex.message) + raise ForkError, "#{ex.class} #{ex.message}" + end + + private + + def start_fork(project) + return true if project.import_start + + Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while forking.") + false + end + + def fail_fork(project, message) + Rails.logger.error(message) + project.mark_import_as_failed(message) end end diff --git a/app/workers/repository_import_worker.rb b/app/workers/repository_import_worker.rb index b33ba2ed7c1..2c2d1e8b91f 100644 --- a/app/workers/repository_import_worker.rb +++ b/app/workers/repository_import_worker.rb @@ -1,29 +1,45 @@ class RepositoryImportWorker + ImportError = Class.new(StandardError) + include Sidekiq::Worker include DedicatedSidekiqQueue - sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_EXPIRATION - - attr_accessor :project, :current_user + sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION def perform(project_id) - @project = Project.find(project_id) - @current_user = @project.creator - - Gitlab::Metrics.add_event(:import_repository, - import_url: @project.import_url, - path: @project.path_with_namespace) + project = Project.find(project_id) - project.update_columns(import_jid: self.jid, import_error: nil) + return unless start_import(project) - result = Projects::ImportService.new(project, current_user).execute + Gitlab::Metrics.add_event(:import_repository, + import_url: project.import_url, + path: project.full_path) - if result[:status] == :error - project.mark_import_as_failed(result[:message]) - return - end + result = Projects::ImportService.new(project, project.creator).execute + raise ImportError, result[:message] if result[:status] == :error project.repository.after_import project.import_finish + rescue ImportError => ex + fail_import(project, ex.message) + raise + rescue => ex + return unless project + + fail_import(project, ex.message) + raise ImportError, "#{ex.class} #{ex.message}" + end + + private + + def start_import(project) + return true if project.import_start + + Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while importing.") + false + end + + def fail_import(project, message) + project.mark_import_as_failed(message) end end diff --git a/app/workers/stuck_ci_jobs_worker.rb b/app/workers/stuck_ci_jobs_worker.rb index ae8c980c9e4..8b0cfcc8af8 100644 --- a/app/workers/stuck_ci_jobs_worker.rb +++ b/app/workers/stuck_ci_jobs_worker.rb @@ -45,7 +45,7 @@ class StuckCiJobsWorker def search(status, timeout) builds = Ci::Build.where(status: status).where('ci_builds.updated_at < ?', timeout.ago) - builds.joins(:project).includes(:tags, :runner, project: :namespace).find_each(batch_size: 50).each do |build| + builds.joins(:project).merge(Project.without_deleted).includes(:tags, :runner, project: :namespace).find_each(batch_size: 50).each do |build| yield(build) end end diff --git a/app/workers/stuck_import_jobs_worker.rb b/app/workers/stuck_import_jobs_worker.rb index bfc5e667bb6..f850e459cd9 100644 --- a/app/workers/stuck_import_jobs_worker.rb +++ b/app/workers/stuck_import_jobs_worker.rb @@ -2,36 +2,60 @@ class StuckImportJobsWorker include Sidekiq::Worker include CronjobQueue - IMPORT_EXPIRATION = 15.hours.to_i + IMPORT_JOBS_EXPIRATION = 15.hours.to_i def perform - stuck_projects.find_in_batches(batch_size: 500) do |group| + projects_without_jid_count = mark_projects_without_jid_as_failed! + projects_with_jid_count = mark_projects_with_jid_as_failed! + + Gitlab::Metrics.add_event(:stuck_import_jobs, + projects_without_jid_count: projects_without_jid_count, + projects_with_jid_count: projects_with_jid_count) + end + + private + + def mark_projects_without_jid_as_failed! + started_projects_without_jid.each do |project| + project.mark_import_as_failed(error_message) + end.count + end + + def mark_projects_with_jid_as_failed! + completed_jids_count = 0 + + started_projects_with_jid.find_in_batches(batch_size: 500) do |group| jids = group.map(&:import_jid) # Find the jobs that aren't currently running or that exceeded the threshold. - completed_jids = Gitlab::SidekiqStatus.completed_jids(jids) + completed_jids = Gitlab::SidekiqStatus.completed_jids(jids).to_set if completed_jids.any? - completed_ids = group.select { |project| completed_jids.include?(project.import_jid) }.map(&:id) + completed_jids_count += completed_jids.count + group.each do |project| + project.mark_import_as_failed(error_message) if completed_jids.include?(project.import_jid) + end - fail_batch!(completed_jids, completed_ids) + Rails.logger.info("Marked stuck import jobs as failed. JIDs: #{completed_jids.to_a.join(', ')}") end end - end - private + completed_jids_count + end - def stuck_projects - Project.select('id, import_jid').with_import_status(:started).where.not(import_jid: nil) + def started_projects + Project.with_import_status(:started) end - def fail_batch!(completed_jids, completed_ids) - Project.where(id: completed_ids).update_all(import_status: 'failed', import_error: error_message) + def started_projects_with_jid + started_projects.where.not(import_jid: nil) + end - Rails.logger.info("Marked stuck import jobs as failed. JIDs: #{completed_jids.join(', ')}") + def started_projects_without_jid + started_projects.where(import_jid: nil) end def error_message - "Import timed out. Import took longer than #{IMPORT_EXPIRATION} seconds" + "Import timed out. Import took longer than #{IMPORT_JOBS_EXPIRATION} seconds" end end diff --git a/app/workers/stuck_merge_jobs_worker.rb b/app/workers/stuck_merge_jobs_worker.rb new file mode 100644 index 00000000000..7843179d77c --- /dev/null +++ b/app/workers/stuck_merge_jobs_worker.rb @@ -0,0 +1,34 @@ +class StuckMergeJobsWorker + include Sidekiq::Worker + include CronjobQueue + + def perform + stuck_merge_requests.find_in_batches(batch_size: 100) do |group| + jids = group.map(&:merge_jid) + + # Find the jobs that aren't currently running or that exceeded the threshold. + completed_jids = Gitlab::SidekiqStatus.completed_jids(jids) + + if completed_jids.any? + completed_ids = group.select { |merge_request| completed_jids.include?(merge_request.merge_jid) }.map(&:id) + + apply_current_state!(completed_jids, completed_ids) + end + end + end + + private + + def apply_current_state!(completed_jids, completed_ids) + merge_requests = MergeRequest.where(id: completed_ids) + + merge_requests.where.not(merge_commit_sha: nil).update_all(state: :merged) + merge_requests.where(merge_commit_sha: nil).update_all(state: :opened) + + Rails.logger.info("Updated state of locked merge jobs. JIDs: #{completed_jids.join(', ')}") + end + + def stuck_merge_requests + MergeRequest.select('id, merge_jid').with_state(:locked).where.not(merge_jid: nil).reorder(nil) + end +end diff --git a/app/workers/system_hook_worker.rb b/app/workers/system_hook_worker.rb deleted file mode 100644 index 55d4e7d6dab..00000000000 --- a/app/workers/system_hook_worker.rb +++ /dev/null @@ -1,10 +0,0 @@ -class SystemHookWorker - include Sidekiq::Worker - include DedicatedSidekiqQueue - - sidekiq_options retry: 4 - - def perform(hook_id, data, hook_name) - SystemHook.find(hook_id).execute(data, hook_name) - end -end diff --git a/app/workers/trigger_schedule_worker.rb b/app/workers/trigger_schedule_worker.rb deleted file mode 100644 index 9c1baf7e6c5..00000000000 --- a/app/workers/trigger_schedule_worker.rb +++ /dev/null @@ -1,18 +0,0 @@ -class TriggerScheduleWorker - include Sidekiq::Worker - include CronjobQueue - - def perform - Ci::TriggerSchedule.active.where("next_run_at < ?", Time.now).find_each do |trigger_schedule| - begin - Ci::CreateTriggerRequestService.new.execute(trigger_schedule.project, - trigger_schedule.trigger, - trigger_schedule.ref) - rescue => e - Rails.logger.error "#{trigger_schedule.id}: Failed to trigger_schedule job: #{e.message}" - ensure - trigger_schedule.schedule_next_run! - end - end - end -end diff --git a/app/workers/update_user_activity_worker.rb b/app/workers/update_user_activity_worker.rb index b3c2f13aa33..31bbdb69edb 100644 --- a/app/workers/update_user_activity_worker.rb +++ b/app/workers/update_user_activity_worker.rb @@ -7,8 +7,8 @@ class UpdateUserActivityWorker ids = pairs.keys conditions = 'WHEN id = ? THEN ? ' * ids.length - User.where(id: ids). - update_all([ + User.where(id: ids) + .update_all([ "last_activity_on = CASE #{conditions} ELSE last_activity_on END", *pairs.to_a.flatten ]) diff --git a/app/workers/project_web_hook_worker.rb b/app/workers/web_hook_worker.rb index d973e662ff2..713c0228040 100644 --- a/app/workers/project_web_hook_worker.rb +++ b/app/workers/web_hook_worker.rb @@ -1,11 +1,13 @@ -class ProjectWebHookWorker +class WebHookWorker include Sidekiq::Worker include DedicatedSidekiqQueue - sidekiq_options retry: 4 + sidekiq_options retry: 4, dead: false def perform(hook_id, data, hook_name) + hook = WebHook.find(hook_id) data = data.with_indifferent_access - WebHook.find(hook_id).execute(data, hook_name) + + WebHookService.new(hook, data, hook_name).execute end end |