diff options
Diffstat (limited to 'lib/bulk_imports/pipeline/runner.rb')
-rw-r--r-- | lib/bulk_imports/pipeline/runner.rb | 117 |
1 files changed, 81 insertions, 36 deletions
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index 04038e50399..88b96f0ab6e 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -5,57 +5,102 @@ module BulkImports module Runner extend ActiveSupport::Concern - included do - private + MarkedAsFailedError = Class.new(StandardError) - def extractors - @extractors ||= self.class.extractors.map(&method(:instantiate)) - end + def run(context) + raise MarkedAsFailedError if marked_as_failed?(context) - def transformers - @transformers ||= self.class.transformers.map(&method(:instantiate)) - end + info(context, message: 'Pipeline started', pipeline_class: pipeline) - def loaders - @loaders ||= self.class.loaders.map(&method(:instantiate)) - end + extractors.each do |extractor| + data = run_pipeline_step(:extractor, extractor.class.name, context) do + extractor.extract(context) + end - def pipeline_name - @pipeline ||= self.class.name - end + if data && data.respond_to?(:each) + data.each do |entry| + transformers.each do |transformer| + entry = run_pipeline_step(:transformer, transformer.class.name, context) do + transformer.transform(context, entry) + end + end - def instantiate(class_config) - class_config[:klass].new(class_config[:options]) + loaders.each do |loader| + run_pipeline_step(:loader, loader.class.name, context) do + loader.load(context, entry) + end + end + end + end end + + after_run.call(context) if after_run.present? + rescue MarkedAsFailedError + log_skip(context) end - def run(context) - info(context, message: "Pipeline started", pipeline: pipeline_name) + private # rubocop:disable Lint/UselessAccessModifier - extractors.each do |extractor| - extractor.extract(context).each do |entry| - info(context, extractor: extractor.class.name) + def run_pipeline_step(type, class_name, context) + raise MarkedAsFailedError if marked_as_failed?(context) - transformers.each do |transformer| - info(context, transformer: transformer.class.name) - entry = transformer.transform(context, entry) - end + info(context, type => class_name) - loaders.each do |loader| - info(context, loader: loader.class.name) - loader.load(context, entry) - end - end - end + yield + rescue MarkedAsFailedError + log_skip(context, type => class_name) + rescue => e + log_import_failure(e, context) + + mark_as_failed(context) if abort_on_failure? end - private # rubocop:disable Lint/UselessAccessModifier + def mark_as_failed(context) + warn(context, message: 'Pipeline failed', pipeline_class: pipeline) + + context.entity.fail_op! + end + + def marked_as_failed?(context) + return true if context.entity.failed? + + false + end + + def log_skip(context, extra = {}) + log = { + message: 'Skipping due to failed pipeline status', + pipeline_class: pipeline + }.merge(extra) + + info(context, log) + end + + def log_import_failure(exception, context) + attributes = { + bulk_import_entity_id: context.entity.id, + pipeline_class: pipeline, + exception_class: exception.class.to_s, + exception_message: exception.message.truncate(255), + correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id + } + + BulkImports::Failure.create(attributes) + end + + def warn(context, extra = {}) + logger.warn(log_base_params(context).merge(extra)) + end def info(context, extra = {}) - logger.info({ - entity: context.entity.id, - entity_type: context.entity.source_type - }.merge(extra)) + logger.info(log_base_params(context).merge(extra)) + end + + def log_base_params(context) + { + bulk_import_entity_id: context.entity.id, + bulk_import_entity_type: context.entity.source_type + } end def logger |