diff options
Diffstat (limited to 'app/workers')
-rw-r--r-- | app/workers/build_finished_worker.rb | 1 | ||||
-rw-r--r-- | app/workers/build_trace_sections_worker.rb | 8 | ||||
-rw-r--r-- | app/workers/cluster_provision_worker.rb | 10 | ||||
-rw-r--r-- | app/workers/concerns/cluster_queue.rb | 10 | ||||
-rw-r--r-- | app/workers/concerns/project_start_import.rb | 9 | ||||
-rw-r--r-- | app/workers/project_migrate_hashed_storage_worker.rb | 11 | ||||
-rw-r--r-- | app/workers/repository_fork_worker.rb | 3 | ||||
-rw-r--r-- | app/workers/repository_import_worker.rb | 3 | ||||
-rw-r--r-- | app/workers/storage_migrator_worker.rb | 30 | ||||
-rw-r--r-- | app/workers/stuck_merge_jobs_worker.rb | 2 | ||||
-rw-r--r-- | app/workers/update_merge_requests_worker.rb | 9 | ||||
-rw-r--r-- | app/workers/wait_for_cluster_creation_worker.rb | 27 |
12 files changed, 120 insertions, 3 deletions
diff --git a/app/workers/build_finished_worker.rb b/app/workers/build_finished_worker.rb index e2a1b3dcc41..52e7d346e74 100644 --- a/app/workers/build_finished_worker.rb +++ b/app/workers/build_finished_worker.rb @@ -6,6 +6,7 @@ class BuildFinishedWorker def perform(build_id) Ci::Build.find_by(id: build_id).try do |build| + BuildTraceSectionsWorker.perform_async(build.id) BuildCoverageWorker.new.perform(build.id) BuildHooksWorker.new.perform(build.id) end diff --git a/app/workers/build_trace_sections_worker.rb b/app/workers/build_trace_sections_worker.rb new file mode 100644 index 00000000000..8c57e8f767b --- /dev/null +++ b/app/workers/build_trace_sections_worker.rb @@ -0,0 +1,8 @@ +class BuildTraceSectionsWorker + include Sidekiq::Worker + include PipelineQueue + + def perform(build_id) + Ci::Build.find_by(id: build_id)&.parse_trace_sections! + end +end diff --git a/app/workers/cluster_provision_worker.rb b/app/workers/cluster_provision_worker.rb new file mode 100644 index 00000000000..63300b58a25 --- /dev/null +++ b/app/workers/cluster_provision_worker.rb @@ -0,0 +1,10 @@ +class ClusterProvisionWorker + include Sidekiq::Worker + include ClusterQueue + + def perform(cluster_id) + Gcp::Cluster.find_by_id(cluster_id).try do |cluster| + Ci::ProvisionClusterService.new.execute(cluster) + end + end +end diff --git a/app/workers/concerns/cluster_queue.rb b/app/workers/concerns/cluster_queue.rb new file mode 100644 index 00000000000..a5074d13220 --- /dev/null +++ b/app/workers/concerns/cluster_queue.rb @@ -0,0 +1,10 @@ +## +# Concern for setting Sidekiq settings for the various Gcp clusters workers. +# +module ClusterQueue + extend ActiveSupport::Concern + + included do + sidekiq_options queue: :gcp_cluster + end +end diff --git a/app/workers/concerns/project_start_import.rb b/app/workers/concerns/project_start_import.rb new file mode 100644 index 00000000000..0704ebbb0fd --- /dev/null +++ b/app/workers/concerns/project_start_import.rb @@ -0,0 +1,9 @@ +module ProjectStartImport + def start(project) + if project.import_started? && project.import_jid == self.jid + return true + end + + project.import_start + end +end diff --git a/app/workers/project_migrate_hashed_storage_worker.rb b/app/workers/project_migrate_hashed_storage_worker.rb new file mode 100644 index 00000000000..ca276d7801c --- /dev/null +++ b/app/workers/project_migrate_hashed_storage_worker.rb @@ -0,0 +1,11 @@ +class ProjectMigrateHashedStorageWorker + include Sidekiq::Worker + include DedicatedSidekiqQueue + + def perform(project_id) + project = Project.find_by(id: project_id) + return if project.nil? || project.pending_delete? + + ::Projects::HashedStorageMigrationService.new(project, logger).execute + end +end diff --git a/app/workers/repository_fork_worker.rb b/app/workers/repository_fork_worker.rb index cde5b45ad41..264706e3e23 100644 --- a/app/workers/repository_fork_worker.rb +++ b/app/workers/repository_fork_worker.rb @@ -4,6 +4,7 @@ class RepositoryForkWorker include Sidekiq::Worker include Gitlab::ShellAdapter include DedicatedSidekiqQueue + include ProjectStartImport sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION @@ -37,7 +38,7 @@ class RepositoryForkWorker private def start_fork(project) - return true if project.import_start + return true if start(project) Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while forking.") false diff --git a/app/workers/repository_import_worker.rb b/app/workers/repository_import_worker.rb index 00a021abbdc..d7c0043d3b6 100644 --- a/app/workers/repository_import_worker.rb +++ b/app/workers/repository_import_worker.rb @@ -4,6 +4,7 @@ class RepositoryImportWorker include Sidekiq::Worker include DedicatedSidekiqQueue include ExceptionBacktrace + include ProjectStartImport sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION @@ -34,7 +35,7 @@ class RepositoryImportWorker private def start_import(project) - return true if project.import_start + return true if start(project) Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while importing.") false diff --git a/app/workers/storage_migrator_worker.rb b/app/workers/storage_migrator_worker.rb new file mode 100644 index 00000000000..b48ead799b9 --- /dev/null +++ b/app/workers/storage_migrator_worker.rb @@ -0,0 +1,30 @@ +class StorageMigratorWorker + include Sidekiq::Worker + include DedicatedSidekiqQueue + + BATCH_SIZE = 100 + + def perform(start, finish) + projects = build_relation(start, finish) + + projects.with_route.find_each(batch_size: BATCH_SIZE) do |project| + Rails.logger.info "Starting storage migration of #{project.full_path} (ID=#{project.id})..." + + begin + project.migrate_to_hashed_storage! + rescue => err + Rails.logger.error("#{err.message} migrating storage of #{project.full_path} (ID=#{project.id}), trace - #{err.backtrace}") + end + end + end + + def build_relation(start, finish) + relation = Project + table = Project.arel_table + + relation = relation.where(table[:id].gteq(start)) if start + relation = relation.where(table[:id].lteq(finish)) if finish + + relation + end +end diff --git a/app/workers/stuck_merge_jobs_worker.rb b/app/workers/stuck_merge_jobs_worker.rb index 7843179d77c..a396c0f27b2 100644 --- a/app/workers/stuck_merge_jobs_worker.rb +++ b/app/workers/stuck_merge_jobs_worker.rb @@ -23,7 +23,7 @@ class StuckMergeJobsWorker merge_requests = MergeRequest.where(id: completed_ids) merge_requests.where.not(merge_commit_sha: nil).update_all(state: :merged) - merge_requests.where(merge_commit_sha: nil).update_all(state: :opened) + merge_requests.where(merge_commit_sha: nil).update_all(state: :opened, merge_jid: nil) Rails.logger.info("Updated state of locked merge jobs. JIDs: #{completed_jids.join(', ')}") end diff --git a/app/workers/update_merge_requests_worker.rb b/app/workers/update_merge_requests_worker.rb index 89ae17cef37..150788ca611 100644 --- a/app/workers/update_merge_requests_worker.rb +++ b/app/workers/update_merge_requests_worker.rb @@ -2,6 +2,10 @@ class UpdateMergeRequestsWorker include Sidekiq::Worker include DedicatedSidekiqQueue + def metrics_tags + @metrics_tags || {} + end + def perform(project_id, user_id, oldrev, newrev, ref) project = Project.find_by(id: project_id) return unless project @@ -9,6 +13,11 @@ class UpdateMergeRequestsWorker user = User.find_by(id: user_id) return unless user + @metrics_tags = { + project_id: project_id, + user_id: user_id + } + MergeRequests::RefreshService.new(project, user).execute(oldrev, newrev, ref) end end diff --git a/app/workers/wait_for_cluster_creation_worker.rb b/app/workers/wait_for_cluster_creation_worker.rb new file mode 100644 index 00000000000..5aa3bbdaa9d --- /dev/null +++ b/app/workers/wait_for_cluster_creation_worker.rb @@ -0,0 +1,27 @@ +class WaitForClusterCreationWorker + include Sidekiq::Worker + include ClusterQueue + + INITIAL_INTERVAL = 2.minutes + EAGER_INTERVAL = 10.seconds + TIMEOUT = 20.minutes + + def perform(cluster_id) + Gcp::Cluster.find_by_id(cluster_id).try do |cluster| + Ci::FetchGcpOperationService.new.execute(cluster) do |operation| + case operation.status + when 'RUNNING' + if TIMEOUT < Time.now.utc - operation.start_time.to_time.utc + return cluster.make_errored!("Cluster creation time exceeds timeout; #{TIMEOUT}") + end + + WaitForClusterCreationWorker.perform_in(EAGER_INTERVAL, cluster.id) + when 'DONE' + Ci::FinalizeClusterCreationService.new.execute(cluster) + else + return cluster.make_errored!("Unexpected operation status; #{operation.status} #{operation.status_message}") + end + end + end + end +end |