summary | refs | log | tree | commit | diff
path: root/lib/bulk_imports
diff options
context:
space:
mode:
author GitLab Bot <gitlab-bot@gitlab.com> 2020-12-17 11:59:07 +0000
committer GitLab Bot <gitlab-bot@gitlab.com> 2020-12-17 11:59:07 +0000
commit 8b573c94895dc0ac0e1d9d59cf3e8745e8b539ca (patch)
tree 544930fb309b30317ae9797a9683768705d664c4 /lib/bulk_imports
parent 4b1de649d0168371549608993deac953eb692019 (diff)
download gitlab-ce-8b573c94895dc0ac0e1d9d59cf3e8745e8b539ca.tar.gz
Add latest changes from gitlab-org/gitlab@13-7-stable-ee (tag: v13.7.0-rc42)
Diffstat (limited to 'lib/bulk_imports')
-rw-r--r-- lib/bulk_imports/common/extractors/graphql_extractor.rb 27
-rw-r--r-- lib/bulk_imports/common/transformers/graphql_cleaner_transformer.rb 54
-rw-r--r-- lib/bulk_imports/common/transformers/hash_key_digger.rb 23
-rw-r--r-- lib/bulk_imports/common/transformers/prohibited_attributes_transformer.rb 39
-rw-r--r-- lib/bulk_imports/groups/graphql/get_group_query.rb 4
-rw-r--r-- lib/bulk_imports/groups/pipelines/group_pipeline.rb 5
-rw-r--r-- lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb 1
-rw-r--r-- lib/bulk_imports/importers/group_importer.rb 1
-rw-r--r-- lib/bulk_imports/pipeline.rb 81
-rw-r--r-- lib/bulk_imports/pipeline/attributes.rb 41
-rw-r--r-- lib/bulk_imports/pipeline/runner.rb 117
11 files changed, 242 insertions, 151 deletions
diff --git a/lib/bulk_imports/common/extractors/graphql_extractor.rb b/lib/bulk_imports/common/extractors/graphql_extractor.rb
index 7d58032cfcc..c0cef61d2b2 100644
--- a/lib/bulk_imports/common/extractors/graphql_extractor.rb
+++ b/lib/bulk_imports/common/extractors/graphql_extractor.rb
@@ -6,15 +6,16 @@ module BulkImports
class GraphqlExtractor
def initialize(query)
@query = query[:query]
- @query_string = @query.to_s
- @variables = @query.variables
end
def extract(context)
- @context = context
+ client = graphql_client(context)
Enumerator.new do |yielder|
- result = graphql_client.execute(parsed_query, query_variables(context.entity))
+ result = client.execute(
+ client.parse(query.to_s),
+ query.variables(context.entity)
+ )
yielder << result.original_hash.deep_dup
end
@@ -22,23 +23,17 @@ module BulkImports
private
- def graphql_client
+ attr_reader :query
+
+ def graphql_client(context)
@graphql_client ||= BulkImports::Clients::Graphql.new(
- url: @context.configuration.url,
- token: @context.configuration.access_token
+ url: context.configuration.url,
+ token: context.configuration.access_token
)
end
def parsed_query
- @parsed_query ||= graphql_client.parse(@query.to_s)
- end
-
- def query_variables(entity)
- return unless @variables
-
- @variables.transform_values do |entity_attribute|
- entity.public_send(entity_attribute) # rubocop:disable GitlabSecurity/PublicSend
- end
+ @parsed_query ||= graphql_client.parse(query.to_s)
end
end
end
diff --git a/lib/bulk_imports/common/transformers/graphql_cleaner_transformer.rb b/lib/bulk_imports/common/transformers/graphql_cleaner_transformer.rb
deleted file mode 100644
index dce0fac6999..00000000000
--- a/lib/bulk_imports/common/transformers/graphql_cleaner_transformer.rb
+++ /dev/null
@@ -1,54 +0,0 @@
-# frozen_string_literal: true
-
-# Cleanup GraphQL original response hash from unnecessary nesting
-# 1. Remove ['data']['group'] or ['data']['project'] hash nesting
-# 2. Remove ['edges'] & ['nodes'] array wrappings
-# 3. Remove ['node'] hash wrapping
-#
-# @example
-# data = {"data"=>{"group"=> {
-# "name"=>"test",
-# "fullName"=>"test",
-# "description"=>"test",
-# "labels"=>{"edges"=>[{"node"=>{"title"=>"label1"}}, {"node"=>{"title"=>"label2"}}, {"node"=>{"title"=>"label3"}}]}}}}
-#
-# BulkImports::Common::Transformers::GraphqlCleanerTransformer.new.transform(nil, data)
-#
-# {"name"=>"test", "fullName"=>"test", "description"=>"test", "labels"=>[{"title"=>"label1"}, {"title"=>"label2"}, {"title"=>"label3"}]}
-module BulkImports
- module Common
- module Transformers
- class GraphqlCleanerTransformer
- EDGES = 'edges'
- NODE = 'node'
-
- def initialize(options = {})
- @options = options
- end
-
- def transform(_, data)
- return data unless data.is_a?(Hash)
-
- data = data.dig('data', 'group') || data.dig('data', 'project') || data
-
- clean_edges_and_nodes(data)
- end
-
- def clean_edges_and_nodes(data)
- case data
- when Array
- data.map(&method(:clean_edges_and_nodes))
- when Hash
- if data.key?(NODE)
- clean_edges_and_nodes(data[NODE])
- else
- data.transform_values { |value| clean_edges_and_nodes(value.try(:fetch, EDGES, value) || value) }
- end
- else
- data
- end
- end
- end
- end
- end
-end
diff --git a/lib/bulk_imports/common/transformers/hash_key_digger.rb b/lib/bulk_imports/common/transformers/hash_key_digger.rb
new file mode 100644
index 00000000000..b4897b5b2bf
--- /dev/null
+++ b/lib/bulk_imports/common/transformers/hash_key_digger.rb
@@ -0,0 +1,23 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Common
+ module Transformers
+ class HashKeyDigger
+ def initialize(options = {})
+ @key_path = options[:key_path]
+ end
+
+ def transform(_, data)
+ raise ArgumentError, "Given data must be a Hash" unless data.is_a?(Hash)
+
+ data.dig(*Array.wrap(key_path))
+ end
+
+ private
+
+ attr_reader :key_path
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/common/transformers/prohibited_attributes_transformer.rb b/lib/bulk_imports/common/transformers/prohibited_attributes_transformer.rb
new file mode 100644
index 00000000000..858c4c8976b
--- /dev/null
+++ b/lib/bulk_imports/common/transformers/prohibited_attributes_transformer.rb
@@ -0,0 +1,39 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Common
+ module Transformers
+ class ProhibitedAttributesTransformer
+ PROHIBITED_REFERENCES = Regexp.union(
+ /\Acached_markdown_version\Z/,
+ /\Aid\Z/,
+ /_id\Z/,
+ /_ids\Z/,
+ /_html\Z/,
+ /attributes/,
+ /\Aremote_\w+_(url|urls|request_header)\Z/ # carrierwave automatically creates these attribute methods for uploads
+ ).freeze
+
+ def initialize(options = {})
+ @options = options
+ end
+
+ def transform(context, data)
+ data.each_with_object({}) do |(key, value), result|
+ prohibited = prohibited_key?(key)
+
+ unless prohibited
+ result[key] = value.is_a?(Hash) ? transform(context, value) : value
+ end
+ end
+ end
+
+ private
+
+ def prohibited_key?(key)
+ key.to_s =~ PROHIBITED_REFERENCES
+ end
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/groups/graphql/get_group_query.rb b/lib/bulk_imports/groups/graphql/get_group_query.rb
index c50b99aae4e..2bc0f60baa2 100644
--- a/lib/bulk_imports/groups/graphql/get_group_query.rb
+++ b/lib/bulk_imports/groups/graphql/get_group_query.rb
@@ -29,8 +29,8 @@ module BulkImports
GRAPHQL
end
- def variables
- { full_path: :source_full_path }
+ def variables(entity)
+ { full_path: entity.source_full_path }
end
end
end
diff --git a/lib/bulk_imports/groups/pipelines/group_pipeline.rb b/lib/bulk_imports/groups/pipelines/group_pipeline.rb
index 2b7d0ef7658..5169e292180 100644
--- a/lib/bulk_imports/groups/pipelines/group_pipeline.rb
+++ b/lib/bulk_imports/groups/pipelines/group_pipeline.rb
@@ -6,10 +6,13 @@ module BulkImports
class GroupPipeline
include Pipeline
+ abort_on_failure!
+
extractor Common::Extractors::GraphqlExtractor, query: Graphql::GetGroupQuery
- transformer Common::Transformers::GraphqlCleanerTransformer
+ transformer Common::Transformers::HashKeyDigger, key_path: %w[data group]
transformer Common::Transformers::UnderscorifyKeysTransformer
+ transformer Common::Transformers::ProhibitedAttributesTransformer
transformer Groups::Transformers::GroupAttributesTransformer
loader Groups::Loaders::GroupLoader
diff --git a/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb b/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb
index 6384e9d5972..d7e1a118d0b 100644
--- a/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb
+++ b/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb
@@ -7,6 +7,7 @@ module BulkImports
include Pipeline
extractor BulkImports::Groups::Extractors::SubgroupsExtractor
+ transformer Common::Transformers::ProhibitedAttributesTransformer
transformer BulkImports::Groups::Transformers::SubgroupToEntityTransformer
loader BulkImports::Common::Loaders::EntityLoader
end
diff --git a/lib/bulk_imports/importers/group_importer.rb b/lib/bulk_imports/importers/group_importer.rb
index c7253590c87..82cb1ca03a2 100644
--- a/lib/bulk_imports/importers/group_importer.rb
+++ b/lib/bulk_imports/importers/group_importer.rb
@@ -19,6 +19,7 @@ module BulkImports
)
BulkImports::Groups::Pipelines::GroupPipeline.new.run(context)
+ 'BulkImports::EE::Groups::Pipelines::EpicsPipeline'.constantize.new.run(context) if Gitlab.ee?
BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline.new.run(context)
entity.finish!
diff --git a/lib/bulk_imports/pipeline.rb b/lib/bulk_imports/pipeline.rb
index 70e6030ea2c..a44f8fc7193 100644
--- a/lib/bulk_imports/pipeline.rb
+++ b/lib/bulk_imports/pipeline.rb
@@ -3,10 +3,89 @@
module BulkImports
module Pipeline
extend ActiveSupport::Concern
+ include Gitlab::ClassAttributes
included do
- include Attributes
include Runner
+
+ private
+
+ def extractors
+ @extractors ||= self.class.extractors.map(&method(:instantiate))
+ end
+
+ def transformers
+ @transformers ||= self.class.transformers.map(&method(:instantiate))
+ end
+
+ def loaders
+ @loaders ||= self.class.loaders.map(&method(:instantiate))
+ end
+
+ def after_run
+ @after_run ||= self.class.after_run_callback
+ end
+
+ def pipeline
+ @pipeline ||= self.class.name
+ end
+
+ def instantiate(class_config)
+ class_config[:klass].new(class_config[:options])
+ end
+
+ def abort_on_failure?
+ self.class.abort_on_failure?
+ end
+ end
+
+ class_methods do
+ def extractor(klass, options = nil)
+ add_attribute(:extractors, klass, options)
+ end
+
+ def transformer(klass, options = nil)
+ add_attribute(:transformers, klass, options)
+ end
+
+ def loader(klass, options = nil)
+ add_attribute(:loaders, klass, options)
+ end
+
+ def after_run(&block)
+ class_attributes[:after_run] = block
+ end
+
+ def extractors
+ class_attributes[:extractors]
+ end
+
+ def transformers
+ class_attributes[:transformers]
+ end
+
+ def loaders
+ class_attributes[:loaders]
+ end
+
+ def after_run_callback
+ class_attributes[:after_run]
+ end
+
+ def abort_on_failure!
+ class_attributes[:abort_on_failure] = true
+ end
+
+ def abort_on_failure?
+ class_attributes[:abort_on_failure]
+ end
+
+ private
+
+ def add_attribute(sym, klass, options)
+ class_attributes[sym] ||= []
+ class_attributes[sym] << { klass: klass, options: options }
+ end
end
end
end
diff --git a/lib/bulk_imports/pipeline/attributes.rb b/lib/bulk_imports/pipeline/attributes.rb
deleted file mode 100644
index ebfbaf6f6ba..00000000000
--- a/lib/bulk_imports/pipeline/attributes.rb
+++ /dev/null
@@ -1,41 +0,0 @@
-# frozen_string_literal: true
-
-module BulkImports
- module Pipeline
- module Attributes
- extend ActiveSupport::Concern
- include Gitlab::ClassAttributes
-
- class_methods do
- def extractor(klass, options = nil)
- add_attribute(:extractors, klass, options)
- end
-
- def transformer(klass, options = nil)
- add_attribute(:transformers, klass, options)
- end
-
- def loader(klass, options = nil)
- add_attribute(:loaders, klass, options)
- end
-
- def add_attribute(sym, klass, options)
- class_attributes[sym] ||= []
- class_attributes[sym] << { klass: klass, options: options }
- end
-
- def extractors
- class_attributes[:extractors]
- end
-
- def transformers
- class_attributes[:transformers]
- end
-
- def loaders
- class_attributes[:loaders]
- end
- end
- end
- end
-end
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb
index 04038e50399..88b96f0ab6e 100644
--- a/lib/bulk_imports/pipeline/runner.rb
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -5,57 +5,102 @@ module BulkImports
module Runner
extend ActiveSupport::Concern
- included do
- private
+ MarkedAsFailedError = Class.new(StandardError)
- def extractors
- @extractors ||= self.class.extractors.map(&method(:instantiate))
- end
+ def run(context)
+ raise MarkedAsFailedError if marked_as_failed?(context)
- def transformers
- @transformers ||= self.class.transformers.map(&method(:instantiate))
- end
+ info(context, message: 'Pipeline started', pipeline_class: pipeline)
- def loaders
- @loaders ||= self.class.loaders.map(&method(:instantiate))
- end
+ extractors.each do |extractor|
+ data = run_pipeline_step(:extractor, extractor.class.name, context) do
+ extractor.extract(context)
+ end
- def pipeline_name
- @pipeline ||= self.class.name
- end
+ if data && data.respond_to?(:each)
+ data.each do |entry|
+ transformers.each do |transformer|
+ entry = run_pipeline_step(:transformer, transformer.class.name, context) do
+ transformer.transform(context, entry)
+ end
+ end
- def instantiate(class_config)
- class_config[:klass].new(class_config[:options])
+ loaders.each do |loader|
+ run_pipeline_step(:loader, loader.class.name, context) do
+ loader.load(context, entry)
+ end
+ end
+ end
+ end
end
+
+ after_run.call(context) if after_run.present?
+ rescue MarkedAsFailedError
+ log_skip(context)
end
- def run(context)
- info(context, message: "Pipeline started", pipeline: pipeline_name)
+ private # rubocop:disable Lint/UselessAccessModifier
- extractors.each do |extractor|
- extractor.extract(context).each do |entry|
- info(context, extractor: extractor.class.name)
+ def run_pipeline_step(type, class_name, context)
+ raise MarkedAsFailedError if marked_as_failed?(context)
- transformers.each do |transformer|
- info(context, transformer: transformer.class.name)
- entry = transformer.transform(context, entry)
- end
+ info(context, type => class_name)
- loaders.each do |loader|
- info(context, loader: loader.class.name)
- loader.load(context, entry)
- end
- end
- end
+ yield
+ rescue MarkedAsFailedError
+ log_skip(context, type => class_name)
+ rescue => e
+ log_import_failure(e, context)
+
+ mark_as_failed(context) if abort_on_failure?
end
- private # rubocop:disable Lint/UselessAccessModifier
+ def mark_as_failed(context)
+ warn(context, message: 'Pipeline failed', pipeline_class: pipeline)
+
+ context.entity.fail_op!
+ end
+
+ def marked_as_failed?(context)
+ return true if context.entity.failed?
+
+ false
+ end
+
+ def log_skip(context, extra = {})
+ log = {
+ message: 'Skipping due to failed pipeline status',
+ pipeline_class: pipeline
+ }.merge(extra)
+
+ info(context, log)
+ end
+
+ def log_import_failure(exception, context)
+ attributes = {
+ bulk_import_entity_id: context.entity.id,
+ pipeline_class: pipeline,
+ exception_class: exception.class.to_s,
+ exception_message: exception.message.truncate(255),
+ correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
+ }
+
+ BulkImports::Failure.create(attributes)
+ end
+
+ def warn(context, extra = {})
+ logger.warn(log_base_params(context).merge(extra))
+ end
def info(context, extra = {})
- logger.info({
- entity: context.entity.id,
- entity_type: context.entity.source_type
- }.merge(extra))
+ logger.info(log_base_params(context).merge(extra))
+ end
+
+ def log_base_params(context)
+ {
+ bulk_import_entity_id: context.entity.id,
+ bulk_import_entity_type: context.entity.source_type
+ }
end
def logger