1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
|
# frozen_string_literal: true

module BulkImports
  module Pipeline
    # Execution engine mixed into bulk-import pipeline classes. Drives the
    # extract -> transform -> load cycle for one page of source data and
    # centralises step logging, failure recording and skip/abort handling.
    #
    # NOTE(review): including classes are expected to provide #context,
    # #tracker, #extractor, #transformers, #loader, #pipeline and
    # #abort_on_failure? — presumably defined by sibling pipeline modules;
    # confirm against the including class, none of them are visible here.
    module Runner
      extend ActiveSupport::Concern

      # Internal control-flow exception: raised when the entity is already
      # marked as failed, and rescued below so the pipeline/step is skipped
      # via #skip! instead of doing any work.
      MarkedAsFailedError = Class.new(StandardError)

      # Runs the full pipeline:
      #   1. extract data (#extracted_data_from),
      #   2. pass each extracted entry through every transformer in turn,
      #   3. load each (possibly transformed) entry via the loader,
      #   4. persist pagination state on the tracker,
      #   5. invoke #after_run, which re-enters #run when more pages remain.
      # The whole pipeline is skipped when the entity is already failed.
      def run
        raise MarkedAsFailedError if context.entity.failed?

        info(message: 'Pipeline started')

        extracted_data = extracted_data_from

        # extracted_data is nil when the extractor step failed and was
        # handled by #log_and_fail (which returns nil).
        if extracted_data
          extracted_data.each do |entry|
            # Transformers are chained: each one's output is deliberately
            # reassigned to `entry` so the loader sees the final value.
            transformers.each do |transformer|
              entry = run_pipeline_step(:transformer, transformer.class.name) do
                transformer.transform(context, entry)
              end
            end

            run_pipeline_step(:loader, loader.class.name) do
              loader.load(context, entry)
            end
          end

          # Record pagination state so a retried/re-enqueued pipeline can
          # resume from the correct page.
          tracker.update!(
            has_next_page: extracted_data.has_next_page?,
            next_page: extracted_data.next_page
          )

          run_pipeline_step(:after_run) do
            after_run(extracted_data)
          end
        end

        info(message: 'Pipeline finished')
      rescue MarkedAsFailedError
        skip!('Skipping pipeline due to failed entity')
      end

      private # rubocop:disable Lint/UselessAccessModifier

      # Wraps a single pipeline step (extractor/transformer/loader/after_run):
      # logs the step, yields, and maps exceptions to an outcome. Returns the
      # block's value, or nil when the step failed and was handled by
      # #log_and_fail (callers rely on that nil).
      #
      # Rescue chain, in order (ordering is significant):
      # * MarkedAsFailedError -> skip the step (entity already failed).
      # * NetworkError        -> re-raised as RetryPipelineError when
      #                          retriable, otherwise logged and failed.
      # * RetryPipelineError  -> always propagated to the caller.
      # * StandardError       -> logged and the tracker failed.
      def run_pipeline_step(step, class_name = nil)
        raise MarkedAsFailedError if context.entity.failed?

        info(pipeline_step: step, step_class: class_name)

        yield
      rescue MarkedAsFailedError
        skip!(
          'Skipping pipeline due to failed entity',
          pipeline_step: step,
          step_class: class_name,
          importer: 'gitlab_migration'
        )
      rescue BulkImports::NetworkError => e
        if e.retriable?(context.tracker)
          # Surface a retryable error so the caller can re-enqueue the
          # pipeline after e.retry_delay.
          raise BulkImports::RetryPipelineError.new(e.message, e.retry_delay)
        else
          log_and_fail(e, step)
        end
      rescue BulkImports::RetryPipelineError
        raise
      rescue StandardError => e
        log_and_fail(e, step)
      end

      # Runs the extractor step; returns the extracted data, or nil when the
      # step failed and was handled.
      def extracted_data_from
        run_pipeline_step(:extractor, extractor.class.name) do
          extractor.extract(context)
        end
      end

      # Re-runs the whole pipeline while more pages remain.
      # NOTE(review): this recurses via #run, so depth grows with page count.
      def after_run(extracted_data)
        run if extracted_data.has_next_page?
      end

      # Records the failure and marks the tracker as failed; additionally
      # fails the whole entity when the pipeline aborts on failure.
      # Always returns nil — #run_pipeline_step callers depend on this to
      # detect a failed step.
      def log_and_fail(exception, step)
        log_import_failure(exception, step)

        tracker.fail_op!

        if abort_on_failure?
          warn(message: 'Aborting entity migration due to pipeline failure')
          context.entity.fail_op!
        end

        nil
      end

      # Logs a warning with any extra attributes and marks the tracker as
      # skipped.
      def skip!(message, extra = {})
        warn({ message: message }.merge(extra))

        tracker.skip!
      end

      # Emits an error log entry and persists a BulkImports::Failure record.
      # The persisted message is truncated to 255 chars (presumably a DB
      # column limit — confirm), while the logged message is kept in full.
      def log_import_failure(exception, step)
        attributes = {
          bulk_import_entity_id: context.entity.id,
          pipeline_class: pipeline,
          pipeline_step: step,
          exception_class: exception.class.to_s,
          exception_message: exception.message.truncate(255),
          correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
        }

        error(
          bulk_import_id: context.bulk_import_id,
          pipeline_step: step,
          exception_class: exception.class.to_s,
          exception_message: exception.message,
          message: "Pipeline failed",
          importer: 'gitlab_migration'
        )

        BulkImports::Failure.create(attributes)
      end

      # Structured-logging helpers: each merges the shared attributes from
      # #log_params into the per-call extras before emitting.
      def info(extra = {})
        logger.info(log_params(extra))
      end

      def warn(extra = {})
        logger.warn(log_params(extra))
      end

      def error(extra = {})
        logger.error(log_params(extra))
      end

      # Default structured-log attributes merged with per-call extras;
      # blank values are dropped from the final payload.
      def log_params(extra)
        defaults = {
          bulk_import_id: context.bulk_import_id,
          bulk_import_entity_id: context.entity.id,
          bulk_import_entity_type: context.entity.source_type,
          pipeline_class: pipeline,
          context_extra: context.extra,
          importer: 'gitlab_migration'
        }

        defaults
          .merge(extra)
          .reject { |_key, value| value.blank? }
      end

      # Memoized import logger instance.
      def logger
        @logger ||= Gitlab::Import::Logger.build
      end
    end
  end
end
|