summaryrefslogtreecommitdiff
path: root/app/workers/bulk_imports/pipeline_worker.rb
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/bulk_imports/pipeline_worker.rb')
-rw-r--r--app/workers/bulk_imports/pipeline_worker.rb25
1 files changed, 24 insertions, 1 deletions
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb
index 256301bf097..d3297017714 100644
--- a/app/workers/bulk_imports/pipeline_worker.rb
+++ b/app/workers/bulk_imports/pipeline_worker.rb
@@ -4,6 +4,8 @@ module BulkImports
class PipelineWorker # rubocop:disable Scalability/IdempotentWorker
include ApplicationWorker
+ NDJSON_PIPELINE_PERFORM_DELAY = 1.minute
+
feature_category :importers
tags :exclude_from_kubernetes
@@ -40,6 +42,15 @@ module BulkImports
private
def run(pipeline_tracker)
+ if ndjson_pipeline?(pipeline_tracker)
+ status = ExportStatus.new(pipeline_tracker, pipeline_tracker.pipeline_class.relation)
+
+ raise(Pipeline::ExpiredError, 'Pipeline timeout') if job_timeout?(pipeline_tracker)
+ raise(Pipeline::FailedError, status.error) if status.failed?
+
+ return reenqueue(pipeline_tracker) if status.started?
+ end
+
pipeline_tracker.update!(status_event: 'start', jid: jid)
context = ::BulkImports::Pipeline::Context.new(pipeline_tracker)
@@ -48,7 +59,7 @@ module BulkImports
pipeline_tracker.finish!
rescue StandardError => e
- pipeline_tracker.fail_op!
+ pipeline_tracker.update!(status_event: 'fail_op', jid: jid)
logger.error(
worker: self.class.name,
@@ -67,5 +78,17 @@ module BulkImports
def logger
@logger ||= Gitlab::Import::Logger.build
end
+
+ def ndjson_pipeline?(pipeline_tracker)
+ pipeline_tracker.pipeline_class.ndjson_pipeline?
+ end
+
+ def job_timeout?(pipeline_tracker)
+ (Time.zone.now - pipeline_tracker.entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT
+ end
+
+ def reenqueue(pipeline_tracker)
+ self.class.perform_in(NDJSON_PIPELINE_PERFORM_DELAY, pipeline_tracker.id, pipeline_tracker.stage, pipeline_tracker.entity.id)
+ end
end
end