diff options
Diffstat (limited to 'app/workers/bulk_imports/pipeline_worker.rb')
-rw-r--r-- | app/workers/bulk_imports/pipeline_worker.rb | 70 |
1 files changed, 70 insertions, 0 deletions
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb new file mode 100644 index 00000000000..a6de3c36205 --- /dev/null +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -0,0 +1,70 @@ +# frozen_string_literal: true + +module BulkImports + class PipelineWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + feature_category :importers + + sidekiq_options retry: false, dead: false + + worker_has_external_dependencies! + + def perform(pipeline_tracker_id, stage, entity_id) + pipeline_tracker = ::BulkImports::Tracker + .with_status(:created) + .find_by_id(pipeline_tracker_id) + + if pipeline_tracker.present? + logger.info( + worker: self.class.name, + entity_id: pipeline_tracker.entity.id, + pipeline_name: pipeline_tracker.pipeline_name + ) + + run(pipeline_tracker) + else + logger.error( + worker: self.class.name, + entity_id: entity_id, + pipeline_tracker_id: pipeline_tracker_id, + message: 'Unstarted pipeline not found' + ) + end + + ensure + ::BulkImports::EntityWorker.perform_async(entity_id, stage) + end + + private + + def run(pipeline_tracker) + pipeline_tracker.update!(status_event: 'start', jid: jid) + + context = ::BulkImports::Pipeline::Context.new(pipeline_tracker) + + pipeline_tracker.pipeline_class.new(context).run + + pipeline_tracker.finish! + rescue => e + pipeline_tracker.fail_op! + + logger.error( + worker: self.class.name, + entity_id: pipeline_tracker.entity.id, + pipeline_name: pipeline_tracker.pipeline_name, + message: e.message + ) + + Gitlab::ErrorTracking.track_exception( + e, + entity_id: pipeline_tracker.entity.id, + pipeline_name: pipeline_tracker.pipeline_name + ) + end + + def logger + @logger ||= Gitlab::Import::Logger.build + end + end +end |