diff options
Diffstat (limited to 'lib/bulk_imports/pipeline/runner.rb')
-rw-r--r-- | lib/bulk_imports/pipeline/runner.rb | 30 |
1 files changed, 13 insertions, 17 deletions
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index 88b96f0ab6e..11fb9722173 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -12,25 +12,15 @@ module BulkImports info(context, message: 'Pipeline started', pipeline_class: pipeline) - extractors.each do |extractor| - data = run_pipeline_step(:extractor, extractor.class.name, context) do - extractor.extract(context) + Array.wrap(extracted_data_from(context)).each do |entry| + transformers.each do |transformer| + entry = run_pipeline_step(:transformer, transformer.class.name, context) do + transformer.transform(context, entry) + end end - if data && data.respond_to?(:each) - data.each do |entry| - transformers.each do |transformer| - entry = run_pipeline_step(:transformer, transformer.class.name, context) do - transformer.transform(context, entry) - end - end - - loaders.each do |loader| - run_pipeline_step(:loader, loader.class.name, context) do - loader.load(context, entry) - end - end - end + run_pipeline_step(:loader, loader.class.name, context) do + loader.load(context, entry) end end @@ -55,6 +45,12 @@ module BulkImports mark_as_failed(context) if abort_on_failure? end + def extracted_data_from(context) + run_pipeline_step(:extractor, extractor.class.name, context) do + extractor.extract(context) + end + end + def mark_as_failed(context) warn(context, message: 'Pipeline failed', pipeline_class: pipeline) |