diff options
Diffstat (limited to 'lib/bulk_imports')
20 files changed, 412 insertions, 137 deletions
diff --git a/lib/bulk_imports/common/extractors/graphql_extractor.rb b/lib/bulk_imports/common/extractors/graphql_extractor.rb index af274ee1299..cde3d1cad5b 100644 --- a/lib/bulk_imports/common/extractors/graphql_extractor.rb +++ b/lib/bulk_imports/common/extractors/graphql_extractor.rb @@ -4,17 +4,22 @@ module BulkImports module Common module Extractors class GraphqlExtractor - def initialize(query) - @query = query[:query] + def initialize(options = {}) + @query = options[:query] end def extract(context) client = graphql_client(context) - client.execute( + response = client.execute( client.parse(query.to_s), - query.variables(context.entity) + query.variables(context) ).original_hash.deep_dup + + BulkImports::Pipeline::ExtractedData.new( + data: response.dig(*query.data_path), + page_info: response.dig(*query.page_info_path) + ) end private @@ -27,10 +32,6 @@ module BulkImports token: context.configuration.access_token ) end - - def parsed_query - @parsed_query ||= graphql_client.parse(query.to_s) - end end end end diff --git a/lib/bulk_imports/common/loaders/entity_loader.rb b/lib/bulk_imports/common/loaders/entity_loader.rb index 4540b892c88..8644f3c9dcb 100644 --- a/lib/bulk_imports/common/loaders/entity_loader.rb +++ b/lib/bulk_imports/common/loaders/entity_loader.rb @@ -7,7 +7,7 @@ module BulkImports def initialize(*args); end def load(context, entity) - context.entity.bulk_import.entities.create!(entity) + context.bulk_import.entities.create!(entity) end end end diff --git a/lib/bulk_imports/common/transformers/award_emoji_transformer.rb b/lib/bulk_imports/common/transformers/award_emoji_transformer.rb new file mode 100644 index 00000000000..260b47ab917 --- /dev/null +++ b/lib/bulk_imports/common/transformers/award_emoji_transformer.rb @@ -0,0 +1,27 @@ +# frozen_string_literal: true + +module BulkImports + module Common + module Transformers + class AwardEmojiTransformer + def initialize(*args); end + + def transform(context, data) + user = find_user(context, data&.dig('user', 'public_email')) || context.current_user + + data + .except('user') + .merge('user_id' => user.id) + end + + private + + def find_user(context, email) + return if email.blank? + + context.group.users.find_by_any_email(email, confirmed: true) # rubocop: disable CodeReuse/ActiveRecord + end + end + end + end +end diff --git a/lib/bulk_imports/common/transformers/hash_key_digger.rb b/lib/bulk_imports/common/transformers/hash_key_digger.rb deleted file mode 100644 index b4897b5b2bf..00000000000 --- a/lib/bulk_imports/common/transformers/hash_key_digger.rb +++ /dev/null @@ -1,23 +0,0 @@ -# frozen_string_literal: true - -module BulkImports - module Common - module Transformers - class HashKeyDigger - def initialize(options = {}) - @key_path = options[:key_path] - end - - def transform(_, data) - raise ArgumentError, "Given data must be a Hash" unless data.is_a?(Hash) - - data.dig(*Array.wrap(key_path)) - end - - private - - attr_reader :key_path - end - end - end -end diff --git a/lib/bulk_imports/common/transformers/underscorify_keys_transformer.rb b/lib/bulk_imports/common/transformers/underscorify_keys_transformer.rb deleted file mode 100644 index b32ab28fdbb..00000000000 --- a/lib/bulk_imports/common/transformers/underscorify_keys_transformer.rb +++ /dev/null @@ -1,19 +0,0 @@ -# frozen_string_literal: true - -module BulkImports - module Common - module Transformers - class UnderscorifyKeysTransformer - def initialize(options = {}) - @options = options - end - - def transform(_, data) - data.deep_transform_keys do |key| - key.to_s.underscore - end - end - end - end - end -end diff --git a/lib/bulk_imports/groups/extractors/subgroups_extractor.rb b/lib/bulk_imports/groups/extractors/subgroups_extractor.rb index 5c5e686cec5..b01fb6f68ac 100644 --- a/lib/bulk_imports/groups/extractors/subgroups_extractor.rb +++ b/lib/bulk_imports/groups/extractors/subgroups_extractor.rb @@ -9,9 +9,11 @@ module BulkImports def extract(context) encoded_parent_path = ERB::Util.url_encode(context.entity.source_full_path) - http_client(context.entity.bulk_import.configuration) + response = http_client(context.configuration) .each_page(:get, "groups/#{encoded_parent_path}/subgroups") .flat_map(&:itself) + + BulkImports::Pipeline::ExtractedData.new(data: response) end private diff --git a/lib/bulk_imports/groups/graphql/get_group_query.rb b/lib/bulk_imports/groups/graphql/get_group_query.rb index 2bc0f60baa2..6852e25c87f 100644 --- a/lib/bulk_imports/groups/graphql/get_group_query.rb +++ b/lib/bulk_imports/groups/graphql/get_group_query.rb @@ -12,25 +12,37 @@ module BulkImports group(fullPath: $full_path) { name path - fullPath + full_path: fullPath description visibility - emailsDisabled - lfsEnabled - mentionsDisabled - projectCreationLevel - requestAccessEnabled - requireTwoFactorAuthentication - shareWithGroupLock - subgroupCreationLevel - twoFactorGracePeriod + emails_disabled: emailsDisabled + lfs_enabled: lfsEnabled + mentions_disabled: mentionsDisabled + project_creation_level: projectCreationLevel + request_access_enabled: requestAccessEnabled + require_two_factor_authentication: requireTwoFactorAuthentication + share_with_group_lock: shareWithGroupLock + subgroup_creation_level: subgroupCreationLevel + two_factor_grace_period: twoFactorGracePeriod } } GRAPHQL end - def variables(entity) - { full_path: entity.source_full_path } + def variables(context) + { full_path: context.entity.source_full_path } + end + + def base_path + %w[data group] + end + + def data_path + base_path + end + + def page_info_path + base_path << 'page_info' end end end diff --git a/lib/bulk_imports/groups/graphql/get_labels_query.rb b/lib/bulk_imports/groups/graphql/get_labels_query.rb new file mode 100644 index 00000000000..d1fe791c2ce --- /dev/null +++ b/lib/bulk_imports/groups/graphql/get_labels_query.rb @@ -0,0 +1,50 @@ +# frozen_string_literal: true + +module BulkImports + module Groups + module Graphql + module GetLabelsQuery + extend self + + def to_s + <<-'GRAPHQL' + query ($full_path: ID!, $cursor: String) { + group(fullPath: $full_path) { + labels(first: 100, after: $cursor) { + page_info: pageInfo { + end_cursor: endCursor + has_next_page: hasNextPage + } + nodes { + title + description + color + } + } + } + } + GRAPHQL + end + + def variables(context) + { + full_path: context.entity.source_full_path, + cursor: context.entity.next_page_for(:labels) + } + end + + def base_path + %w[data group labels] + end + + def data_path + base_path << 'nodes' + end + + def page_info_path + base_path << 'page_info' + end + end + end + end +end diff --git a/lib/bulk_imports/groups/graphql/get_members_query.rb b/lib/bulk_imports/groups/graphql/get_members_query.rb new file mode 100644 index 00000000000..e3a78124a47 --- /dev/null +++ b/lib/bulk_imports/groups/graphql/get_members_query.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true + +module BulkImports + module Groups + module Graphql + module GetMembersQuery + extend self + def to_s + <<-'GRAPHQL' + query($full_path: ID!, $cursor: String) { + group(fullPath: $full_path) { + group_members: groupMembers(relations: DIRECT, first: 100, after: $cursor) { + page_info: pageInfo { + end_cursor: endCursor + has_next_page: hasNextPage + } + nodes { + created_at: createdAt + updated_at: updatedAt + expires_at: expiresAt + access_level: accessLevel { + integer_value: integerValue + } + user { + public_email: publicEmail + } + } + } + } + } + GRAPHQL + end + + def variables(context) + { + full_path: context.entity.source_full_path, + cursor: context.entity.next_page_for(:group_members) + } + end + + def base_path + %w[data group group_members] + end + + def data_path + base_path << 'nodes' + end + + def page_info_path + base_path << 'page_info' + end + end + end + end +end diff --git a/lib/bulk_imports/groups/loaders/labels_loader.rb b/lib/bulk_imports/groups/loaders/labels_loader.rb new file mode 100644 index 00000000000..b8c9ba9609c --- /dev/null +++ b/lib/bulk_imports/groups/loaders/labels_loader.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +module BulkImports + module Groups + module Loaders + class LabelsLoader + def initialize(*); end + + def load(context, data) + Labels::CreateService.new(data).execute(group: context.group) + end + end + end + end +end diff --git a/lib/bulk_imports/groups/loaders/members_loader.rb b/lib/bulk_imports/groups/loaders/members_loader.rb new file mode 100644 index 00000000000..ccf44b31aee --- /dev/null +++ b/lib/bulk_imports/groups/loaders/members_loader.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +module BulkImports + module Groups + module Loaders + class MembersLoader + def initialize(*); end + + def load(context, data) + return unless data + + context.group.members.create!(data) + end + end + end + end +end diff --git a/lib/bulk_imports/groups/pipelines/group_pipeline.rb b/lib/bulk_imports/groups/pipelines/group_pipeline.rb index 5169e292180..8c6f089e8a4 100644 --- a/lib/bulk_imports/groups/pipelines/group_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/group_pipeline.rb @@ -10,8 +10,6 @@ module BulkImports extractor Common::Extractors::GraphqlExtractor, query: Graphql::GetGroupQuery - transformer Common::Transformers::HashKeyDigger, key_path: %w[data group] - transformer Common::Transformers::UnderscorifyKeysTransformer transformer Common::Transformers::ProhibitedAttributesTransformer transformer Groups::Transformers::GroupAttributesTransformer diff --git a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb new file mode 100644 index 00000000000..40dab9b444c --- /dev/null +++ b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb @@ -0,0 +1,30 @@ +# frozen_string_literal: true + +module BulkImports + module Groups + module Pipelines + class LabelsPipeline + include Pipeline + + extractor BulkImports::Common::Extractors::GraphqlExtractor, + query: BulkImports::Groups::Graphql::GetLabelsQuery + + transformer Common::Transformers::ProhibitedAttributesTransformer + + loader BulkImports::Groups::Loaders::LabelsLoader + + def after_run(extracted_data) + context.entity.update_tracker_for( + relation: :labels, + has_next_page: extracted_data.has_next_page?, + next_page: extracted_data.next_page + ) + + if extracted_data.has_next_page? + run + end + end + end + end + end +end diff --git a/lib/bulk_imports/groups/pipelines/members_pipeline.rb b/lib/bulk_imports/groups/pipelines/members_pipeline.rb new file mode 100644 index 00000000000..b00c4c1a659 --- /dev/null +++ b/lib/bulk_imports/groups/pipelines/members_pipeline.rb @@ -0,0 +1,31 @@ +# frozen_string_literal: true + +module BulkImports + module Groups + module Pipelines + class MembersPipeline + include Pipeline + + extractor BulkImports::Common::Extractors::GraphqlExtractor, + query: BulkImports::Groups::Graphql::GetMembersQuery + + transformer Common::Transformers::ProhibitedAttributesTransformer + transformer BulkImports::Groups::Transformers::MemberAttributesTransformer + + loader BulkImports::Groups::Loaders::MembersLoader + + def after_run(extracted_data) + context.entity.update_tracker_for( + relation: :group_members, + has_next_page: extracted_data.has_next_page?, + next_page: extracted_data.next_page + ) + + if extracted_data.has_next_page? + run + end + end + end + end + end +end diff --git a/lib/bulk_imports/groups/transformers/member_attributes_transformer.rb b/lib/bulk_imports/groups/transformers/member_attributes_transformer.rb new file mode 100644 index 00000000000..622f5b60ffe --- /dev/null +++ b/lib/bulk_imports/groups/transformers/member_attributes_transformer.rb @@ -0,0 +1,56 @@ +# frozen_string_literal: true + +module BulkImports + module Groups + module Transformers + class MemberAttributesTransformer + def initialize(*); end + + def transform(context, data) + data + .then { |data| add_user(data) } + .then { |data| add_access_level(data) } + .then { |data| add_author(data, context) } + end + + private + + def add_user(data) + user = find_user(data&.dig('user', 'public_email')) + + return unless user + + data + .except('user') + .merge('user_id' => user.id) + end + + def find_user(email) + return unless email + + User.find_by_any_email(email, confirmed: true) + end + + def add_access_level(data) + access_level = data&.dig('access_level', 'integer_value') + + return unless valid_access_level?(access_level) + + data.merge('access_level' => access_level) + end + + def valid_access_level?(access_level) + Gitlab::Access + .options_with_owner + .value?(access_level) + end + + def add_author(data, context) + return unless data + + data.merge('created_by_id' => context.current_user.id) + end + end + end + end +end diff --git a/lib/bulk_imports/importers/group_importer.rb b/lib/bulk_imports/importers/group_importer.rb index 6e1b86e9515..f967b7ad7ab 100644 --- a/lib/bulk_imports/importers/group_importer.rb +++ b/lib/bulk_imports/importers/group_importer.rb @@ -8,16 +8,9 @@ module BulkImports end def execute - bulk_import = entity.bulk_import - configuration = bulk_import.configuration + context = BulkImports::Pipeline::Context.new(entity) - context = BulkImports::Pipeline::Context.new( - current_user: bulk_import.user, - entity: entity, - configuration: configuration - ) - - pipelines.each { |pipeline| pipeline.new.run(context) } + pipelines.each { |pipeline| pipeline.new(context).run } entity.finish! end @@ -29,7 +22,9 @@ module BulkImports def pipelines [ BulkImports::Groups::Pipelines::GroupPipeline, - BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline + BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline, + BulkImports::Groups::Pipelines::MembersPipeline, + BulkImports::Groups::Pipelines::LabelsPipeline ] end end diff --git a/lib/bulk_imports/pipeline.rb b/lib/bulk_imports/pipeline.rb index 06b81b5da14..1d55ad95887 100644 --- a/lib/bulk_imports/pipeline.rb +++ b/lib/bulk_imports/pipeline.rb @@ -4,12 +4,17 @@ module BulkImports module Pipeline extend ActiveSupport::Concern include Gitlab::ClassAttributes + include Runner - included do - include Runner + def initialize(context) + @context = context + end + included do private + attr_reader :context + def extractor @extractor ||= instantiate(self.class.get_extractor) end @@ -22,10 +27,6 @@ module BulkImports @loaders ||= instantiate(self.class.get_loader) end - def after_run - @after_run ||= self.class.after_run_callback - end - def pipeline @pipeline ||= self.class.name end @@ -52,10 +53,6 @@ module BulkImports class_attributes[:loader] = { klass: klass, options: options } end - def after_run(&block) - class_attributes[:after_run] = block - end - def get_extractor class_attributes[:extractor] end @@ -68,10 +65,6 @@ module BulkImports class_attributes[:loader] end - def after_run_callback - class_attributes[:after_run] - end - def abort_on_failure! class_attributes[:abort_on_failure] = true end diff --git a/lib/bulk_imports/pipeline/context.rb b/lib/bulk_imports/pipeline/context.rb index ad19f5cad7d..dd121b2dbed 100644 --- a/lib/bulk_imports/pipeline/context.rb +++ b/lib/bulk_imports/pipeline/context.rb @@ -3,30 +3,25 @@ module BulkImports module Pipeline class Context - include Gitlab::Utils::LazyAttributes + attr_reader :entity, :bulk_import + attr_accessor :extra - Attribute = Struct.new(:name, :type) - - PIPELINE_ATTRIBUTES = [ - Attribute.new(:current_user, User), - Attribute.new(:entity, ::BulkImports::Entity), - Attribute.new(:configuration, ::BulkImports::Configuration) - ].freeze - - def initialize(args) - assign_attributes(args) + def initialize(entity, extra = {}) + @entity = entity + @bulk_import = entity.bulk_import + @extra = extra end - private + def group + entity.group + end - PIPELINE_ATTRIBUTES.each do |attr| - lazy_attr_reader attr.name, type: attr.type + def current_user + bulk_import.user end - def assign_attributes(values) - values.slice(*PIPELINE_ATTRIBUTES.map(&:name)).each do |name, value| - instance_variable_set("@#{name}", value) - end + def configuration + bulk_import.configuration end end end diff --git a/lib/bulk_imports/pipeline/extracted_data.rb b/lib/bulk_imports/pipeline/extracted_data.rb new file mode 100644 index 00000000000..685a91a4afe --- /dev/null +++ b/lib/bulk_imports/pipeline/extracted_data.rb @@ -0,0 +1,26 @@ +# frozen_string_literal: true + +module BulkImports + module Pipeline + class ExtractedData + attr_reader :data + + def initialize(data: nil, page_info: {}) + @data = Array.wrap(data) + @page_info = page_info + end + + def has_next_page? + @page_info['has_next_page'] + end + + def next_page + @page_info['end_cursor'] + end + + def each(&block) + data.each(&block) + end + end + end +end diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index 11fb9722173..d39f4121b51 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -7,75 +7,86 @@ module BulkImports MarkedAsFailedError = Class.new(StandardError) - def run(context) - raise MarkedAsFailedError if marked_as_failed?(context) + def run + raise MarkedAsFailedError if marked_as_failed? - info(context, message: 'Pipeline started', pipeline_class: pipeline) + info(message: 'Pipeline started') - Array.wrap(extracted_data_from(context)).each do |entry| + extracted_data = extracted_data_from + + extracted_data&.each do |entry| transformers.each do |transformer| - entry = run_pipeline_step(:transformer, transformer.class.name, context) do + entry = run_pipeline_step(:transformer, transformer.class.name) do transformer.transform(context, entry) end end - run_pipeline_step(:loader, loader.class.name, context) do + run_pipeline_step(:loader, loader.class.name) do loader.load(context, entry) end end - after_run.call(context) if after_run.present? + if respond_to?(:after_run) + run_pipeline_step(:after_run) do + after_run(extracted_data) + end + end + + info(message: 'Pipeline finished') rescue MarkedAsFailedError - log_skip(context) + log_skip end private # rubocop:disable Lint/UselessAccessModifier - def run_pipeline_step(type, class_name, context) - raise MarkedAsFailedError if marked_as_failed?(context) + def run_pipeline_step(step, class_name = nil) + raise MarkedAsFailedError if marked_as_failed? - info(context, type => class_name) + info(pipeline_step: step, step_class: class_name) yield rescue MarkedAsFailedError - log_skip(context, type => class_name) + log_skip(step => class_name) rescue => e - log_import_failure(e, context) + log_import_failure(e, step) - mark_as_failed(context) if abort_on_failure? + mark_as_failed if abort_on_failure? + + nil end - def extracted_data_from(context) - run_pipeline_step(:extractor, extractor.class.name, context) do + def extracted_data_from + run_pipeline_step(:extractor, extractor.class.name) do extractor.extract(context) end end - def mark_as_failed(context) - warn(context, message: 'Pipeline failed', pipeline_class: pipeline) + def mark_as_failed + warn(message: 'Pipeline failed', pipeline_class: pipeline) context.entity.fail_op! end - def marked_as_failed?(context) + def marked_as_failed? return true if context.entity.failed? false end - def log_skip(context, extra = {}) + def log_skip(extra = {}) log = { message: 'Skipping due to failed pipeline status', pipeline_class: pipeline }.merge(extra) - info(context, log) + info(log) end - def log_import_failure(exception, context) + def log_import_failure(exception, step) attributes = { bulk_import_entity_id: context.entity.id, pipeline_class: pipeline, + pipeline_step: step, exception_class: exception.class.to_s, exception_message: exception.message.truncate(255), correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id @@ -84,19 +95,22 @@ module BulkImports BulkImports::Failure.create(attributes) end - def warn(context, extra = {}) - logger.warn(log_base_params(context).merge(extra)) + def warn(extra = {}) + logger.warn(log_params(extra)) end - def info(context, extra = {}) - logger.info(log_base_params(context).merge(extra)) + def info(extra = {}) + logger.info(log_params(extra)) end - def log_base_params(context) - { + def log_params(extra) + defaults = { bulk_import_entity_id: context.entity.id, - bulk_import_entity_type: context.entity.source_type + bulk_import_entity_type: context.entity.source_type, + pipeline_class: pipeline } + + defaults.merge(extra).compact end def logger |