diff options
-rw-r--r-- | Gemfile | 2 | ||||
-rw-r--r-- | Gemfile.lock | 3 | ||||
-rw-r--r-- | app/models/project.rb | 25 | ||||
-rw-r--r-- | app/workers/repository_fork_worker.rb | 16 | ||||
-rw-r--r-- | app/workers/repository_import_worker.rb | 24 | ||||
-rw-r--r-- | app/workers/stuck_import_jobs_worker.rb | 50 | ||||
-rw-r--r-- | lib/after_commit_queue.rb | 30 | ||||
-rw-r--r-- | lib/gitlab/sidekiq_status.rb | 9 | ||||
-rw-r--r-- | spec/lib/after_commit_queue_spec.rb | 15 | ||||
-rw-r--r-- | spec/models/project_spec.rb | 27 | ||||
-rw-r--r-- | spec/workers/repository_import_worker_spec.rb | 2 | ||||
-rw-r--r-- | spec/workers/stuck_import_jobs_worker_spec.rb | 34 |
12 files changed, 170 insertions, 67 deletions
@@ -144,8 +144,6 @@ end # State machine gem 'state_machines-activerecord', '~> 0.4.0' -# Run events after state machine commits -gem 'after_commit_queue', '~> 1.3.0' # Issue tags gem 'acts-as-taggable-on', '~> 4.0' diff --git a/Gemfile.lock b/Gemfile.lock index a93caba2393..e162a191951 100644 --- a/Gemfile.lock +++ b/Gemfile.lock @@ -46,8 +46,6 @@ GEM ice_nine (~> 0.11.0) memoizable (~> 0.4.0) addressable (2.3.8) - after_commit_queue (1.3.0) - activerecord (>= 3.0) akismet (2.0.0) allocations (1.0.5) arel (6.0.4) @@ -963,7 +961,6 @@ DEPENDENCIES activerecord_sane_schema_dumper (= 0.2) acts-as-taggable-on (~> 4.0) addressable (~> 2.3.8) - after_commit_queue (~> 1.3.0) akismet (~> 2.0) allocations (~> 1.0) asana (~> 0.6.0) diff --git a/app/models/project.rb b/app/models/project.rb index 0de7da0ddaa..6955a8a161b 100644 --- a/app/models/project.rb +++ b/app/models/project.rb @@ -369,7 +369,10 @@ class Project < ActiveRecord::Base state :failed after_transition [:none, :finished, :failed] => :scheduled do |project, _| - project.run_after_commit { add_import_job } + project.run_after_commit do + job_id = add_import_job + update(import_jid: job_id) if job_id + end end after_transition started: :finished do |project, _| @@ -524,17 +527,26 @@ class Project < ActiveRecord::Base def add_import_job job_id = if forked? - RepositoryForkWorker.perform_async(id, forked_from_project.repository_storage_path, - forked_from_project.full_path, - self.namespace.full_path) + RepositoryForkWorker.perform_async(id, + forked_from_project.repository_storage_path, + forked_from_project.full_path, + self.namespace.full_path) else RepositoryImportWorker.perform_async(self.id) end + log_import_activity(job_id) + + job_id + end + + def log_import_activity(job_id, type: :import) + job_type = type.to_s.capitalize + if job_id - Rails.logger.info "Import job started for #{full_path} with job ID #{job_id}" + Rails.logger.info("#{job_type} job scheduled for #{full_path} with job ID #{job_id}.") else - Rails.logger.error "Import job failed to start for #{full_path}" + Rails.logger.error("#{job_type} job failed to create for #{full_path}.") end end @@ -543,6 +555,7 @@ class Project < ActiveRecord::Base ProjectCacheWorker.perform_async(self.id) end + update(import_error: nil) remove_import_data end diff --git a/app/workers/repository_fork_worker.rb b/app/workers/repository_fork_worker.rb index a338523dc6b..cde5b45ad41 100644 --- a/app/workers/repository_fork_worker.rb +++ b/app/workers/repository_fork_worker.rb @@ -5,14 +5,17 @@ class RepositoryForkWorker include Gitlab::ShellAdapter include DedicatedSidekiqQueue + sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION + def perform(project_id, forked_from_repository_storage_path, source_path, target_path) + project = Project.find(project_id) + + return unless start_fork(project) + Gitlab::Metrics.add_event(:fork_repository, source_path: source_path, target_path: target_path) - project = Project.find(project_id) - project.import_start - result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_path, project.repository_storage_path, target_path) raise ForkError, "Unable to fork project #{project_id} for repository #{source_path} -> #{target_path}" unless result @@ -33,6 +36,13 @@ class RepositoryForkWorker private + def start_fork(project) + return true if project.import_start + + Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while forking.") + false + end + def fail_fork(project, message) Rails.logger.error(message) project.mark_import_as_failed(message) diff --git a/app/workers/repository_import_worker.rb b/app/workers/repository_import_worker.rb index 6be541abd3e..2c2d1e8b91f 100644 --- a/app/workers/repository_import_worker.rb +++ b/app/workers/repository_import_worker.rb @@ -4,23 +4,18 @@ class RepositoryImportWorker include Sidekiq::Worker include DedicatedSidekiqQueue - sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_EXPIRATION - - attr_accessor :project, :current_user + sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION def perform(project_id) - @project = Project.find(project_id) - @current_user = @project.creator + project = Project.find(project_id) - project.import_start + return unless start_import(project) Gitlab::Metrics.add_event(:import_repository, - import_url: @project.import_url, - path: @project.full_path) - - project.update_columns(import_jid: self.jid, import_error: nil) + import_url: project.import_url, + path: project.full_path) - result = Projects::ImportService.new(project, current_user).execute + result = Projects::ImportService.new(project, project.creator).execute raise ImportError, result[:message] if result[:status] == :error project.repository.after_import @@ -37,6 +32,13 @@ class RepositoryImportWorker private + def start_import(project) + return true if project.import_start + + Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while importing.") + false + end + def fail_import(project, message) project.mark_import_as_failed(message) end diff --git a/app/workers/stuck_import_jobs_worker.rb b/app/workers/stuck_import_jobs_worker.rb index bfc5e667bb6..f850e459cd9 100644 --- a/app/workers/stuck_import_jobs_worker.rb +++ b/app/workers/stuck_import_jobs_worker.rb @@ -2,36 +2,60 @@ class StuckImportJobsWorker include Sidekiq::Worker include CronjobQueue - IMPORT_EXPIRATION = 15.hours.to_i + IMPORT_JOBS_EXPIRATION = 15.hours.to_i def perform - stuck_projects.find_in_batches(batch_size: 500) do |group| + projects_without_jid_count = mark_projects_without_jid_as_failed! + projects_with_jid_count = mark_projects_with_jid_as_failed! + + Gitlab::Metrics.add_event(:stuck_import_jobs, + projects_without_jid_count: projects_without_jid_count, + projects_with_jid_count: projects_with_jid_count) + end + + private + + def mark_projects_without_jid_as_failed! + started_projects_without_jid.each do |project| + project.mark_import_as_failed(error_message) + end.count + end + + def mark_projects_with_jid_as_failed! + completed_jids_count = 0 + + started_projects_with_jid.find_in_batches(batch_size: 500) do |group| jids = group.map(&:import_jid) # Find the jobs that aren't currently running or that exceeded the threshold. - completed_jids = Gitlab::SidekiqStatus.completed_jids(jids) + completed_jids = Gitlab::SidekiqStatus.completed_jids(jids).to_set if completed_jids.any? - completed_ids = group.select { |project| completed_jids.include?(project.import_jid) }.map(&:id) + completed_jids_count += completed_jids.count + group.each do |project| + project.mark_import_as_failed(error_message) if completed_jids.include?(project.import_jid) + end - fail_batch!(completed_jids, completed_ids) + Rails.logger.info("Marked stuck import jobs as failed. JIDs: #{completed_jids.to_a.join(', ')}") end end - end - private + completed_jids_count + end - def stuck_projects - Project.select('id, import_jid').with_import_status(:started).where.not(import_jid: nil) + def started_projects + Project.with_import_status(:started) end - def fail_batch!(completed_jids, completed_ids) - Project.where(id: completed_ids).update_all(import_status: 'failed', import_error: error_message) + def started_projects_with_jid + started_projects.where.not(import_jid: nil) + end - Rails.logger.info("Marked stuck import jobs as failed. JIDs: #{completed_jids.join(', ')}") + def started_projects_without_jid + started_projects.where(import_jid: nil) end def error_message - "Import timed out. Import took longer than #{IMPORT_EXPIRATION} seconds" + "Import timed out. Import took longer than #{IMPORT_JOBS_EXPIRATION} seconds" end end diff --git a/lib/after_commit_queue.rb b/lib/after_commit_queue.rb new file mode 100644 index 00000000000..b67575a3ac2 --- /dev/null +++ b/lib/after_commit_queue.rb @@ -0,0 +1,30 @@ +module AfterCommitQueue + extend ActiveSupport::Concern + + included do + after_commit :_run_after_commit_queue + after_rollback :_clear_after_commit_queue + end + + def run_after_commit(method = nil, &block) + _after_commit_queue << proc { self.send(method) } if method + _after_commit_queue << block if block + true + end + + protected + + def _run_after_commit_queue + while action = _after_commit_queue.pop + self.instance_eval(&action) + end + end + + def _after_commit_queue + @after_commit_queue ||= [] + end + + def _clear_after_commit_queue + _after_commit_queue.clear + end +end diff --git a/lib/gitlab/sidekiq_status.rb b/lib/gitlab/sidekiq_status.rb index ca8d3271541..a0a2769cf9e 100644 --- a/lib/gitlab/sidekiq_status.rb +++ b/lib/gitlab/sidekiq_status.rb @@ -90,9 +90,14 @@ module Gitlab # # Returns an array of completed JIDs def self.completed_jids(job_ids) - Sidekiq.redis do |redis| - job_ids.reject { |jid| redis.exists(key_for(jid)) } + statuses = job_status(job_ids) + + completed = [] + job_ids.zip(statuses).each do |job_id, status| + completed << job_id unless status end + + completed end def self.key_for(jid) diff --git a/spec/lib/after_commit_queue_spec.rb b/spec/lib/after_commit_queue_spec.rb new file mode 100644 index 00000000000..6e7c2ec2363 --- /dev/null +++ b/spec/lib/after_commit_queue_spec.rb @@ -0,0 +1,15 @@ +require 'spec_helper' + +describe AfterCommitQueue do + it 'runs after transaction is committed' do + called = false + test_proc = proc { called = true } + + project = build(:project) + project.run_after_commit(&test_proc) + + project.save + + expect(called).to be true + end +end diff --git a/spec/models/project_spec.rb b/spec/models/project_spec.rb index eba71ba2f72..d47277dd675 100644 --- a/spec/models/project_spec.rb +++ b/spec/models/project_spec.rb @@ -1610,8 +1610,7 @@ describe Project do it 'imports a project' do expect_any_instance_of(RepositoryImportWorker).to receive(:perform).and_call_original - project.import_schedule - + expect { project.import_schedule }.to change { project.import_jid } expect(project.reload.import_status).to eq('finished') end end @@ -1624,6 +1623,13 @@ describe Project do allow(Projects::HousekeepingService).to receive(:new) { housekeeping_service } end + it 'resets project import_error' do + error_message = 'Some error' + mirror = create(:project_empty_repo, :import_started, import_error: error_message) + + expect { mirror.import_finish }.to change { mirror.import_error }.from(error_message).to(nil) + end + it 'performs housekeeping when an import of a fresh project is completed' do project = create(:project_empty_repo, :import_started, import_type: :github) @@ -1730,17 +1736,21 @@ describe Project do end describe '#add_import_job' do + let(:import_jid) { '123' } + context 'forked' do let(:forked_project_link) { create(:forked_project_link, :forked_to_empty_project) } let(:forked_from_project) { forked_project_link.forked_from_project } let(:project) { forked_project_link.forked_to_project } it 'schedules a RepositoryForkWorker job' do - expect(RepositoryForkWorker).to receive(:perform_async) - .with(project.id, forked_from_project.repository_storage_path, - forked_from_project.disk_path, project.namespace.full_path) + expect(RepositoryForkWorker).to receive(:perform_async).with( + project.id, + forked_from_project.repository_storage_path, + forked_from_project.disk_path, + project.namespace.full_path).and_return(import_jid) - project.add_import_job + expect(project.add_import_job).to eq(import_jid) end end @@ -1748,9 +1758,8 @@ describe Project do it 'schedules a RepositoryImportWorker job' do project = create(:project, import_url: generate(:url)) - expect(RepositoryImportWorker).to receive(:perform_async).with(project.id) - - project.add_import_job + expect(RepositoryImportWorker).to receive(:perform_async).with(project.id).and_return(import_jid) + expect(project.add_import_job).to eq(import_jid) end end end diff --git a/spec/workers/repository_import_worker_spec.rb b/spec/workers/repository_import_worker_spec.rb index ca904e512ac..100dfc32bbe 100644 --- a/spec/workers/repository_import_worker_spec.rb +++ b/spec/workers/repository_import_worker_spec.rb @@ -22,8 +22,8 @@ describe RepositoryImportWorker do it 'hide the credentials that were used in the import URL' do error = %q{remote: Not Found fatal: repository 'https://user:pass@test.com/root/repoC.git/' not found } + project.update_attributes(import_jid: '123') expect_any_instance_of(Projects::ImportService).to receive(:execute).and_return({ status: :error, message: error }) - allow(subject).to receive(:jid).and_return('123') expect do subject.perform(project.id) diff --git a/spec/workers/stuck_import_jobs_worker_spec.rb b/spec/workers/stuck_import_jobs_worker_spec.rb index 2f5b685a332..a82eb54ffe4 100644 --- a/spec/workers/stuck_import_jobs_worker_spec.rb +++ b/spec/workers/stuck_import_jobs_worker_spec.rb @@ -8,29 +8,29 @@ describe StuckImportJobsWorker do allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain).and_return(exclusive_lease_uuid) end - describe 'long running import' do - let(:project) { create(:project, import_jid: '123', import_status: 'started') } + describe 'with started import_status' do + let(:project) { create(:project, :import_started, import_jid: '123') } - before do - allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return(['123']) - end + describe 'long running import' do + it 'marks the project as failed' do + allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return(['123']) - it 'marks the project as failed' do - expect { worker.perform }.to change { project.reload.import_status }.to('failed') + expect { worker.perform }.to change { project.reload.import_status }.to('failed') + end end - end - describe 'running import' do - let(:project) { create(:project, import_jid: '123', import_status: 'started') } - - before do - allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return([]) - end + describe 'running import' do + it 'does not mark the project as failed' do + allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return([]) - it 'does not mark the project as failed' do - worker.perform + expect { worker.perform }.not_to change { project.reload.import_status } + end - expect(project.reload.import_status).to eq('started') + describe 'import without import_jid' do + it 'marks the project as failed' do + expect { worker.perform }.to change { project.reload.import_status }.to('failed') + end + end end end end |