diff options
Diffstat (limited to 'app/workers/bulk_imports/pipeline_worker.rb')
-rw-r--r-- | app/workers/bulk_imports/pipeline_worker.rb | 79 |
1 files changed, 53 insertions, 26 deletions
diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index 6d314774cff..5716f6e3f31 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -17,24 +17,34 @@ module BulkImports .find_by_id(pipeline_tracker_id) if pipeline_tracker.present? + @entity = @pipeline_tracker.entity + logger.info( structured_payload( - bulk_import_entity_id: pipeline_tracker.entity.id, - bulk_import_id: pipeline_tracker.entity.bulk_import_id, + bulk_import_entity_id: entity.id, + bulk_import_id: entity.bulk_import_id, + bulk_import_entity_type: entity.source_type, + source_full_path: entity.source_full_path, pipeline_name: pipeline_tracker.pipeline_name, message: 'Pipeline starting', + source_version: source_version, importer: 'gitlab_migration' ) ) run else + @entity = ::BulkImports::Entity.find(entity_id) + logger.error( structured_payload( bulk_import_entity_id: entity_id, - bulk_import_id: bulk_import_id(entity_id), + bulk_import_id: entity.bulk_import_id, + bulk_import_entity_type: entity.source_type, + source_full_path: entity.source_full_path, pipeline_tracker_id: pipeline_tracker_id, message: 'Unstarted pipeline not found', + source_version: source_version, importer: 'gitlab_migration' ) ) @@ -46,10 +56,10 @@ module BulkImports private - attr_reader :pipeline_tracker + attr_reader :pipeline_tracker, :entity def run - return skip_tracker if pipeline_tracker.entity.failed? + return skip_tracker if entity.failed? raise(Pipeline::ExpiredError, 'Pipeline timeout') if job_timeout? raise(Pipeline::FailedError, "Export from source instance failed: #{export_status.error}") if export_failed? @@ -65,33 +75,39 @@ module BulkImports fail_tracker(e) end - def bulk_import_id(entity_id) - @bulk_import_id ||= Entity.find(entity_id).bulk_import_id + def source_version + entity.bulk_import.source_version_info.to_s end def fail_tracker(exception) pipeline_tracker.update!(status_event: 'fail_op', jid: jid) - logger.error( - structured_payload( - bulk_import_entity_id: pipeline_tracker.entity.id, - bulk_import_id: pipeline_tracker.entity.bulk_import_id, + log_exception(exception, + { + bulk_import_entity_id: entity.id, + bulk_import_id: entity.bulk_import_id, + bulk_import_entity_type: entity.source_type, + source_full_path: entity.source_full_path, pipeline_name: pipeline_tracker.pipeline_name, - message: exception.message, + message: 'Pipeline failed', + source_version: source_version, importer: 'gitlab_migration' - ) + } ) Gitlab::ErrorTracking.track_exception( exception, - bulk_import_entity_id: pipeline_tracker.entity.id, - bulk_import_id: pipeline_tracker.entity.bulk_import_id, + bulk_import_entity_id: entity.id, + bulk_import_id: entity.bulk_import_id, + bulk_import_entity_type: entity.source_type, + source_full_path: entity.source_full_path, pipeline_name: pipeline_tracker.pipeline_name, + source_version: source_version, importer: 'gitlab_migration' ) BulkImports::Failure.create( - bulk_import_entity_id: context.entity.id, + bulk_import_entity_id: entity.id, pipeline_class: pipeline_tracker.pipeline_name, pipeline_step: 'pipeline_worker_run', exception_class: exception.class.to_s, @@ -109,7 +125,7 @@ module BulkImports delay, pipeline_tracker.id, pipeline_tracker.stage, - pipeline_tracker.entity.id + entity.id ) end @@ -128,7 +144,7 @@ module BulkImports def job_timeout? return false unless file_extraction_pipeline? - (Time.zone.now - pipeline_tracker.entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT + (Time.zone.now - entity.created_at) > Pipeline::NDJSON_EXPORT_TIMEOUT end def export_failed? @@ -150,14 +166,17 @@ module BulkImports end def retry_tracker(exception) - logger.error( - structured_payload( - bulk_import_entity_id: pipeline_tracker.entity.id, - bulk_import_id: pipeline_tracker.entity.bulk_import_id, + log_exception(exception, + { + bulk_import_entity_id: entity.id, + bulk_import_id: entity.bulk_import_id, + bulk_import_entity_type: entity.source_type, + source_full_path: entity.source_full_path, pipeline_name: pipeline_tracker.pipeline_name, - message: "Retrying error: #{exception.message}", + message: "Retrying pipeline", + source_version: source_version, importer: 'gitlab_migration' - ) + } ) pipeline_tracker.update!(status_event: 'retry', jid: jid) @@ -168,15 +187,23 @@ module BulkImports def skip_tracker logger.info( structured_payload( - bulk_import_entity_id: pipeline_tracker.entity.id, - bulk_import_id: pipeline_tracker.entity.bulk_import_id, + bulk_import_entity_id: entity.id, + bulk_import_id: entity.bulk_import_id, + bulk_import_entity_type: entity.source_type, + source_full_path: entity.source_full_path, pipeline_name: pipeline_tracker.pipeline_name, message: 'Skipping pipeline due to failed entity', + source_version: source_version, importer: 'gitlab_migration' ) ) pipeline_tracker.update!(status_event: 'skip', jid: jid) end + + def log_exception(exception, payload) + Gitlab::ExceptionLogFormatter.format!(exception, payload) + logger.error(structured_payload(payload)) + end end end |