summaryrefslogtreecommitdiff
path: root/lib/bulk_imports/pipeline/runner.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/bulk_imports/pipeline/runner.rb')
-rw-r--r--lib/bulk_imports/pipeline/runner.rb66
1 files changed, 66 insertions, 0 deletions
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb
new file mode 100644
index 00000000000..04038e50399
--- /dev/null
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -0,0 +1,66 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Pipeline
+ module Runner
+ extend ActiveSupport::Concern
+
+ included do
+ private
+
+ def extractors
+ @extractors ||= self.class.extractors.map(&method(:instantiate))
+ end
+
+ def transformers
+ @transformers ||= self.class.transformers.map(&method(:instantiate))
+ end
+
+ def loaders
+ @loaders ||= self.class.loaders.map(&method(:instantiate))
+ end
+
+ def pipeline_name
+ @pipeline ||= self.class.name
+ end
+
+ def instantiate(class_config)
+ class_config[:klass].new(class_config[:options])
+ end
+ end
+
+ def run(context)
+ info(context, message: "Pipeline started", pipeline: pipeline_name)
+
+ extractors.each do |extractor|
+ extractor.extract(context).each do |entry|
+ info(context, extractor: extractor.class.name)
+
+ transformers.each do |transformer|
+ info(context, transformer: transformer.class.name)
+ entry = transformer.transform(context, entry)
+ end
+
+ loaders.each do |loader|
+ info(context, loader: loader.class.name)
+ loader.load(context, entry)
+ end
+ end
+ end
+ end
+
+ private # rubocop:disable Lint/UselessAccessModifier
+
+ def info(context, extra = {})
+ logger.info({
+ entity: context.entity.id,
+ entity_type: context.entity.source_type
+ }.merge(extra))
+ end
+
+ def logger
+ @logger ||= Gitlab::Import::Logger.build
+ end
+ end
+ end
+end