1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
|
# frozen_string_literal: true
module BulkImports
  # Concern that turns a class into a bulk-import pipeline built from an
  # extractor, zero or more transformers and a loader.
  #
  # Each stage can be declared with the class-level DSL (`extractor`,
  # `transformer`, `loader`) or by defining the matching public instance
  # method (`#extract`, `#transform`, `#load`) on the pipeline itself.
  # Running the stages is presumably handled by the included `Runner`
  # module (not visible here).
  module Pipeline
    extend ActiveSupport::Concern
    include Gitlab::Utils::StrongMemoize
    include Gitlab::ClassAttributes
    include Runner

    NotAllowedError = Class.new(StandardError)

    CACHE_KEY_EXPIRATION = 2.hours

    # @param context [Object] import context; stored and exposed to the
    #   pipeline through the private `#context` reader
    def initialize(context)
      @context = context
    end

    included do
      private

      attr_reader :context

      # Fetch pipeline extractor.
      # An extractor is defined either by an instance `#extract(context)` method
      # or by using the `extractor` DSL.
      #
      # @example
      #   class MyPipeline
      #     extractor MyExtractor, foo: :bar
      #   end
      #
      #   class MyPipeline
      #     def extract(context)
      #       puts 'Fetch some data'
      #     end
      #   end
      #
      # If the pipeline implements the instance method `extract`, it is used
      # and the class-level `extractor` declaration is ignored.
      def extractor
        # `respond_to?` only reports public methods, so only a public
        # `#extract` takes precedence over the DSL declaration.
        @extractor ||= respond_to?(:extract) ? self : instantiate(self.class.get_extractor)
      end

      # Fetch pipeline transformers.
      #
      # A transformer can be defined using:
      # - `transformer` class method
      # - `transform` instance method
      #
      # Multiple transformers can be defined within a single
      # pipeline and run sequentially for each record in the
      # following order:
      # - Instance method `transform`
      # - Transformers defined using `transformer` class method,
      #   in declaration order
      #
      # Instance method `transform` is always the first to run.
      #
      # @example
      #   class MyPipeline
      #     transformer MyTransformerOne, foo: :bar
      #     transformer MyTransformerTwo, foo: :bar
      #
      #     def transform(context, data)
      #       # perform transformation here
      #     end
      #   end
      #
      # In the example above `#transform` is the first to run and
      # `MyTransformerTwo` is the last.
      #
      # @return [Array<Object>] memoized list of transformer objects
      def transformers
        strong_memoize(:transformers) do
          defined_transformers = self.class.transformers.map(&method(:instantiate))

          # The pipeline's own public `#transform` (if any) goes first.
          respond_to?(:transform) ? [self, *defined_transformers] : defined_transformers
        end
      end

      # Fetch pipeline loader.
      # A loader is defined either by an instance method `#load(context, data)`
      # or by using the `loader` DSL.
      #
      # @example
      #   class MyPipeline
      #     loader MyLoader, foo: :bar
      #   end
      #
      #   class MyPipeline
      #     def load(context, data)
      #       puts 'Load some data'
      #     end
      #   end
      #
      # If the pipeline implements the instance method `load`, it is used
      # and the class-level `loader` declaration is ignored.
      def loader
        # Kernel#load is a private method on every object, so `respond_to?`
        # is only true when the pipeline defines a public `#load` of its own.
        @loader ||= respond_to?(:load) ? self : instantiate(self.class.get_loader)
      end

      # @return [String] the pipeline class name (memoized)
      def pipeline
        @pipeline ||= self.class.name
      end

      # Build a stage object from a `{ klass:, options: }` config recorded by
      # the class-level DSL.
      #
      # Hash options are double-splatted. Ruby 3 no longer converts a
      # positional Hash into keyword arguments, so `klass.new(options)` raises
      # ArgumentError for keyword-based constructors. A double-splat still
      # works for constructors that take a single positional options Hash,
      # because Ruby turns the keywords back into a trailing Hash for methods
      # that accept no keywords.
      #
      # @param class_config [Hash] `{ klass: Class, options: Object or nil }`
      # @return [Object] a new instance of `class_config[:klass]`
      def instantiate(class_config)
        klass = class_config[:klass]
        options = class_config[:options]

        return klass.new unless options
        return klass.new(**options) if options.is_a?(Hash)

        klass.new(options)
      end

      # @return [Boolean, nil] the class-level `abort_on_failure?` flag
      def abort_on_failure?
        self.class.abort_on_failure?
      end
    end

    class_methods do
      # Declare the extractor class. `options` is handed to its constructor.
      def extractor(klass, options = nil)
        class_attributes[:extractor] = { klass: klass, options: options }
      end

      # Append a transformer class. This may be called several times;
      # transformers are kept in declaration order.
      def transformer(klass, options = nil)
        add_attribute(:transformers, klass, options)
      end

      # Declare the loader class. `options` is handed to its constructor.
      def loader(klass, options = nil)
        class_attributes[:loader] = { klass: klass, options: options }
      end

      # @return [Hash, nil] the `{ klass:, options: }` config set by `extractor`
      def get_extractor
        class_attributes[:extractor]
      end

      # @return [Array<Hash>] transformer configs in declaration order,
      #   or an empty Array when none were declared
      def transformers
        class_attributes[:transformers] || []
      end

      # @return [Hash, nil] the `{ klass:, options: }` config set by `loader`
      def get_loader
        class_attributes[:loader]
      end

      # Set the `abort_on_failure` flag read by `abort_on_failure?`.
      # NOTE(review): how a failure is handled is decided outside this file
      # (presumably in Runner) — confirm there.
      def abort_on_failure!
        class_attributes[:abort_on_failure] = true
      end

      # @return [Boolean, nil] true once `abort_on_failure!` was called
      def abort_on_failure?
        class_attributes[:abort_on_failure]
      end

      private

      # Append a `{ klass:, options: }` config to the list stored under `sym`.
      def add_attribute(sym, klass, options)
        class_attributes[sym] ||= []
        class_attributes[sym] << { klass: klass, options: options }
      end
    end
  end
end
|