summaryrefslogtreecommitdiff
path: root/lib/bulk_imports/pipeline/runner.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/bulk_imports/pipeline/runner.rb')
-rw-r--r--lib/bulk_imports/pipeline/runner.rb117
1 files changed, 81 insertions, 36 deletions
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb
index 04038e50399..88b96f0ab6e 100644
--- a/lib/bulk_imports/pipeline/runner.rb
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -5,57 +5,102 @@ module BulkImports
module Runner
extend ActiveSupport::Concern
- included do
- private
+ MarkedAsFailedError = Class.new(StandardError)
- def extractors
- @extractors ||= self.class.extractors.map(&method(:instantiate))
- end
+ def run(context)
+ raise MarkedAsFailedError if marked_as_failed?(context)
- def transformers
- @transformers ||= self.class.transformers.map(&method(:instantiate))
- end
+ info(context, message: 'Pipeline started', pipeline_class: pipeline)
- def loaders
- @loaders ||= self.class.loaders.map(&method(:instantiate))
- end
+ extractors.each do |extractor|
+ data = run_pipeline_step(:extractor, extractor.class.name, context) do
+ extractor.extract(context)
+ end
- def pipeline_name
- @pipeline ||= self.class.name
- end
+ if data && data.respond_to?(:each)
+ data.each do |entry|
+ transformers.each do |transformer|
+ entry = run_pipeline_step(:transformer, transformer.class.name, context) do
+ transformer.transform(context, entry)
+ end
+ end
- def instantiate(class_config)
- class_config[:klass].new(class_config[:options])
+ loaders.each do |loader|
+ run_pipeline_step(:loader, loader.class.name, context) do
+ loader.load(context, entry)
+ end
+ end
+ end
+ end
end
+
+ after_run.call(context) if after_run.present?
+ rescue MarkedAsFailedError
+ log_skip(context)
end
- def run(context)
- info(context, message: "Pipeline started", pipeline: pipeline_name)
+ private # rubocop:disable Lint/UselessAccessModifier
- extractors.each do |extractor|
- extractor.extract(context).each do |entry|
- info(context, extractor: extractor.class.name)
+ def run_pipeline_step(type, class_name, context)
+ raise MarkedAsFailedError if marked_as_failed?(context)
- transformers.each do |transformer|
- info(context, transformer: transformer.class.name)
- entry = transformer.transform(context, entry)
- end
+ info(context, type => class_name)
- loaders.each do |loader|
- info(context, loader: loader.class.name)
- loader.load(context, entry)
- end
- end
- end
+ yield
+ rescue MarkedAsFailedError
+ log_skip(context, type => class_name)
+ rescue => e
+ log_import_failure(e, context)
+
+ mark_as_failed(context) if abort_on_failure?
end
- private # rubocop:disable Lint/UselessAccessModifier
+ def mark_as_failed(context)
+ warn(context, message: 'Pipeline failed', pipeline_class: pipeline)
+
+ context.entity.fail_op!
+ end
+
+ def marked_as_failed?(context)
+ return true if context.entity.failed?
+
+ false
+ end
+
+ def log_skip(context, extra = {})
+ log = {
+ message: 'Skipping due to failed pipeline status',
+ pipeline_class: pipeline
+ }.merge(extra)
+
+ info(context, log)
+ end
+
+ def log_import_failure(exception, context)
+ attributes = {
+ bulk_import_entity_id: context.entity.id,
+ pipeline_class: pipeline,
+ exception_class: exception.class.to_s,
+ exception_message: exception.message.truncate(255),
+ correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
+ }
+
+ BulkImports::Failure.create(attributes)
+ end
+
+ def warn(context, extra = {})
+ logger.warn(log_base_params(context).merge(extra))
+ end
def info(context, extra = {})
- logger.info({
- entity: context.entity.id,
- entity_type: context.entity.source_type
- }.merge(extra))
+ logger.info(log_base_params(context).merge(extra))
+ end
+
+ def log_base_params(context)
+ {
+ bulk_import_entity_id: context.entity.id,
+ bulk_import_entity_type: context.entity.source_type
+ }
end
def logger