1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
|
# frozen_string_literal: true
module BulkImports
  # Mixin for bulk import pipelines.
  #
  # A pipeline is composed of an extractor, zero or more transformers and a
  # loader. Each stage is declared either with the class-level DSL
  # (`extractor`, `transformer`, `loader`) or by defining the matching
  # instance method (`extract`, `transform`, `load`) on the pipeline itself.
  # The included `Runner` module presumably drives the stages — see that
  # module for execution details.
  module Pipeline
    extend ActiveSupport::Concern
    include Gitlab::Utils::StrongMemoize
    include Gitlab::ClassAttributes
    include Runner

    # Pipeline-specific error types.
    NotAllowedError = Class.new(StandardError)
    ExpiredError = Class.new(StandardError)
    FailedError = Class.new(StandardError)

    # Shared durations; not referenced inside this module (see includers).
    CACHE_KEY_EXPIRATION = 2.hours
    NDJSON_EXPORT_TIMEOUT = 30.minutes

    # @param context [Object] pipeline context; must respond to `tracker`,
    #   `portable`, `import_export_config` and `current_user` (read below).
    def initialize(context)
      @context = context
    end

    # The readers below delegate to the context and memoize the result
    # (via `||=`, so falsey values are re-read on every call).

    def tracker
      @tracker ||= context.tracker
    end

    def portable
      @portable ||= context.portable
    end

    def import_export_config
      @import_export_config ||= context.import_export_config
    end

    def current_user
      @current_user ||= context.current_user
    end

    # Evaluated in the scope of the including class; every method defined in
    # this block is private to that class because of the leading `private`.
    included do
      private

      attr_reader :context

      # Fetch pipeline extractor.
      # An extractor is defined either by instance `#extract(context)` method
      # or by using `extractor` DSL.
      #
      # @example
      #   class MyPipeline
      #     extractor MyExtractor, foo: :bar
      #   end
      #
      #   class MyPipeline
      #     def extract(context)
      #       puts 'Fetch some data'
      #     end
      #   end
      #
      # If pipeline implements instance method `extract` - use it
      # and ignore class `extractor` method implementation.
      # Note: `respond_to?` ignores private methods, so an `extract` override
      # must be public to take precedence over the DSL declaration.
      def extractor
        @extractor ||= respond_to?(:extract) ? self : instantiate(self.class.get_extractor)
      end

      # Fetch pipeline transformers.
      #
      # A transformer can be defined using:
      # - `transformer` class method
      # - `transform` instance method
      #
      # Multiple transformers can be defined within a single
      # pipeline and run sequentially for each record in the
      # following order:
      # - Instance method `transform`
      # - Transformers defined using `transformer` class method,
      #   in declaration order
      #
      # Instance method `transform` is always the first to run.
      #
      # @example
      #   class MyPipeline
      #     transformer MyTransformerOne, foo: :bar
      #     transformer MyTransformerTwo, foo: :bar
      #
      #     def transform(context, data)
      #       # perform transformation here
      #     end
      #   end
      #
      # In the example above `#transform` is the first to run and
      # `MyTransformerTwo` is the last.
      #
      # @return [Array] memoized list of transformer objects
      def transformers
        strong_memoize(:transformers) do
          declared = self.class.transformers.map(&method(:instantiate))

          # The pipeline's own `transform` (if public) is prepended so it runs first.
          respond_to?(:transform) ? [self, *declared] : declared
        end
      end

      # Fetch pipeline loader.
      # A loader is defined either by instance method `#load(context, data)`
      # or by using `loader` DSL.
      #
      # @example
      #   class MyPipeline
      #     loader MyLoader, foo: :bar
      #   end
      #
      #   class MyPipeline
      #     def load(context, data)
      #       puts 'Load some data'
      #     end
      #   end
      #
      # If pipeline implements instance method `load` - use it
      # and ignore class `loader` method implementation.
      # Note: as with `extractor`, a private `load` is not detected.
      def loader
        @loader ||= respond_to?(:load) ? self : instantiate(self.class.get_loader)
      end

      # @return [String] the including class's name
      def pipeline
        @pipeline ||= self.class.name
      end

      # Builds an object from a `{ klass:, options: }` hash as stored by the
      # class-level DSL. Truthy options are splatted as keyword arguments into
      # `klass.new`; otherwise `klass.new` is called with no arguments.
      #
      # NOTE(review): if no extractor/loader was declared, the DSL getter
      # presumably returns nil and this raises NoMethodError on `nil[]`.
      def instantiate(class_config)
        options = class_config[:options]
        klass = class_config[:klass]

        options ? klass.new(**options) : klass.new
      end

      def abort_on_failure?
        self.class.abort_on_failure?
      end
    end

    class_methods do
      # Declares the extractor class (and optional keyword options for `new`).
      # A later call replaces an earlier one.
      def extractor(klass, options = nil)
        class_attributes[:extractor] = { klass: klass, options: options }
      end

      # Appends a transformer class (and optional keyword options for `new`);
      # transformers run in declaration order.
      def transformer(klass, options = nil)
        add_attribute(:transformers, klass, options)
      end

      # Declares the loader class (and optional keyword options for `new`).
      # A later call replaces an earlier one.
      def loader(klass, options = nil)
        class_attributes[:loader] = { klass: klass, options: options }
      end

      def get_extractor
        class_attributes[:extractor]
      end

      # @return [Array<Hash>] declared transformer configs, or [] if none
      def transformers
        class_attributes[:transformers] || []
      end

      def get_loader
        class_attributes[:loader]
      end

      def abort_on_failure!
        class_attributes[:abort_on_failure] = true
      end

      # Returns true once `abort_on_failure!` was called; otherwise the unset
      # attribute value (presumably nil).
      def abort_on_failure?
        class_attributes[:abort_on_failure]
      end

      def ndjson_pipeline!
        class_attributes[:ndjson_pipeline] = true
      end

      # Returns true once `ndjson_pipeline!` was called; otherwise the unset
      # attribute value (presumably nil).
      def ndjson_pipeline?
        class_attributes[:ndjson_pipeline]
      end

      # Setter for the relation name; read back via `relation`.
      def relation_name(name)
        class_attributes[:relation_name] = name
      end

      def relation
        class_attributes[:relation_name]
      end

      private

      # Appends a `{ klass:, options: }` entry to the list stored under `sym`,
      # creating the list on first use.
      def add_attribute(sym, klass, options)
        class_attributes[sym] ||= []
        class_attributes[sym] << { klass: klass, options: options }
      end
    end
  end
end
|