summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--Gemfile2
-rw-r--r--Gemfile.lock3
-rw-r--r--app/models/project.rb25
-rw-r--r--app/workers/repository_fork_worker.rb16
-rw-r--r--app/workers/repository_import_worker.rb24
-rw-r--r--app/workers/stuck_import_jobs_worker.rb50
-rw-r--r--lib/after_commit_queue.rb30
-rw-r--r--lib/gitlab/sidekiq_status.rb9
-rw-r--r--spec/lib/after_commit_queue_spec.rb15
-rw-r--r--spec/models/project_spec.rb27
-rw-r--r--spec/workers/repository_import_worker_spec.rb2
-rw-r--r--spec/workers/stuck_import_jobs_worker_spec.rb34
12 files changed, 170 insertions, 67 deletions
diff --git a/Gemfile b/Gemfile
index a484cefb9a4..dd0c9bc9c0c 100644
--- a/Gemfile
+++ b/Gemfile
@@ -144,8 +144,6 @@ end
# State machine
gem 'state_machines-activerecord', '~> 0.4.0'
-# Run events after state machine commits
-gem 'after_commit_queue', '~> 1.3.0'
# Issue tags
gem 'acts-as-taggable-on', '~> 4.0'
diff --git a/Gemfile.lock b/Gemfile.lock
index a93caba2393..e162a191951 100644
--- a/Gemfile.lock
+++ b/Gemfile.lock
@@ -46,8 +46,6 @@ GEM
ice_nine (~> 0.11.0)
memoizable (~> 0.4.0)
addressable (2.3.8)
- after_commit_queue (1.3.0)
- activerecord (>= 3.0)
akismet (2.0.0)
allocations (1.0.5)
arel (6.0.4)
@@ -963,7 +961,6 @@ DEPENDENCIES
activerecord_sane_schema_dumper (= 0.2)
acts-as-taggable-on (~> 4.0)
addressable (~> 2.3.8)
- after_commit_queue (~> 1.3.0)
akismet (~> 2.0)
allocations (~> 1.0)
asana (~> 0.6.0)
diff --git a/app/models/project.rb b/app/models/project.rb
index 0de7da0ddaa..6955a8a161b 100644
--- a/app/models/project.rb
+++ b/app/models/project.rb
@@ -369,7 +369,10 @@ class Project < ActiveRecord::Base
state :failed
after_transition [:none, :finished, :failed] => :scheduled do |project, _|
- project.run_after_commit { add_import_job }
+ project.run_after_commit do
+ job_id = add_import_job
+ update(import_jid: job_id) if job_id
+ end
end
after_transition started: :finished do |project, _|
@@ -524,17 +527,26 @@ class Project < ActiveRecord::Base
def add_import_job
job_id =
if forked?
- RepositoryForkWorker.perform_async(id, forked_from_project.repository_storage_path,
- forked_from_project.full_path,
- self.namespace.full_path)
+ RepositoryForkWorker.perform_async(id,
+ forked_from_project.repository_storage_path,
+ forked_from_project.full_path,
+ self.namespace.full_path)
else
RepositoryImportWorker.perform_async(self.id)
end
+ log_import_activity(job_id)
+
+ job_id
+ end
+
+ def log_import_activity(job_id, type: :import)
+ job_type = type.to_s.capitalize
+
if job_id
- Rails.logger.info "Import job started for #{full_path} with job ID #{job_id}"
+ Rails.logger.info("#{job_type} job scheduled for #{full_path} with job ID #{job_id}.")
else
- Rails.logger.error "Import job failed to start for #{full_path}"
+ Rails.logger.error("#{job_type} job failed to create for #{full_path}.")
end
end
@@ -543,6 +555,7 @@ class Project < ActiveRecord::Base
ProjectCacheWorker.perform_async(self.id)
end
+ update(import_error: nil)
remove_import_data
end
diff --git a/app/workers/repository_fork_worker.rb b/app/workers/repository_fork_worker.rb
index a338523dc6b..cde5b45ad41 100644
--- a/app/workers/repository_fork_worker.rb
+++ b/app/workers/repository_fork_worker.rb
@@ -5,14 +5,17 @@ class RepositoryForkWorker
include Gitlab::ShellAdapter
include DedicatedSidekiqQueue
+ sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
+
def perform(project_id, forked_from_repository_storage_path, source_path, target_path)
+ project = Project.find(project_id)
+
+ return unless start_fork(project)
+
Gitlab::Metrics.add_event(:fork_repository,
source_path: source_path,
target_path: target_path)
- project = Project.find(project_id)
- project.import_start
-
result = gitlab_shell.fork_repository(forked_from_repository_storage_path, source_path,
project.repository_storage_path, target_path)
raise ForkError, "Unable to fork project #{project_id} for repository #{source_path} -> #{target_path}" unless result
@@ -33,6 +36,13 @@ class RepositoryForkWorker
private
+ def start_fork(project)
+ return true if project.import_start
+
+ Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while forking.")
+ false
+ end
+
def fail_fork(project, message)
Rails.logger.error(message)
project.mark_import_as_failed(message)
diff --git a/app/workers/repository_import_worker.rb b/app/workers/repository_import_worker.rb
index 6be541abd3e..2c2d1e8b91f 100644
--- a/app/workers/repository_import_worker.rb
+++ b/app/workers/repository_import_worker.rb
@@ -4,23 +4,18 @@ class RepositoryImportWorker
include Sidekiq::Worker
include DedicatedSidekiqQueue
- sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_EXPIRATION
-
- attr_accessor :project, :current_user
+ sidekiq_options status_expiration: StuckImportJobsWorker::IMPORT_JOBS_EXPIRATION
def perform(project_id)
- @project = Project.find(project_id)
- @current_user = @project.creator
+ project = Project.find(project_id)
- project.import_start
+ return unless start_import(project)
Gitlab::Metrics.add_event(:import_repository,
- import_url: @project.import_url,
- path: @project.full_path)
-
- project.update_columns(import_jid: self.jid, import_error: nil)
+ import_url: project.import_url,
+ path: project.full_path)
- result = Projects::ImportService.new(project, current_user).execute
+ result = Projects::ImportService.new(project, project.creator).execute
raise ImportError, result[:message] if result[:status] == :error
project.repository.after_import
@@ -37,6 +32,13 @@ class RepositoryImportWorker
private
+ def start_import(project)
+ return true if project.import_start
+
+ Rails.logger.info("Project #{project.full_path} was in inconsistent state (#{project.import_status}) while importing.")
+ false
+ end
+
def fail_import(project, message)
project.mark_import_as_failed(message)
end
diff --git a/app/workers/stuck_import_jobs_worker.rb b/app/workers/stuck_import_jobs_worker.rb
index bfc5e667bb6..f850e459cd9 100644
--- a/app/workers/stuck_import_jobs_worker.rb
+++ b/app/workers/stuck_import_jobs_worker.rb
@@ -2,36 +2,60 @@ class StuckImportJobsWorker
include Sidekiq::Worker
include CronjobQueue
- IMPORT_EXPIRATION = 15.hours.to_i
+ IMPORT_JOBS_EXPIRATION = 15.hours.to_i
def perform
- stuck_projects.find_in_batches(batch_size: 500) do |group|
+ projects_without_jid_count = mark_projects_without_jid_as_failed!
+ projects_with_jid_count = mark_projects_with_jid_as_failed!
+
+ Gitlab::Metrics.add_event(:stuck_import_jobs,
+ projects_without_jid_count: projects_without_jid_count,
+ projects_with_jid_count: projects_with_jid_count)
+ end
+
+ private
+
+ def mark_projects_without_jid_as_failed!
+ started_projects_without_jid.each do |project|
+ project.mark_import_as_failed(error_message)
+ end.count
+ end
+
+ def mark_projects_with_jid_as_failed!
+ completed_jids_count = 0
+
+ started_projects_with_jid.find_in_batches(batch_size: 500) do |group|
jids = group.map(&:import_jid)
# Find the jobs that aren't currently running or that exceeded the threshold.
- completed_jids = Gitlab::SidekiqStatus.completed_jids(jids)
+ completed_jids = Gitlab::SidekiqStatus.completed_jids(jids).to_set
if completed_jids.any?
- completed_ids = group.select { |project| completed_jids.include?(project.import_jid) }.map(&:id)
+ completed_jids_count += completed_jids.count
+ group.each do |project|
+ project.mark_import_as_failed(error_message) if completed_jids.include?(project.import_jid)
+ end
- fail_batch!(completed_jids, completed_ids)
+ Rails.logger.info("Marked stuck import jobs as failed. JIDs: #{completed_jids.to_a.join(', ')}")
end
end
- end
- private
+ completed_jids_count
+ end
- def stuck_projects
- Project.select('id, import_jid').with_import_status(:started).where.not(import_jid: nil)
+ def started_projects
+ Project.with_import_status(:started)
end
- def fail_batch!(completed_jids, completed_ids)
- Project.where(id: completed_ids).update_all(import_status: 'failed', import_error: error_message)
+ def started_projects_with_jid
+ started_projects.where.not(import_jid: nil)
+ end
- Rails.logger.info("Marked stuck import jobs as failed. JIDs: #{completed_jids.join(', ')}")
+ def started_projects_without_jid
+ started_projects.where(import_jid: nil)
end
def error_message
- "Import timed out. Import took longer than #{IMPORT_EXPIRATION} seconds"
+ "Import timed out. Import took longer than #{IMPORT_JOBS_EXPIRATION} seconds"
end
end
diff --git a/lib/after_commit_queue.rb b/lib/after_commit_queue.rb
new file mode 100644
index 00000000000..b67575a3ac2
--- /dev/null
+++ b/lib/after_commit_queue.rb
@@ -0,0 +1,30 @@
+module AfterCommitQueue
+ extend ActiveSupport::Concern
+
+ included do
+ after_commit :_run_after_commit_queue
+ after_rollback :_clear_after_commit_queue
+ end
+
+ def run_after_commit(method = nil, &block)
+ _after_commit_queue << proc { self.send(method) } if method
+ _after_commit_queue << block if block
+ true
+ end
+
+ protected
+
+ def _run_after_commit_queue
+ while action = _after_commit_queue.pop
+ self.instance_eval(&action)
+ end
+ end
+
+ def _after_commit_queue
+ @after_commit_queue ||= []
+ end
+
+ def _clear_after_commit_queue
+ _after_commit_queue.clear
+ end
+end
diff --git a/lib/gitlab/sidekiq_status.rb b/lib/gitlab/sidekiq_status.rb
index ca8d3271541..a0a2769cf9e 100644
--- a/lib/gitlab/sidekiq_status.rb
+++ b/lib/gitlab/sidekiq_status.rb
@@ -90,9 +90,14 @@ module Gitlab
#
# Returns an array of completed JIDs
def self.completed_jids(job_ids)
- Sidekiq.redis do |redis|
- job_ids.reject { |jid| redis.exists(key_for(jid)) }
+ statuses = job_status(job_ids)
+
+ completed = []
+ job_ids.zip(statuses).each do |job_id, status|
+ completed << job_id unless status
end
+
+ completed
end
def self.key_for(jid)
diff --git a/spec/lib/after_commit_queue_spec.rb b/spec/lib/after_commit_queue_spec.rb
new file mode 100644
index 00000000000..6e7c2ec2363
--- /dev/null
+++ b/spec/lib/after_commit_queue_spec.rb
@@ -0,0 +1,15 @@
+require 'spec_helper'
+
+describe AfterCommitQueue do
+ it 'runs after transaction is committed' do
+ called = false
+ test_proc = proc { called = true }
+
+ project = build(:project)
+ project.run_after_commit(&test_proc)
+
+ project.save
+
+ expect(called).to be true
+ end
+end
diff --git a/spec/models/project_spec.rb b/spec/models/project_spec.rb
index eba71ba2f72..d47277dd675 100644
--- a/spec/models/project_spec.rb
+++ b/spec/models/project_spec.rb
@@ -1610,8 +1610,7 @@ describe Project do
it 'imports a project' do
expect_any_instance_of(RepositoryImportWorker).to receive(:perform).and_call_original
- project.import_schedule
-
+ expect { project.import_schedule }.to change { project.import_jid }
expect(project.reload.import_status).to eq('finished')
end
end
@@ -1624,6 +1623,13 @@ describe Project do
allow(Projects::HousekeepingService).to receive(:new) { housekeeping_service }
end
+ it 'resets project import_error' do
+ error_message = 'Some error'
+ mirror = create(:project_empty_repo, :import_started, import_error: error_message)
+
+ expect { mirror.import_finish }.to change { mirror.import_error }.from(error_message).to(nil)
+ end
+
it 'performs housekeeping when an import of a fresh project is completed' do
project = create(:project_empty_repo, :import_started, import_type: :github)
@@ -1730,17 +1736,21 @@ describe Project do
end
describe '#add_import_job' do
+ let(:import_jid) { '123' }
+
context 'forked' do
let(:forked_project_link) { create(:forked_project_link, :forked_to_empty_project) }
let(:forked_from_project) { forked_project_link.forked_from_project }
let(:project) { forked_project_link.forked_to_project }
it 'schedules a RepositoryForkWorker job' do
- expect(RepositoryForkWorker).to receive(:perform_async)
- .with(project.id, forked_from_project.repository_storage_path,
- forked_from_project.disk_path, project.namespace.full_path)
+ expect(RepositoryForkWorker).to receive(:perform_async).with(
+ project.id,
+ forked_from_project.repository_storage_path,
+ forked_from_project.disk_path,
+ project.namespace.full_path).and_return(import_jid)
- project.add_import_job
+ expect(project.add_import_job).to eq(import_jid)
end
end
@@ -1748,9 +1758,8 @@ describe Project do
it 'schedules a RepositoryImportWorker job' do
project = create(:project, import_url: generate(:url))
- expect(RepositoryImportWorker).to receive(:perform_async).with(project.id)
-
- project.add_import_job
+ expect(RepositoryImportWorker).to receive(:perform_async).with(project.id).and_return(import_jid)
+ expect(project.add_import_job).to eq(import_jid)
end
end
end
diff --git a/spec/workers/repository_import_worker_spec.rb b/spec/workers/repository_import_worker_spec.rb
index ca904e512ac..100dfc32bbe 100644
--- a/spec/workers/repository_import_worker_spec.rb
+++ b/spec/workers/repository_import_worker_spec.rb
@@ -22,8 +22,8 @@ describe RepositoryImportWorker do
it 'hide the credentials that were used in the import URL' do
error = %q{remote: Not Found fatal: repository 'https://user:pass@test.com/root/repoC.git/' not found }
+ project.update_attributes(import_jid: '123')
expect_any_instance_of(Projects::ImportService).to receive(:execute).and_return({ status: :error, message: error })
- allow(subject).to receive(:jid).and_return('123')
expect do
subject.perform(project.id)
diff --git a/spec/workers/stuck_import_jobs_worker_spec.rb b/spec/workers/stuck_import_jobs_worker_spec.rb
index 2f5b685a332..a82eb54ffe4 100644
--- a/spec/workers/stuck_import_jobs_worker_spec.rb
+++ b/spec/workers/stuck_import_jobs_worker_spec.rb
@@ -8,29 +8,29 @@ describe StuckImportJobsWorker do
allow_any_instance_of(Gitlab::ExclusiveLease).to receive(:try_obtain).and_return(exclusive_lease_uuid)
end
- describe 'long running import' do
- let(:project) { create(:project, import_jid: '123', import_status: 'started') }
+ describe 'with started import_status' do
+ let(:project) { create(:project, :import_started, import_jid: '123') }
- before do
- allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return(['123'])
- end
+ describe 'long running import' do
+ it 'marks the project as failed' do
+ allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return(['123'])
- it 'marks the project as failed' do
- expect { worker.perform }.to change { project.reload.import_status }.to('failed')
+ expect { worker.perform }.to change { project.reload.import_status }.to('failed')
+ end
end
- end
- describe 'running import' do
- let(:project) { create(:project, import_jid: '123', import_status: 'started') }
-
- before do
- allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return([])
- end
+ describe 'running import' do
+ it 'does not mark the project as failed' do
+ allow(Gitlab::SidekiqStatus).to receive(:completed_jids).and_return([])
- it 'does not mark the project as failed' do
- worker.perform
+ expect { worker.perform }.not_to change { project.reload.import_status }
+ end
- expect(project.reload.import_status).to eq('started')
+ describe 'import without import_jid' do
+ it 'marks the project as failed' do
+ expect { worker.perform }.to change { project.reload.import_status }.to('failed')
+ end
+ end
end
end
end