diff options
Diffstat (limited to 'app/workers/gitlab/github_import/advance_stage_worker.rb')
-rw-r--r-- | app/workers/gitlab/github_import/advance_stage_worker.rb | 51 |
1 files changed, 4 insertions, 47 deletions
diff --git a/app/workers/gitlab/github_import/advance_stage_worker.rb b/app/workers/gitlab/github_import/advance_stage_worker.rb index 8c379be2ae4..8fbf88a1762 100644 --- a/app/workers/gitlab/github_import/advance_stage_worker.rb +++ b/app/workers/gitlab/github_import/advance_stage_worker.rb @@ -8,15 +8,12 @@ module Gitlab # stage. class AdvanceStageWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker + include ::Gitlab::Import::AdvanceStage sidekiq_options dead: false feature_category :importers - INTERVAL = 30.seconds.to_i - - # The number of seconds to wait (while blocking the thread) before - # continuing to the next waiter. - BLOCKING_WAIT_TIME = 5 + private # The known importer stages and their corresponding Sidekiq workers. STAGES = { @@ -26,49 +23,9 @@ module Gitlab finish: Stage::FinishImportWorker }.freeze - # project_id - The ID of the project being imported. - # waiters - A Hash mapping Gitlab::JobWaiter keys to the number of - # remaining jobs. - # next_stage - The name of the next stage to start when all jobs have been - # completed. - def perform(project_id, waiters, next_stage) - return unless import_state = find_import_state(project_id) - - new_waiters = wait_for_jobs(waiters) - - if new_waiters.empty? - # We refresh the import JID here so workers importing individual - # resources (e.g. notes) don't have to do this all the time, reducing - # the pressure on Redis. We _only_ do this once all jobs are done so - # we don't get stuck forever if one or more jobs failed to notify the - # JobWaiter. - import_state.refresh_jid_expiration - - STAGES.fetch(next_stage.to_sym).perform_async(project_id) - else - self.class.perform_in(INTERVAL, project_id, new_waiters, next_stage) - end - end - - def wait_for_jobs(waiters) - waiters.each_with_object({}) do |(key, remaining), new_waiters| - waiter = JobWaiter.new(remaining, key) - - # We wait for a brief moment of time so we don't reschedule if we can - # complete the work fast enough. - waiter.wait(BLOCKING_WAIT_TIME) - - next unless waiter.jobs_remaining.positive? - - new_waiters[waiter.key] = waiter.jobs_remaining - end - end - - # rubocop: disable CodeReuse/ActiveRecord - def find_import_state(project_id) - ProjectImportState.select(:jid).with_status(:started).find_by(project_id: project_id) + def next_stage_worker(next_stage) + STAGES.fetch(next_stage.to_sym) end - # rubocop: enable CodeReuse/ActiveRecord end end end |