summaryrefslogtreecommitdiff
path: root/app/workers
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/authorized_projects_worker.rb2
-rw-r--r--app/workers/background_migration_worker.rb39
-rw-r--r--app/workers/build_coverage_worker.rb3
-rw-r--r--app/workers/build_email_worker.rb20
-rw-r--r--app/workers/build_success_worker.rb11
-rw-r--r--app/workers/clear_database_cache_worker.rb24
-rw-r--r--app/workers/expire_build_instance_artifacts_worker.rb2
-rw-r--r--app/workers/expire_job_cache_worker.rb27
-rw-r--r--app/workers/expire_pipeline_cache_worker.rb48
-rw-r--r--app/workers/gitlab_usage_ping_worker.rb19
-rw-r--r--app/workers/irker_worker.rb6
-rw-r--r--app/workers/merge_worker.rb4
-rw-r--r--app/workers/namespaceless_project_destroy_worker.rb43
-rw-r--r--app/workers/pipeline_schedule_worker.rb25
-rw-r--r--app/workers/post_receive.rb48
-rw-r--r--app/workers/process_commit_worker.rb24
-rw-r--r--app/workers/project_cache_worker.rb6
-rw-r--r--app/workers/project_service_worker.rb2
-rw-r--r--app/workers/propagate_service_template_worker.rb21
-rw-r--r--app/workers/prune_old_events_worker.rb8
-rw-r--r--app/workers/remove_old_web_hook_logs_worker.rb10
-rw-r--r--app/workers/repository_check/batch_worker.rb8
-rw-r--r--app/workers/repository_check/clear_worker.rb2
-rw-r--r--app/workers/repository_check/single_repository_worker.rb2
-rw-r--r--app/workers/repository_fork_worker.rb38
-rw-r--r--app/workers/repository_import_worker.rb29
-rw-r--r--app/workers/schedule_update_user_activity_worker.rb10
-rw-r--r--app/workers/stuck_ci_jobs_worker.rb2
-rw-r--r--app/workers/stuck_import_jobs_worker.rb37
-rw-r--r--app/workers/system_hook_worker.rb8
-rw-r--r--app/workers/update_user_activity_worker.rb26
-rw-r--r--app/workers/web_hook_worker.rb (renamed from app/workers/project_web_hook_worker.rb)8
32 files changed, 426 insertions, 136 deletions
diff --git a/app/workers/authorized_projects_worker.rb b/app/workers/authorized_projects_worker.rb
index 0e20df506a3..13207a8bc71 100644
--- a/app/workers/authorized_projects_worker.rb
+++ b/app/workers/authorized_projects_worker.rb
@@ -10,7 +10,7 @@ class AuthorizedProjectsWorker
end
def self.bulk_perform_async(args_list)
- Sidekiq::Client.push_bulk('class' => self, 'args' => args_list)
+ Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
end
def perform(user_id)
diff --git a/app/workers/background_migration_worker.rb b/app/workers/background_migration_worker.rb
new file mode 100644
index 00000000000..45ce49bb5c0
--- /dev/null
+++ b/app/workers/background_migration_worker.rb
@@ -0,0 +1,39 @@
+class BackgroundMigrationWorker
+ include Sidekiq::Worker
+ include DedicatedSidekiqQueue
+
+ # Enqueues a number of jobs in bulk.
+ #
+ # The `jobs` argument should be an Array of Arrays, each sub-array must be in
+ # the form:
+ #
+ # [migration-class, [arg1, arg2, ...]]
+ def self.perform_bulk(jobs)
+ Sidekiq::Client.push_bulk('class' => self,
+ 'queue' => sidekiq_options['queue'],
+ 'args' => jobs)
+ end
+
+ # Schedules multiple jobs in bulk, with a delay.
+ #
+ def self.perform_bulk_in(delay, jobs)
+ now = Time.now.to_i
+ schedule = now + delay.to_i
+
+ if schedule <= now
+ raise ArgumentError, 'The schedule time must be in the future!'
+ end
+
+ Sidekiq::Client.push_bulk('class' => self,
+ 'queue' => sidekiq_options['queue'],
+ 'args' => jobs,
+ 'at' => schedule)
+ end
+
+ # Performs the background migration.
+ #
+ # See Gitlab::BackgroundMigration.perform for more information.
+ def perform(class_name, arguments = [])
+ Gitlab::BackgroundMigration.perform(class_name, arguments)
+ end
+end
diff --git a/app/workers/build_coverage_worker.rb b/app/workers/build_coverage_worker.rb
index def0ab1dde1..f7ae996bb17 100644
--- a/app/workers/build_coverage_worker.rb
+++ b/app/workers/build_coverage_worker.rb
@@ -3,7 +3,6 @@ class BuildCoverageWorker
include BuildQueue
def perform(build_id)
- Ci::Build.find_by(id: build_id)
- .try(:update_coverage)
+ Ci::Build.find_by(id: build_id)&.update_coverage
end
end
diff --git a/app/workers/build_email_worker.rb b/app/workers/build_email_worker.rb
deleted file mode 100644
index 5fdb1f2baa0..00000000000
--- a/app/workers/build_email_worker.rb
+++ /dev/null
@@ -1,20 +0,0 @@
-class BuildEmailWorker
- include Sidekiq::Worker
- include BuildQueue
-
- def perform(build_id, recipients, push_data)
- recipients.each do |recipient|
- begin
- case push_data['build_status']
- when 'success'
- Notify.build_success_email(build_id, recipient).deliver_now
- when 'failed'
- Notify.build_fail_email(build_id, recipient).deliver_now
- end
- # These are input errors and won't be corrected even if Sidekiq retries
- rescue Net::SMTPFatalError, Net::SMTPSyntaxError => e
- logger.info("Failed to send e-mail for project '#{push_data['project_name']}' to #{recipient}: #{e}")
- end
- end
- end
-end
diff --git a/app/workers/build_success_worker.rb b/app/workers/build_success_worker.rb
index e17add7421f..bf009dfab0f 100644
--- a/app/workers/build_success_worker.rb
+++ b/app/workers/build_success_worker.rb
@@ -11,15 +11,6 @@ class BuildSuccessWorker
private
def create_deployment(build)
- service = CreateDeploymentService.new(
- build.project, build.user,
- environment: build.environment,
- sha: build.sha,
- ref: build.ref,
- tag: build.tag,
- options: build.options.to_h[:environment],
- variables: build.variables)
-
- service.execute(build)
+ CreateDeploymentService.new(build).execute
end
end
diff --git a/app/workers/clear_database_cache_worker.rb b/app/workers/clear_database_cache_worker.rb
deleted file mode 100644
index c4cb4733482..00000000000
--- a/app/workers/clear_database_cache_worker.rb
+++ /dev/null
@@ -1,24 +0,0 @@
-# This worker clears all cache fields in the database, working in batches.
-class ClearDatabaseCacheWorker
- include Sidekiq::Worker
- include DedicatedSidekiqQueue
-
- BATCH_SIZE = 1000
-
- def perform
- CacheMarkdownField.caching_classes.each do |kls|
- fields = kls.cached_markdown_fields.html_fields
- clear_cache_fields = fields.each_with_object({}) do |field, memo|
- memo[field] = nil
- end
-
- Rails.logger.debug("Clearing Markdown cache for #{kls}: #{fields.inspect}")
-
- kls.unscoped.in_batches(of: BATCH_SIZE) do |relation|
- relation.update_all(clear_cache_fields)
- end
- end
-
- nil
- end
-end
diff --git a/app/workers/expire_build_instance_artifacts_worker.rb b/app/workers/expire_build_instance_artifacts_worker.rb
index eb403c134d1..7b59e976492 100644
--- a/app/workers/expire_build_instance_artifacts_worker.rb
+++ b/app/workers/expire_build_instance_artifacts_worker.rb
@@ -8,7 +8,7 @@ class ExpireBuildInstanceArtifactsWorker
.reorder(nil)
.find_by(id: build_id)
- return unless build.try(:project)
+ return unless build&.project && !build.project.pending_delete
Rails.logger.info "Removing artifacts for build #{build.id}..."
build.erase_artifacts!
diff --git a/app/workers/expire_job_cache_worker.rb b/app/workers/expire_job_cache_worker.rb
new file mode 100644
index 00000000000..e383202260d
--- /dev/null
+++ b/app/workers/expire_job_cache_worker.rb
@@ -0,0 +1,27 @@
+class ExpireJobCacheWorker
+ include Sidekiq::Worker
+ include BuildQueue
+
+ def perform(job_id)
+ job = CommitStatus.joins(:pipeline, :project).find_by(id: job_id)
+ return unless job
+
+ pipeline = job.pipeline
+ project = job.project
+
+ Gitlab::EtagCaching::Store.new.tap do |store|
+ store.touch(project_pipeline_path(project, pipeline))
+ store.touch(project_job_path(project, job))
+ end
+ end
+
+ private
+
+ def project_pipeline_path(project, pipeline)
+ Gitlab::Routing.url_helpers.project_pipeline_path(project, pipeline, format: :json)
+ end
+
+ def project_job_path(project, job)
+ Gitlab::Routing.url_helpers.project_build_path(project, job.id, format: :json)
+ end
+end
diff --git a/app/workers/expire_pipeline_cache_worker.rb b/app/workers/expire_pipeline_cache_worker.rb
new file mode 100644
index 00000000000..7c02d6cf892
--- /dev/null
+++ b/app/workers/expire_pipeline_cache_worker.rb
@@ -0,0 +1,48 @@
+class ExpirePipelineCacheWorker
+ include Sidekiq::Worker
+ include PipelineQueue
+
+ def perform(pipeline_id)
+ pipeline = Ci::Pipeline.find_by(id: pipeline_id)
+ return unless pipeline
+
+ project = pipeline.project
+ store = Gitlab::EtagCaching::Store.new
+
+ store.touch(project_pipelines_path(project))
+ store.touch(project_pipeline_path(project, pipeline))
+ store.touch(commit_pipelines_path(project, pipeline.commit)) if pipeline.commit
+ store.touch(new_merge_request_pipelines_path(project))
+ each_pipelines_merge_request_path(project, pipeline) do |path|
+ store.touch(path)
+ end
+
+ Gitlab::Cache::Ci::ProjectPipelineStatus.update_for_pipeline(pipeline)
+ end
+
+ private
+
+ def project_pipelines_path(project)
+ Gitlab::Routing.url_helpers.project_pipelines_path(project, format: :json)
+ end
+
+ def project_pipeline_path(project, pipeline)
+ Gitlab::Routing.url_helpers.project_pipeline_path(project, pipeline, format: :json)
+ end
+
+ def commit_pipelines_path(project, commit)
+ Gitlab::Routing.url_helpers.pipelines_project_commit_path(project, commit.id, format: :json)
+ end
+
+ def new_merge_request_pipelines_path(project)
+ Gitlab::Routing.url_helpers.project_new_merge_request_path(project, format: :json)
+ end
+
+ def each_pipelines_merge_request_path(project, pipeline)
+ pipeline.all_merge_requests.each do |merge_request|
+ path = Gitlab::Routing.url_helpers.pipelines_project_merge_request_path(project, merge_request, format: :json)
+
+ yield(path)
+ end
+ end
+end
diff --git a/app/workers/gitlab_usage_ping_worker.rb b/app/workers/gitlab_usage_ping_worker.rb
new file mode 100644
index 00000000000..0a55aab63fd
--- /dev/null
+++ b/app/workers/gitlab_usage_ping_worker.rb
@@ -0,0 +1,19 @@
+class GitlabUsagePingWorker
+ LEASE_TIMEOUT = 86400
+
+ include Sidekiq::Worker
+ include CronjobQueue
+
+ def perform
+ # Multiple Sidekiq workers could run this. We should only do this at most once a day.
+ return unless try_obtain_lease
+
+ SubmitUsagePingService.new.execute
+ end
+
+ private
+
+ def try_obtain_lease
+ Gitlab::ExclusiveLease.new('gitlab_usage_ping_worker:ping', timeout: LEASE_TIMEOUT).try_obtain
+ end
+end
diff --git a/app/workers/irker_worker.rb b/app/workers/irker_worker.rb
index c9658b3fe17..22f67fa9e9f 100644
--- a/app/workers/irker_worker.rb
+++ b/app/workers/irker_worker.rb
@@ -142,10 +142,10 @@ class IrkerWorker
end
def files_count(commit)
- diffs = commit.raw_diffs(deltas_only: true)
+ diff_size = commit.raw_deltas.size
- files = "#{diffs.real_size} file"
- files += 's' if diffs.size > 1
+ files = "#{diff_size} file"
+ files += 's' if diff_size > 1
files
end
diff --git a/app/workers/merge_worker.rb b/app/workers/merge_worker.rb
index 79efca4f2f9..48e2da338f6 100644
--- a/app/workers/merge_worker.rb
+++ b/app/workers/merge_worker.rb
@@ -7,7 +7,7 @@ class MergeWorker
current_user = User.find(current_user_id)
merge_request = MergeRequest.find(merge_request_id)
- MergeRequests::MergeService.new(merge_request.target_project, current_user, params).
- execute(merge_request)
+ MergeRequests::MergeService.new(merge_request.target_project, current_user, params)
+ .execute(merge_request)
end
end
diff --git a/app/workers/namespaceless_project_destroy_worker.rb b/app/workers/namespaceless_project_destroy_worker.rb
new file mode 100644
index 00000000000..bfae0c77700
--- /dev/null
+++ b/app/workers/namespaceless_project_destroy_worker.rb
@@ -0,0 +1,43 @@
+# Worker to destroy projects that do not have a namespace
+#
+# It destroys everything it can without having the info about the namespace it
+# used to belong to. Projects in this state should be rare.
+# The worker will reject doing anything for projects that *do* have a
+# namespace. For those use ProjectDestroyWorker instead.
+class NamespacelessProjectDestroyWorker
+ include Sidekiq::Worker
+ include DedicatedSidekiqQueue
+
+ def self.bulk_perform_async(args_list)
+ Sidekiq::Client.push_bulk('class' => self, 'queue' => sidekiq_options['queue'], 'args' => args_list)
+ end
+
+ def perform(project_id)
+ begin
+ project = Project.unscoped.find(project_id)
+ rescue ActiveRecord::RecordNotFound
+ return
+ end
+ return unless project.namespace_id.nil? # Reject doing anything for projects that *do* have a namespace
+
+ project.team.truncate
+
+ unlink_fork(project) if project.forked?
+
+ # Override Project#remove_pages for this instance so it doesn't do anything
+ def project.remove_pages
+ end
+
+ project.destroy!
+ end
+
+ private
+
+ def unlink_fork(project)
+ merge_requests = project.forked_from_project.merge_requests.opened.from_project(project)
+
+ merge_requests.update_all(state: 'closed')
+
+ project.forked_project_link.destroy
+ end
+end
diff --git a/app/workers/pipeline_schedule_worker.rb b/app/workers/pipeline_schedule_worker.rb
new file mode 100644
index 00000000000..7b485b3363c
--- /dev/null
+++ b/app/workers/pipeline_schedule_worker.rb
@@ -0,0 +1,25 @@
+class PipelineScheduleWorker
+ include Sidekiq::Worker
+ include CronjobQueue
+
+ def perform
+ Ci::PipelineSchedule.active.where("next_run_at < ?", Time.now)
+ .preload(:owner, :project).find_each do |schedule|
+ begin
+ unless schedule.runnable_by_owner?
+ schedule.deactivate!
+ next
+ end
+
+ Ci::CreatePipelineService.new(schedule.project,
+ schedule.owner,
+ ref: schedule.ref)
+ .execute(:schedule, save_on_errors: false, schedule: schedule)
+ rescue => e
+ Rails.logger.error "#{schedule.id}: Failed to create a scheduled pipeline: #{e.message}"
+ ensure
+ schedule.schedule_next_run!
+ end
+ end
+ end
+end
diff --git a/app/workers/post_receive.rb b/app/workers/post_receive.rb
index 2cd87895c55..b8f8d3750d9 100644
--- a/app/workers/post_receive.rb
+++ b/app/workers/post_receive.rb
@@ -2,38 +2,34 @@ class PostReceive
include Sidekiq::Worker
include DedicatedSidekiqQueue
- def perform(repo_path, identifier, changes)
- if repository_storage = Gitlab.config.repositories.storages.find { |p| repo_path.start_with?(p[1]['path'].to_s) }
- repo_path.gsub!(repository_storage[1]['path'].to_s, "")
- else
- log("Check gitlab.yml config for correct repositories.storages values. No repository storage path matches \"#{repo_path}\"")
+ def perform(gl_repository, identifier, changes)
+ project, is_wiki = Gitlab::GlRepository.parse(gl_repository)
+
+ if project.nil?
+ log("Triggered hook for non-existing project with gl_repository \"#{gl_repository}\"")
+ return false
end
changes = Base64.decode64(changes) unless changes.include?(' ')
# Use Sidekiq.logger so arguments can be correlated with execution
# time and thread ID's.
Sidekiq.logger.info "changes: #{changes.inspect}" if ENV['SIDEKIQ_LOG_ARGUMENTS']
- post_received = Gitlab::GitPostReceive.new(repo_path, identifier, changes)
-
- if post_received.project.nil?
- log("Triggered hook for non-existing project with full path \"#{repo_path}\"")
- return false
- end
+ post_received = Gitlab::GitPostReceive.new(project, identifier, changes)
- if post_received.wiki?
- # Nothing defined here yet.
- elsif post_received.regular_project?
- process_project_changes(post_received)
+ if is_wiki
+ process_wiki_changes(post_received)
else
- log("Triggered hook for unidentifiable repository type with full path \"#{repo_path}\"")
- false
+ process_project_changes(post_received)
end
end
+ private
+
def process_project_changes(post_received)
- post_received.changes.each do |change|
- oldrev, newrev, ref = change.strip.split(' ')
+ changes = []
+ refs = Set.new
+ post_received.changes_refs do |oldrev, newrev, ref|
@user ||= post_received.identify(newrev)
unless @user
@@ -46,10 +42,22 @@ class PostReceive
elsif Gitlab::Git.branch_ref?(ref)
GitPushService.new(post_received.project, @user, oldrev: oldrev, newrev: newrev, ref: ref).execute
end
+
+ changes << Gitlab::DataBuilder::Repository.single_change(oldrev, newrev, ref)
+ refs << ref
end
+
+ after_project_changes_hooks(post_received, @user, refs.to_a, changes)
end
- private
+ def after_project_changes_hooks(post_received, user, refs, changes)
+ hook_data = Gitlab::DataBuilder::Repository.update(post_received.project, user, changes, refs)
+ SystemHooksService.new.execute_hooks(hook_data, :repository_update_hooks)
+ end
+
+ def process_wiki_changes(post_received)
+ # Nothing defined here yet.
+ end
def log(message)
Gitlab::GitLogger.error("POST-RECEIVE: #{message}")
diff --git a/app/workers/process_commit_worker.rb b/app/workers/process_commit_worker.rb
index e9a5bd7f24e..c0c03848a40 100644
--- a/app/workers/process_commit_worker.rb
+++ b/app/workers/process_commit_worker.rb
@@ -17,12 +17,14 @@ class ProcessCommitWorker
project = Project.find_by(id: project_id)
return unless project
+ return if commit_exists_in_upstream?(project, commit_hash)
user = User.find_by(id: user_id)
return unless user
commit = build_commit(project, commit_hash)
+
author = commit.author || user
process_commit_message(project, commit, user, author, default)
@@ -45,16 +47,18 @@ class ProcessCommitWorker
# therefor we use IssueCollection here and skip the authorization check in
# Issues::CloseService#execute.
IssueCollection.new(issues).updatable_by_user(user).each do |issue|
- Issues::CloseService.new(project, author).
- close_issue(issue, commit: commit)
+ Issues::CloseService.new(project, author)
+ .close_issue(issue, commit: commit)
end
end
def update_issue_metrics(commit, author)
mentioned_issues = commit.all_references(author).issues
- Issue::Metrics.where(issue_id: mentioned_issues.map(&:id), first_mentioned_in_commit_at: nil).
- update_all(first_mentioned_in_commit_at: commit.committed_date)
+ return if mentioned_issues.empty?
+
+ Issue::Metrics.where(issue_id: mentioned_issues.map(&:id), first_mentioned_in_commit_at: nil)
+ .update_all(first_mentioned_in_commit_at: commit.committed_date)
end
def build_commit(project, hash)
@@ -71,4 +75,16 @@ class ProcessCommitWorker
Commit.from_hash(hash, project)
end
+
+ private
+
+ # Avoid reprocessing commits that already exist in the upstream
+ # when project is forked. This will also prevent duplicated system notes.
+ def commit_exists_in_upstream?(project, commit_hash)
+ return false unless project.forked?
+
+ upstream_project = project.forked_from_project
+ commit_id = commit_hash.with_indifferent_access[:id]
+ upstream_project.commit(commit_id).present?
+ end
end
diff --git a/app/workers/project_cache_worker.rb b/app/workers/project_cache_worker.rb
index 8ff9d07860f..505ff9e086e 100644
--- a/app/workers/project_cache_worker.rb
+++ b/app/workers/project_cache_worker.rb
@@ -32,8 +32,8 @@ class ProjectCacheWorker
private
def try_obtain_lease_for(project_id, section)
- Gitlab::ExclusiveLease.
- new("project_cache_worker:#{project_id}:#{section}", timeout: LEASE_TIMEOUT).
- try_obtain
+ Gitlab::ExclusiveLease
+ .new("project_cache_worker:#{project_id}:#{section}", timeout: LEASE_TIMEOUT)
+ .try_obtain
end
end
diff --git a/app/workers/project_service_worker.rb b/app/workers/project_service_worker.rb
index fdfdeab7b41..4883d848c53 100644
--- a/app/workers/project_service_worker.rb
+++ b/app/workers/project_service_worker.rb
@@ -2,6 +2,8 @@ class ProjectServiceWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
+ sidekiq_options dead: false
+
def perform(hook_id, data)
data = data.with_indifferent_access
Service.find(hook_id).execute(data)
diff --git a/app/workers/propagate_service_template_worker.rb b/app/workers/propagate_service_template_worker.rb
new file mode 100644
index 00000000000..6b607451c7a
--- /dev/null
+++ b/app/workers/propagate_service_template_worker.rb
@@ -0,0 +1,21 @@
+# Worker for updating any project specific caches.
+class PropagateServiceTemplateWorker
+ include Sidekiq::Worker
+ include DedicatedSidekiqQueue
+
+ LEASE_TIMEOUT = 4.hours.to_i
+
+ def perform(template_id)
+ return unless try_obtain_lease_for(template_id)
+
+ Projects::PropagateServiceTemplate.propagate(Service.find_by(id: template_id))
+ end
+
+ private
+
+ def try_obtain_lease_for(template_id)
+ Gitlab::ExclusiveLease
+ .new("propagate_service_template_worker:#{template_id}", timeout: LEASE_TIMEOUT)
+ .try_obtain
+ end
+end
diff --git a/app/workers/prune_old_events_worker.rb b/app/workers/prune_old_events_worker.rb
index 392abb9c21b..2b43bb19ad1 100644
--- a/app/workers/prune_old_events_worker.rb
+++ b/app/workers/prune_old_events_worker.rb
@@ -10,9 +10,9 @@ class PruneOldEventsWorker
'(id IN (SELECT id FROM (?) ids_to_remove))',
Event.unscoped.where(
'created_at < ?',
- (12.months + 1.day).ago).
- select(:id).
- limit(10_000)).
- delete_all
+ (12.months + 1.day).ago)
+ .select(:id)
+ .limit(10_000))
+ .delete_all
end
end
diff --git a/app/workers/remove_old_web_hook_logs_worker.rb b/app/workers/remove_old_web_hook_logs_worker.rb
new file mode 100644
index 00000000000..555e1bb8691
--- /dev/null
+++ b/app/workers/remove_old_web_hook_logs_worker.rb
@@ -0,0 +1,10 @@
+class RemoveOldWebHookLogsWorker
+ include Sidekiq::Worker
+ include CronjobQueue
+
+ WEB_HOOK_LOG_LIFETIME = 2.days
+
+ def perform
+ WebHookLog.destroy_all(['created_at < ?', Time.now - WEB_HOOK_LOG_LIFETIME])
+ end
+end
diff --git a/app/workers/repository_check/batch_worker.rb b/app/workers/repository_check/batch_worker.rb
index c3e7491ec4e..b94d83bd709 100644
--- a/app/workers/repository_check/batch_worker.rb
+++ b/app/workers/repository_check/batch_worker.rb
@@ -32,10 +32,10 @@ module RepositoryCheck
# has to sit and wait for this query to finish.
def project_ids
limit = 10_000
- never_checked_projects = Project.where('last_repository_check_at IS NULL AND created_at < ?', 24.hours.ago).
- limit(limit).pluck(:id)
- old_check_projects = Project.where('last_repository_check_at < ?', 1.month.ago).
- reorder('last_repository_check_at ASC').limit(limit).pluck(:id)
+ never_checked_projects = Project.where('last_repository_check_at IS NULL AND created_at < ?', 24.hours.ago)
+ .limit(limit).pluck(:id)
+ old_check_projects = Project.where('last_repository_check_at < ?', 1.month.ago)
+ .reorder('last_repository_check_at ASC').limit(limit).pluck(:id)
never_checked_projects + old_check_projects
end
diff --git a/app/workers/repository_check/clear_worker.rb b/app/workers/repository_check/clear_worker.rb
index 1f1b38540ee..85bc9103538 100644
--- a/app/workers/repository_check/clear_worker.rb
+++ b/app/workers/repository_check/clear_worker.rb
@@ -8,7 +8,7 @@ module RepositoryCheck
Project.select(:id).find_in_batches(batch_size: 100) do |batch|
Project.where(id: batch.map(&:id)).update_all(
last_repository_check_failed: nil,
- last_repository_check_at: nil,
+ last_repository_check_at: nil
)
end
end
diff --git a/app/workers/repository_check/single_repository_worker.rb b/app/workers/repository_check/single_repository_worker.rb
index 3d8bfc6fc6c..164586cf0b7 100644
--- a/app/workers/repository_check/single_repository_worker.rb
+++ b/app/workers/repository_check/single_repository_worker.rb
@@ -7,7 +7,7 @@ module RepositoryCheck
project = Project.find(project_id)
project.update_columns(
last_repository_check_failed: !check(project),
- last_repository_check_at: Time.now,
+ last_repository_check_at: Time.now
)
end
diff --git a/app/workers/repository_fork_worker.rb b/app/workers/repository_fork_worker.rb
index efc99ec962a..a338523dc6b 100644
--- a/app/workers/repository_fork_worker.rb
+++ b/app/workers/repository_fork_worker.rb
@@ -1,4 +1,6 @@
class RepositoryForkWorker
+ ForkError = Class.new(StandardError)
+
include Sidekiq::Worker
include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
@@ -8,29 +10,31 @@ class RepositoryForkWorker
source_path: source_path,
target_path: target_path)
- project = Project.find_by_id(project_id)
-
- unless project.present?
- logger.error("Project #{project_id} no longer exists!")
- return
- end
+ project = Project.find(project_id)
+ project.import_start
result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_path,
project.repository_storage_path, target_path)
- unless result
- logger.error("Unable to fork project #{project_id} for repository #{source_path} -> #{target_path}")
- project.mark_import_as_failed('The project could not be forked.')
- return
- end
+ raise ForkError, "Unable to fork project #{project_id} for repository #{source_path} -> #{target_path}" unless result
project.repository.after_import
-
- unless project.valid_repo?
- logger.error("Project #{project_id} had an invalid repository after fork")
- project.mark_import_as_failed('The forked repository is invalid.')
- return
- end
+ raise ForkError, "Project #{project_id} had an invalid repository after fork" unless project.valid_repo?
project.import_finish
+ rescue ForkError => ex
+ fail_fork(project, ex.message)
+ raise
+ rescue => ex
+ return unless project
+
+ fail_fork(project, ex.message)
+ raise ForkError, "#{ex.class} #{ex.message}"
+ end
+
+ private
+
+ def fail_fork(project, message)
+ Rails.logger.error(message)
+ project.mark_import_as_failed(message)
end
end
diff --git a/app/workers/repository_import_worker.rb b/app/workers/repository_import_worker.rb
index c8a77e21c12..625476b7e01 100644
--- a/app/workers/repository_import_worker.rb
+++ b/app/workers/repository_import_worker.rb
@@ -1,28 +1,43 @@
class RepositoryImportWorker
+ ImportError = Class.new(StandardError)
+
include Sidekiq::Worker
- include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
+ sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_EXPIRATION
+
attr_accessor :project, :current_user
def perform(project_id)
@project = Project.find(project_id)
@current_user = @project.creator
+ project.import_start
+
Gitlab::Metrics.add_event(:import_repository,
import_url: @project.import_url,
path: @project.path_with_namespace)
- project.update_column(:import_error, nil)
+ project.update_columns(import_jid: self.jid, import_error: nil)
result = Projects::ImportService.new(project, current_user).execute
-
- if result[:status] == :error
- project.mark_import_as_failed(result[:message])
- return
- end
+ raise ImportError, result[:message] if result[:status] == :error
project.repository.after_import
project.import_finish
+ rescue ImportError => ex
+ fail_import(project, ex.message)
+ raise
+ rescue => ex
+ return unless project
+
+ fail_import(project, ex.message)
+ raise ImportError, "#{ex.class} #{ex.message}"
+ end
+
+ private
+
+ def fail_import(project, message)
+ project.mark_import_as_failed(message)
end
end
diff --git a/app/workers/schedule_update_user_activity_worker.rb b/app/workers/schedule_update_user_activity_worker.rb
new file mode 100644
index 00000000000..6c2c3e437f3
--- /dev/null
+++ b/app/workers/schedule_update_user_activity_worker.rb
@@ -0,0 +1,10 @@
+class ScheduleUpdateUserActivityWorker
+ include Sidekiq::Worker
+ include CronjobQueue
+
+ def perform(batch_size = 500)
+ Gitlab::UserActivities.new.each_slice(batch_size) do |batch|
+ UpdateUserActivityWorker.perform_async(Hash[batch])
+ end
+ end
+end
diff --git a/app/workers/stuck_ci_jobs_worker.rb b/app/workers/stuck_ci_jobs_worker.rb
index ae8c980c9e4..8b0cfcc8af8 100644
--- a/app/workers/stuck_ci_jobs_worker.rb
+++ b/app/workers/stuck_ci_jobs_worker.rb
@@ -45,7 +45,7 @@ class StuckCiJobsWorker
def search(status, timeout)
builds = Ci::Build.where(status: status).where('ci_builds.updated_at < ?', timeout.ago)
- builds.joins(:project).includes(:tags, :runner, project: :namespace).find_each(batch_size: 50).each do |build|
+ builds.joins(:project).merge(Project.without_deleted).includes(:tags, :runner, project: :namespace).find_each(batch_size: 50).each do |build|
yield(build)
end
end
diff --git a/app/workers/stuck_import_jobs_worker.rb b/app/workers/stuck_import_jobs_worker.rb
new file mode 100644
index 00000000000..bfc5e667bb6
--- /dev/null
+++ b/app/workers/stuck_import_jobs_worker.rb
@@ -0,0 +1,37 @@
+class StuckImportJobsWorker
+ include Sidekiq::Worker
+ include CronjobQueue
+
+ IMPORT_EXPIRATION = 15.hours.to_i
+
+ def perform
+ stuck_projects.find_in_batches(batch_size: 500) do |group|
+ jids = group.map(&:import_jid)
+
+ # Find the jobs that aren't currently running or that exceeded the threshold.
+ completed_jids = Gitlab::SidekiqStatus.completed_jids(jids)
+
+ if completed_jids.any?
+ completed_ids = group.select { |project| completed_jids.include?(project.import_jid) }.map(&:id)
+
+ fail_batch!(completed_jids, completed_ids)
+ end
+ end
+ end
+
+ private
+
+ def stuck_projects
+ Project.select('id, import_jid').with_import_status(:started).where.not(import_jid: nil)
+ end
+
+ def fail_batch!(completed_jids, completed_ids)
+ Project.where(id: completed_ids).update_all(import_status: 'failed', import_error: error_message)
+
+ Rails.logger.info("Marked stuck import jobs as failed. JIDs: #{completed_jids.join(', ')}")
+ end
+
+ def error_message
+ "Import timed out. Import took longer than #{IMPORT_EXPIRATION} seconds"
+ end
+end
diff --git a/app/workers/system_hook_worker.rb b/app/workers/system_hook_worker.rb
deleted file mode 100644
index baf2f12eeac..00000000000
--- a/app/workers/system_hook_worker.rb
+++ /dev/null
@@ -1,8 +0,0 @@
-class SystemHookWorker
- include Sidekiq::Worker
- include DedicatedSidekiqQueue
-
- def perform(hook_id, data, hook_name)
- SystemHook.find(hook_id).execute(data, hook_name)
- end
-end
diff --git a/app/workers/update_user_activity_worker.rb b/app/workers/update_user_activity_worker.rb
new file mode 100644
index 00000000000..31bbdb69edb
--- /dev/null
+++ b/app/workers/update_user_activity_worker.rb
@@ -0,0 +1,26 @@
+class UpdateUserActivityWorker
+ include Sidekiq::Worker
+ include DedicatedSidekiqQueue
+
+ def perform(pairs)
+ pairs = cast_data(pairs)
+ ids = pairs.keys
+ conditions = 'WHEN id = ? THEN ? ' * ids.length
+
+ User.where(id: ids)
+ .update_all([
+ "last_activity_on = CASE #{conditions} ELSE last_activity_on END",
+ *pairs.to_a.flatten
+ ])
+
+ Gitlab::UserActivities.new.delete(*ids)
+ end
+
+ private
+
+ def cast_data(pairs)
+ pairs.each_with_object({}) do |(key, value), new_pairs|
+ new_pairs[key.to_i] = Time.at(value.to_i).to_s(:db)
+ end
+ end
+end
diff --git a/app/workers/project_web_hook_worker.rb b/app/workers/web_hook_worker.rb
index d973e662ff2..713c0228040 100644
--- a/app/workers/project_web_hook_worker.rb
+++ b/app/workers/web_hook_worker.rb
@@ -1,11 +1,13 @@
-class ProjectWebHookWorker
+class WebHookWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
- sidekiq_options retry: 4
+ sidekiq_options retry: 4, dead: false
def perform(hook_id, data, hook_name)
+ hook = WebHook.find(hook_id)
data = data.with_indifferent_access
- WebHook.find(hook_id).execute(data, hook_name)
+
+ WebHookService.new(hook, data, hook_name).execute
end
end