author    GitLab Bot <gitlab-bot@gitlab.com>  2020-12-17 11:59:07 +0000
committer GitLab Bot <gitlab-bot@gitlab.com>  2020-12-17 11:59:07 +0000
commit    8b573c94895dc0ac0e1d9d59cf3e8745e8b539ca (patch)
tree      544930fb309b30317ae9797a9683768705d664c4 /lib/bulk_imports
parent    4b1de649d0168371549608993deac953eb692019 (diff)
download  gitlab-ce-8b573c94895dc0ac0e1d9d59cf3e8745e8b539ca.tar.gz
Add latest changes from gitlab-org/gitlab@13-7-stable-ee (tag: v13.7.0-rc42)
Diffstat (limited to 'lib/bulk_imports')
11 files changed, 242 insertions, 151 deletions
diff --git a/lib/bulk_imports/common/extractors/graphql_extractor.rb b/lib/bulk_imports/common/extractors/graphql_extractor.rb
index 7d58032cfcc..c0cef61d2b2 100644
--- a/lib/bulk_imports/common/extractors/graphql_extractor.rb
+++ b/lib/bulk_imports/common/extractors/graphql_extractor.rb
@@ -6,15 +6,16 @@ module BulkImports
     class GraphqlExtractor
       def initialize(query)
        @query = query[:query]
-        @query_string = @query.to_s
-        @variables = @query.variables
      end
 
      def extract(context)
-        @context = context
+        client = graphql_client(context)
 
        Enumerator.new do |yielder|
-          result = graphql_client.execute(parsed_query, query_variables(context.entity))
+          result = client.execute(
+            client.parse(query.to_s),
+            query.variables(context.entity)
+          )
 
          yielder << result.original_hash.deep_dup
        end
@@ -22,23 +23,17 @@ module BulkImports
 
      private
 
-      def graphql_client
+      attr_reader :query
+
+      def graphql_client(context)
        @graphql_client ||= BulkImports::Clients::Graphql.new(
-          url: @context.configuration.url,
-          token: @context.configuration.access_token
+          url: context.configuration.url,
+          token: context.configuration.access_token
        )
      end
 
      def parsed_query
-        @parsed_query ||= graphql_client.parse(@query.to_s)
-      end
-
-      def query_variables(entity)
-        return unless @variables
-
-        @variables.transform_values do |entity_attribute|
-          entity.public_send(entity_attribute) # rubocop:disable GitlabSecurity/PublicSend
-        end
+        @parsed_query ||= graphql_client.parse(query.to_s)
      end
    end
  end
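The extractor change above drops the memoized `@query_string`/`@variables` ivars: the query object is now asked for its string and its variables at extract time, with the entity passed in explicitly, replacing the old `public_send`-based `query_variables` helper. A minimal standalone sketch of that calling convention — `Entity` and `FakeGroupQuery` are hypothetical stand-ins, not GitLab code:

```ruby
# Hypothetical stand-ins illustrating the new contract: a query object
# responds to #to_s and #variables(entity).
Entity = Struct.new(:source_full_path)

class FakeGroupQuery
  def to_s
    'query($full_path: ID!) { group(fullPath: $full_path) { name } }'
  end

  # Variables are resolved against the entity at extract time.
  def variables(entity)
    { full_path: entity.source_full_path }
  end
end

query = FakeGroupQuery.new
p query.variables(Entity.new('my-group'))
# => {:full_path=>"my-group"}
```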
diff --git a/lib/bulk_imports/common/transformers/graphql_cleaner_transformer.rb b/lib/bulk_imports/common/transformers/graphql_cleaner_transformer.rb
deleted file mode 100644
index dce0fac6999..00000000000
--- a/lib/bulk_imports/common/transformers/graphql_cleaner_transformer.rb
+++ /dev/null
@@ -1,54 +0,0 @@
-# frozen_string_literal: true
-
-# Cleanup GraphQL original response hash from unnecessary nesting
-# 1. Remove ['data']['group'] or ['data']['project'] hash nesting
-# 2. Remove ['edges'] & ['nodes'] array wrappings
-# 3. Remove ['node'] hash wrapping
-#
-# @example
-# data = {"data"=>{"group"=> {
-#   "name"=>"test",
-#   "fullName"=>"test",
-#   "description"=>"test",
-#   "labels"=>{"edges"=>[{"node"=>{"title"=>"label1"}}, {"node"=>{"title"=>"label2"}}, {"node"=>{"title"=>"label3"}}]}}}}
-#
-# BulkImports::Common::Transformers::GraphqlCleanerTransformer.new.transform(nil, data)
-#
-# {"name"=>"test", "fullName"=>"test", "description"=>"test", "labels"=>[{"title"=>"label1"}, {"title"=>"label2"}, {"title"=>"label3"}]}
-module BulkImports
-  module Common
-    module Transformers
-      class GraphqlCleanerTransformer
-        EDGES = 'edges'
-        NODE = 'node'
-
-        def initialize(options = {})
-          @options = options
-        end
-
-        def transform(_, data)
-          return data unless data.is_a?(Hash)
-
-          data = data.dig('data', 'group') || data.dig('data', 'project') || data
-
-          clean_edges_and_nodes(data)
-        end
-
-        def clean_edges_and_nodes(data)
-          case data
-          when Array
-            data.map(&method(:clean_edges_and_nodes))
-          when Hash
-            if data.key?(NODE)
-              clean_edges_and_nodes(data[NODE])
-            else
-              data.transform_values { |value| clean_edges_and_nodes(value.try(:fetch, EDGES, value) || value) }
-            end
-          else
-            data
-          end
-        end
-      end
-    end
-  end
-end
diff --git a/lib/bulk_imports/common/transformers/hash_key_digger.rb b/lib/bulk_imports/common/transformers/hash_key_digger.rb
new file mode 100644
index 00000000000..b4897b5b2bf
--- /dev/null
+++ b/lib/bulk_imports/common/transformers/hash_key_digger.rb
@@ -0,0 +1,23 @@
+# frozen_string_literal: true
+
+module BulkImports
+  module Common
+    module Transformers
+      class HashKeyDigger
+        def initialize(options = {})
+          @key_path = options[:key_path]
+        end
+
+        def transform(_, data)
+          raise ArgumentError, "Given data must be a Hash" unless data.is_a?(Hash)
+
+          data.dig(*Array.wrap(key_path))
+        end
+
+        private
+
+        attr_reader :key_path
+      end
+    end
+  end
+end
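`HashKeyDigger` replaces the bespoke GraphQL cleanup with a generic dig along a configurable key path. A self-contained sketch of the same behavior; plain `Array()` stands in for ActiveSupport's `Array.wrap`, which behaves identically for the `%w[data group]` case:

```ruby
# Minimal re-implementation of HashKeyDigger#transform for illustration.
def dig_key_path(data, key_path)
  raise ArgumentError, 'Given data must be a Hash' unless data.is_a?(Hash)

  data.dig(*Array(key_path))
end

response = { 'data' => { 'group' => { 'name' => 'test', 'fullName' => 'test' } } }
p dig_key_path(response, %w[data group])
# => {"name"=>"test", "fullName"=>"test"}
```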
diff --git a/lib/bulk_imports/common/transformers/prohibited_attributes_transformer.rb b/lib/bulk_imports/common/transformers/prohibited_attributes_transformer.rb
new file mode 100644
index 00000000000..858c4c8976b
--- /dev/null
+++ b/lib/bulk_imports/common/transformers/prohibited_attributes_transformer.rb
@@ -0,0 +1,39 @@
+# frozen_string_literal: true
+
+module BulkImports
+  module Common
+    module Transformers
+      class ProhibitedAttributesTransformer
+        PROHIBITED_REFERENCES = Regexp.union(
+          /\Acached_markdown_version\Z/,
+          /\Aid\Z/,
+          /_id\Z/,
+          /_ids\Z/,
+          /_html\Z/,
+          /attributes/,
+          /\Aremote_\w+_(url|urls|request_header)\Z/ # carrierwave automatically creates these attribute methods for uploads
+        ).freeze
+
+        def initialize(options = {})
+          @options = options
+        end
+
+        def transform(context, data)
+          data.each_with_object({}) do |(key, value), result|
+            prohibited = prohibited_key?(key)
+
+            unless prohibited
+              result[key] = value.is_a?(Hash) ? transform(context, value) : value
+            end
+          end
+        end
+
+        private
+
+        def prohibited_key?(key)
+          key.to_s =~ PROHIBITED_REFERENCES
+        end
+      end
+    end
+  end
+end
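The transformer above drops attribute keys that could smuggle in IDs, cached HTML, or carrierwave remote-upload setters, recursing into nested hashes. A rough standalone illustration of the same filtering with a trimmed pattern list:

```ruby
# Trimmed copy of the prohibited-key filter, for illustration only.
PROHIBITED = Regexp.union(/\Aid\Z/, /_id\Z/, /_ids\Z/, /_html\Z/, /attributes/).freeze

def strip_prohibited(data)
  data.each_with_object({}) do |(key, value), result|
    next if key.to_s =~ PROHIBITED # drop prohibited keys entirely

    # Recurse into nested hashes so nested associations are filtered too.
    result[key] = value.is_a?(Hash) ? strip_prohibited(value) : value
  end
end

p strip_prohibited('id' => 1, 'title' => 'label1', 'group' => { 'owner_id' => 2, 'name' => 'g' })
# => {"title"=>"label1", "group"=>{"name"=>"g"}}
```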
diff --git a/lib/bulk_imports/groups/graphql/get_group_query.rb b/lib/bulk_imports/groups/graphql/get_group_query.rb
index c50b99aae4e..2bc0f60baa2 100644
--- a/lib/bulk_imports/groups/graphql/get_group_query.rb
+++ b/lib/bulk_imports/groups/graphql/get_group_query.rb
@@ -29,8 +29,8 @@ module BulkImports
        GRAPHQL
      end
 
-      def variables
-        { full_path: :source_full_path }
+      def variables(entity)
+        { full_path: entity.source_full_path }
      end
    end
  end
diff --git a/lib/bulk_imports/groups/pipelines/group_pipeline.rb b/lib/bulk_imports/groups/pipelines/group_pipeline.rb
index 2b7d0ef7658..5169e292180 100644
--- a/lib/bulk_imports/groups/pipelines/group_pipeline.rb
+++ b/lib/bulk_imports/groups/pipelines/group_pipeline.rb
@@ -6,10 +6,13 @@ module BulkImports
     class GroupPipeline
       include Pipeline
 
+      abort_on_failure!
+
       extractor Common::Extractors::GraphqlExtractor, query: Graphql::GetGroupQuery
 
-      transformer Common::Transformers::GraphqlCleanerTransformer
+      transformer Common::Transformers::HashKeyDigger, key_path: %w[data group]
       transformer Common::Transformers::UnderscorifyKeysTransformer
+      transformer Common::Transformers::ProhibitedAttributesTransformer
       transformer Groups::Transformers::GroupAttributesTransformer
 
       loader Groups::Loaders::GroupLoader
diff --git a/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb b/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb
index 6384e9d5972..d7e1a118d0b 100644
--- a/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb
+++ b/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb
@@ -7,6 +7,7 @@ module BulkImports
       include Pipeline
 
       extractor BulkImports::Groups::Extractors::SubgroupsExtractor
+      transformer Common::Transformers::ProhibitedAttributesTransformer
       transformer BulkImports::Groups::Transformers::SubgroupToEntityTransformer
       loader BulkImports::Common::Loaders::EntityLoader
     end
diff --git a/lib/bulk_imports/importers/group_importer.rb b/lib/bulk_imports/importers/group_importer.rb
index c7253590c87..82cb1ca03a2 100644
--- a/lib/bulk_imports/importers/group_importer.rb
+++ b/lib/bulk_imports/importers/group_importer.rb
@@ -19,6 +19,7 @@ module BulkImports
        )
 
        BulkImports::Groups::Pipelines::GroupPipeline.new.run(context)
+        'BulkImports::EE::Groups::Pipelines::EpicsPipeline'.constantize.new.run(context) if Gitlab.ee?
        BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline.new.run(context)
 
        entity.finish!
diff --git a/lib/bulk_imports/pipeline.rb b/lib/bulk_imports/pipeline.rb
index 70e6030ea2c..a44f8fc7193 100644
--- a/lib/bulk_imports/pipeline.rb
+++ b/lib/bulk_imports/pipeline.rb
@@ -3,10 +3,89 @@
 module BulkImports
   module Pipeline
     extend ActiveSupport::Concern
+    include Gitlab::ClassAttributes
 
     included do
-      include Attributes
       include Runner
+
+      private
+
+      def extractors
+        @extractors ||= self.class.extractors.map(&method(:instantiate))
+      end
+
+      def transformers
+        @transformers ||= self.class.transformers.map(&method(:instantiate))
+      end
+
+      def loaders
+        @loaders ||= self.class.loaders.map(&method(:instantiate))
+      end
+
+      def after_run
+        @after_run ||= self.class.after_run_callback
+      end
+
+      def pipeline
+        @pipeline ||= self.class.name
+      end
+
+      def instantiate(class_config)
+        class_config[:klass].new(class_config[:options])
+      end
+
+      def abort_on_failure?
+        self.class.abort_on_failure?
+      end
+    end
+
+    class_methods do
+      def extractor(klass, options = nil)
+        add_attribute(:extractors, klass, options)
+      end
+
+      def transformer(klass, options = nil)
+        add_attribute(:transformers, klass, options)
+      end
+
+      def loader(klass, options = nil)
+        add_attribute(:loaders, klass, options)
+      end
+
+      def after_run(&block)
+        class_attributes[:after_run] = block
+      end
+
+      def extractors
+        class_attributes[:extractors]
+      end
+
+      def transformers
+        class_attributes[:transformers]
+      end
+
+      def loaders
+        class_attributes[:loaders]
+      end
+
+      def after_run_callback
+        class_attributes[:after_run]
+      end
+
+      def abort_on_failure!
+        class_attributes[:abort_on_failure] = true
+      end
+
+      def abort_on_failure?
+        class_attributes[:abort_on_failure]
+      end
+
+      private
+
+      def add_attribute(sym, klass, options)
+        class_attributes[sym] ||= []
+        class_attributes[sym] << { klass: klass, options: options }
+      end
+    end
   end
 end
diff --git a/lib/bulk_imports/pipeline/attributes.rb b/lib/bulk_imports/pipeline/attributes.rb
deleted file mode 100644
index ebfbaf6f6ba..00000000000
--- a/lib/bulk_imports/pipeline/attributes.rb
+++ /dev/null
@@ -1,41 +0,0 @@
-# frozen_string_literal: true
-
-module BulkImports
-  module Pipeline
-    module Attributes
-      extend ActiveSupport::Concern
-      include Gitlab::ClassAttributes
-
-      class_methods do
-        def extractor(klass, options = nil)
-          add_attribute(:extractors, klass, options)
-        end
-
-        def transformer(klass, options = nil)
-          add_attribute(:transformers, klass, options)
-        end
-
-        def loader(klass, options = nil)
-          add_attribute(:loaders, klass, options)
-        end
-
-        def add_attribute(sym, klass, options)
-          class_attributes[sym] ||= []
-          class_attributes[sym] << { klass: klass, options: options }
-        end
-
-        def extractors
-          class_attributes[:extractors]
-        end
-
-        def transformers
-          class_attributes[:transformers]
-        end
-
-        def loaders
-          class_attributes[:loaders]
-        end
-      end
-    end
-  end
-end
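After this refactor the DSL (`extractor`/`transformer`/`loader`, plus `abort_on_failure!`) lives directly in `BulkImports::Pipeline`, storing `{ klass:, options: }` pairs in class attributes that the included instance methods lazily instantiate. A condensed sketch of that registration pattern, with a plain class-level array standing in for `Gitlab::ClassAttributes` and hypothetical `MiniPipeline`/`UpcaseTransformer` classes:

```ruby
# Condensed model of the class-attribute DSL.
class MiniPipeline
  def self.transformers
    @transformers ||= []
  end

  # Mirrors BulkImports::Pipeline.transformer: record class + options now,
  # instantiate lazily when the pipeline runs.
  def self.transformer(klass, options = nil)
    transformers << { klass: klass, options: options }
  end
end

class UpcaseTransformer
  def initialize(_options = nil); end

  def transform(_context, data)
    data.upcase
  end
end

MiniPipeline.transformer(UpcaseTransformer)

entry = 'group'
MiniPipeline.transformers.each do |config|
  entry = config[:klass].new(config[:options]).transform(nil, entry)
end
p entry # => "GROUP"
```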
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb
index 04038e50399..88b96f0ab6e 100644
--- a/lib/bulk_imports/pipeline/runner.rb
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -5,57 +5,102 @@ module BulkImports
    module Runner
      extend ActiveSupport::Concern
 
-      included do
-        private
+      MarkedAsFailedError = Class.new(StandardError)
 
-        def extractors
-          @extractors ||= self.class.extractors.map(&method(:instantiate))
-        end
+      def run(context)
+        raise MarkedAsFailedError if marked_as_failed?(context)
 
-        def transformers
-          @transformers ||= self.class.transformers.map(&method(:instantiate))
-        end
+        info(context, message: 'Pipeline started', pipeline_class: pipeline)
 
-        def loaders
-          @loaders ||= self.class.loaders.map(&method(:instantiate))
-        end
+        extractors.each do |extractor|
+          data = run_pipeline_step(:extractor, extractor.class.name, context) do
+            extractor.extract(context)
+          end
 
-        def pipeline_name
-          @pipeline ||= self.class.name
-        end
+          if data && data.respond_to?(:each)
+            data.each do |entry|
+              transformers.each do |transformer|
+                entry = run_pipeline_step(:transformer, transformer.class.name, context) do
+                  transformer.transform(context, entry)
+                end
+              end
 
-        def instantiate(class_config)
-          class_config[:klass].new(class_config[:options])
+              loaders.each do |loader|
+                run_pipeline_step(:loader, loader.class.name, context) do
+                  loader.load(context, entry)
+                end
+              end
+            end
+          end
        end
+
+        after_run.call(context) if after_run.present?
+      rescue MarkedAsFailedError
+        log_skip(context)
      end
 
-      def run(context)
-        info(context, message: "Pipeline started", pipeline: pipeline_name)
+      private # rubocop:disable Lint/UselessAccessModifier
 
-        extractors.each do |extractor|
-          extractor.extract(context).each do |entry|
-            info(context, extractor: extractor.class.name)
+      def run_pipeline_step(type, class_name, context)
+        raise MarkedAsFailedError if marked_as_failed?(context)
 
-            transformers.each do |transformer|
-              info(context, transformer: transformer.class.name)
-              entry = transformer.transform(context, entry)
-            end
+        info(context, type => class_name)
 
-            loaders.each do |loader|
-              info(context, loader: loader.class.name)
-              loader.load(context, entry)
-            end
-          end
-        end
+        yield
+      rescue MarkedAsFailedError
+        log_skip(context, type => class_name)
+      rescue => e
+        log_import_failure(e, context)
+
+        mark_as_failed(context) if abort_on_failure?
      end
 
-      private # rubocop:disable Lint/UselessAccessModifier
+      def mark_as_failed(context)
+        warn(context, message: 'Pipeline failed', pipeline_class: pipeline)
+
+        context.entity.fail_op!
+      end
+
+      def marked_as_failed?(context)
+        return true if context.entity.failed?
+
+        false
+      end
+
+      def log_skip(context, extra = {})
+        log = {
+          message: 'Skipping due to failed pipeline status',
+          pipeline_class: pipeline
+        }.merge(extra)
+
+        info(context, log)
+      end
+
+      def log_import_failure(exception, context)
+        attributes = {
+          bulk_import_entity_id: context.entity.id,
+          pipeline_class: pipeline,
+          exception_class: exception.class.to_s,
+          exception_message: exception.message.truncate(255),
+          correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
+        }
+
+        BulkImports::Failure.create(attributes)
+      end
+
+      def warn(context, extra = {})
+        logger.warn(log_base_params(context).merge(extra))
+      end
 
      def info(context, extra = {})
-        logger.info({
-          entity: context.entity.id,
-          entity_type: context.entity.source_type
-        }.merge(extra))
+        logger.info(log_base_params(context).merge(extra))
+      end
+
+      def log_base_params(context)
+        {
+          bulk_import_entity_id: context.entity.id,
+          bulk_import_entity_type: context.entity.source_type
+        }
      end
 
      def logger
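The reworked runner wraps every extractor, transformer, and loader call in `run_pipeline_step`, which logs the step, records failures as `BulkImports::Failure` rows, and, when the pipeline declared `abort_on_failure!`, fails the entity so subsequent steps and pipelines skip themselves via `MarkedAsFailedError`. A stripped-down model of that control flow; `MiniRunner` and its console output are illustrative stand-ins:

```ruby
# Stripped-down model of run_pipeline_step's rescue logic.
MarkedAsFailedError = Class.new(StandardError)

class MiniRunner
  def initialize(abort_on_failure:)
    @abort_on_failure = abort_on_failure
    @failed = false
  end

  def run_step(name)
    raise MarkedAsFailedError if @failed # entity already marked as failed

    yield
  rescue MarkedAsFailedError
    puts "skipping #{name}: pipeline already failed"
  rescue => e
    puts "#{name} failed: #{e.message}" # stands in for BulkImports::Failure.create
    @failed = true if @abort_on_failure # stands in for context.entity.fail_op!
  end
end

runner = MiniRunner.new(abort_on_failure: true)
runner.run_step(:transformer) { raise 'boom' }    # records failure, marks failed
runner.run_step(:loader) { puts 'never reached' } # skipped
```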