diff options
Diffstat (limited to 'app/workers/bulk_imports/pipeline_worker.rb')
-rw-r--r-- | app/workers/bulk_imports/pipeline_worker.rb | 25 |
1 files changed, 24 insertions, 1 deletions
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index 256301bf097..d3297017714 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -4,6 +4,8 @@ module BulkImports class PipelineWorker # rubocop:disable Scalability/IdempotentWorker include ApplicationWorker + NDJSON_PIPELINE_PERFORM_DELAY = 1.minute + feature_category :importers tags :exclude_from_kubernetes @@ -40,6 +42,15 @@ module BulkImports private def run(pipeline_tracker) + if ndjson_pipeline?(pipeline_tracker) + status = ExportStatus.new(pipeline_tracker, pipeline_tracker.pipeline_class.relation) + + raise(Pipeline::ExpiredError, 'Pipeline timeout') if job_timeout?(pipeline_tracker) + raise(Pipeline::FailedError, status.error) if status.failed? + + return reenqueue(pipeline_tracker) if status.started? + end + pipeline_tracker.update!(status_event: 'start', jid: jid) context = ::BulkImports::Pipeline::Context.new(pipeline_tracker) @@ -48,7 +59,7 @@ module BulkImports pipeline_tracker.finish! rescue StandardError => e - pipeline_tracker.fail_op! + pipeline_tracker.update!(status_event: 'fail_op', jid: jid) logger.error( worker: self.class.name, @@ -67,5 +78,17 @@ module BulkImports def logger @logger ||= Gitlab::Import::Logger.build end + + def ndjson_pipeline?(pipeline_tracker) + pipeline_tracker.pipeline_class.ndjson_pipeline? + end + + def job_timeout?(pipeline_tracker) + (Time.zone.now - pipeline_tracker.entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT + end + + def reenqueue(pipeline_tracker) + self.class.perform_in(NDJSON_PIPELINE_PERFORM_DELAY, pipeline_tracker.id, pipeline_tracker.stage, pipeline_tracker.entity.id) + end end end |