summaryrefslogtreecommitdiff
path: root/lib/bulk_imports/pipeline.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/bulk_imports/pipeline.rb')
-rw-r--r--lib/bulk_imports/pipeline.rb88
1 files changed, 83 insertions, 5 deletions
diff --git a/lib/bulk_imports/pipeline.rb b/lib/bulk_imports/pipeline.rb
index 1d55ad95887..14445162737 100644
--- a/lib/bulk_imports/pipeline.rb
+++ b/lib/bulk_imports/pipeline.rb
@@ -3,9 +3,14 @@
module BulkImports
module Pipeline
extend ActiveSupport::Concern
+ include Gitlab::Utils::StrongMemoize
include Gitlab::ClassAttributes
include Runner
+ NotAllowedError = Class.new(StandardError)
+
+ CACHE_KEY_EXPIRATION = 2.hours
+
def initialize(context)
@context = context
end
@@ -15,16 +20,83 @@ module BulkImports
attr_reader :context
+ # Fetch pipeline extractor.
+ # An extractor is defined either by instance `#extract(context)` method
+ # or by using `extractor` DSL.
+ #
+ # @example
+ # class MyPipeline
+ # extractor MyExtractor, foo: :bar
+ # end
+ #
+ # class MyPipeline
+ # def extract(context)
+ # puts 'Fetch some data'
+ # end
+ # end
+ #
+ # If pipeline implements instance method `extract` - use it
+ # and ignore class `extractor` method implementation.
def extractor
- @extractor ||= instantiate(self.class.get_extractor)
+ @extractor ||= self.respond_to?(:extract) ? self : instantiate(self.class.get_extractor)
end
+ # Fetch pipeline transformers.
+ #
+ # A transformer can be defined using:
+ # - `transformer` class method
+ # - `transform` instance method
+ #
+ # Multiple transformers can be defined within a single
+ # pipeline and run sequentially for each record in the
+ # following order:
+ # - Transformers defined using `transformer` class method
+ # - Instance method `transform`
+ #
+ # Instance method `transform` is always the last to run.
+ #
+ # @example
+ # class MyPipeline
+ # transformer MyTransformerOne, foo: :bar
+ # transformer MyTransformerTwo, foo: :bar
+ #
+ # def transform(context, data)
+ # # perform transformation here
+ # end
+ # end
+ #
+ # In the example above `#transform` is the first to run and
+ # `MyTransformerTwo` method is the last.
def transformers
- @transformers ||= self.class.transformers.map(&method(:instantiate))
+ strong_memoize(:transformers) do
+ defined_transformers = self.class.transformers.map(&method(:instantiate))
+
+ transformers = []
+ transformers << self if respond_to?(:transform)
+ transformers.concat(defined_transformers)
+ transformers
+ end
end
+ # Fetch pipeline loader.
+ # A loader is defined either by instance method `#load(context, data)`
+ # or by using `loader` DSL.
+ #
+ # @example
+ # class MyPipeline
+ # loader MyLoader, foo: :bar
+ # end
+ #
+ # class MyPipeline
+ # def load(context, data)
+ # puts 'Load some data'
+ # end
+ # end
+ #
+ # If pipeline implements instance method `load` - use it
+ # and ignore class `loader` method implementation.
def loader
- @loaders ||= instantiate(self.class.get_loader)
+ @loader ||= self.respond_to?(:load) ? self : instantiate(self.class.get_loader)
end
def pipeline
@@ -32,7 +104,13 @@ module BulkImports
end
def instantiate(class_config)
- class_config[:klass].new(class_config[:options])
+ options = class_config[:options]
+
+ if options
+ class_config[:klass].new(class_config[:options])
+ else
+ class_config[:klass].new
+ end
end
def abort_on_failure?
@@ -58,7 +136,7 @@ module BulkImports
end
def transformers
- class_attributes[:transformers]
+ class_attributes[:transformers] || []
end
def get_loader