diff options
Diffstat (limited to 'lib/bulk_imports')
20 files changed, 263 insertions, 126 deletions
diff --git a/lib/bulk_imports/common/loaders/entity_loader.rb b/lib/bulk_imports/common/loaders/entity_loader.rb deleted file mode 100644 index 8644f3c9dcb..00000000000 --- a/lib/bulk_imports/common/loaders/entity_loader.rb +++ /dev/null @@ -1,15 +0,0 @@ -# frozen_string_literal: true - -module BulkImports - module Common - module Loaders - class EntityLoader - def initialize(*args); end - - def load(context, entity) - context.bulk_import.entities.create!(entity) - end - end - end - end -end diff --git a/lib/bulk_imports/common/transformers/award_emoji_transformer.rb b/lib/bulk_imports/common/transformers/award_emoji_transformer.rb deleted file mode 100644 index 260b47ab917..00000000000 --- a/lib/bulk_imports/common/transformers/award_emoji_transformer.rb +++ /dev/null @@ -1,27 +0,0 @@ -# frozen_string_literal: true - -module BulkImports - module Common - module Transformers - class AwardEmojiTransformer - def initialize(*args); end - - def transform(context, data) - user = find_user(context, data&.dig('user', 'public_email')) || context.current_user - - data - .except('user') - .merge('user_id' => user.id) - end - - private - - def find_user(context, email) - return if email.blank? - - context.group.users.find_by_any_email(email, confirmed: true) # rubocop: disable CodeReuse/ActiveRecord - end - end - end - end -end diff --git a/lib/bulk_imports/common/transformers/prohibited_attributes_transformer.rb b/lib/bulk_imports/common/transformers/prohibited_attributes_transformer.rb index 858c4c8976b..38e2fc0b1b9 100644 --- a/lib/bulk_imports/common/transformers/prohibited_attributes_transformer.rb +++ b/lib/bulk_imports/common/transformers/prohibited_attributes_transformer.rb @@ -14,11 +14,9 @@ module BulkImports /\Aremote_\w+_(url|urls|request_header)\Z/ # carrierwave automatically creates these attribute methods for uploads ).freeze - def initialize(options = {}) - @options = options - end - def transform(context, data) + return unless data + data.each_with_object({}) do |(key, value), result| prohibited = prohibited_key?(key) diff --git a/lib/bulk_imports/common/transformers/user_reference_transformer.rb b/lib/bulk_imports/common/transformers/user_reference_transformer.rb new file mode 100644 index 00000000000..ca077b4ef43 --- /dev/null +++ b/lib/bulk_imports/common/transformers/user_reference_transformer.rb @@ -0,0 +1,39 @@ +# frozen_string_literal: true + +# UserReferenceTransformer replaces specified user +# reference key with a user id being either: +# - A user id found by `public_email` in the group +# - Current user id +# under a new key `"#{@reference}_id"`. +module BulkImports + module Common + module Transformers + class UserReferenceTransformer + DEFAULT_REFERENCE = 'user' + + def initialize(options = {}) + @reference = options[:reference] || DEFAULT_REFERENCE + @suffixed_reference = "#{@reference}_id" + end + + def transform(context, data) + return unless data + + user = find_user(context, data&.dig(@reference, 'public_email')) || context.current_user + + data + .except(@reference) + .merge(@suffixed_reference => user.id) + end + + private + + def find_user(context, email) + return if email.blank? + + context.group.users.find_by_any_email(email, confirmed: true) # rubocop: disable CodeReuse/ActiveRecord + end + end + end + end +end diff --git a/lib/bulk_imports/groups/extractors/subgroups_extractor.rb b/lib/bulk_imports/groups/extractors/subgroups_extractor.rb index b01fb6f68ac..e5e2b9fdbd4 100644 --- a/lib/bulk_imports/groups/extractors/subgroups_extractor.rb +++ b/lib/bulk_imports/groups/extractors/subgroups_extractor.rb @@ -4,8 +4,6 @@ module BulkImports module Groups module Extractors class SubgroupsExtractor - def initialize(*args); end - def extract(context) encoded_parent_path = ERB::Util.url_encode(context.entity.source_full_path) diff --git a/lib/bulk_imports/groups/graphql/get_labels_query.rb b/lib/bulk_imports/groups/graphql/get_labels_query.rb index d1fe791c2ce..23efbc33581 100644 --- a/lib/bulk_imports/groups/graphql/get_labels_query.rb +++ b/lib/bulk_imports/groups/graphql/get_labels_query.rb @@ -10,7 +10,7 @@ module BulkImports <<-'GRAPHQL' query ($full_path: ID!, $cursor: String) { group(fullPath: $full_path) { - labels(first: 100, after: $cursor) { + labels(first: 100, after: $cursor, onlyGroupLabels: true) { page_info: pageInfo { end_cursor: endCursor has_next_page: hasNextPage @@ -19,6 +19,8 @@ module BulkImports title description color + created_at: createdAt + updated_at: updatedAt } } } diff --git a/lib/bulk_imports/groups/graphql/get_milestones_query.rb b/lib/bulk_imports/groups/graphql/get_milestones_query.rb new file mode 100644 index 00000000000..2ade87e6fa0 --- /dev/null +++ b/lib/bulk_imports/groups/graphql/get_milestones_query.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +module BulkImports + module Groups + module Graphql + module GetMilestonesQuery + extend self + + def to_s + <<-'GRAPHQL' + query ($full_path: ID!, $cursor: String) { + group(fullPath: $full_path) { + milestones(first: 100, after: $cursor, includeDescendants: false) { + page_info: pageInfo { + end_cursor: endCursor + has_next_page: hasNextPage + } + nodes { + title + description + state + start_date: startDate + due_date: dueDate + created_at: createdAt + updated_at: updatedAt + } + } + } + } + GRAPHQL + end + + def variables(context) + { + full_path: context.entity.source_full_path, + cursor: context.entity.next_page_for(:milestones) + } + end + + def base_path + %w[data group milestones] + end + + def data_path + base_path << 'nodes' + end + + def page_info_path + base_path << 'page_info' + end + end + end + end +end diff --git a/lib/bulk_imports/groups/loaders/group_loader.rb b/lib/bulk_imports/groups/loaders/group_loader.rb index 386fc695182..a631685c2ad 100644 --- a/lib/bulk_imports/groups/loaders/group_loader.rb +++ b/lib/bulk_imports/groups/loaders/group_loader.rb @@ -4,10 +4,6 @@ module BulkImports module Groups module Loaders class GroupLoader - def initialize(options = {}) - @options = options - end - def load(context, data) return unless user_can_create_group?(context.current_user, data) diff --git a/lib/bulk_imports/groups/loaders/labels_loader.rb b/lib/bulk_imports/groups/loaders/labels_loader.rb deleted file mode 100644 index b8c9ba9609c..00000000000 --- a/lib/bulk_imports/groups/loaders/labels_loader.rb +++ /dev/null @@ -1,15 +0,0 @@ -# frozen_string_literal: true - -module BulkImports - module Groups - module Loaders - class LabelsLoader - def initialize(*); end - - def load(context, data) - Labels::CreateService.new(data).execute(group: context.group) - end - end - end - end -end diff --git a/lib/bulk_imports/groups/loaders/members_loader.rb b/lib/bulk_imports/groups/loaders/members_loader.rb deleted file mode 100644 index ccf44b31aee..00000000000 --- a/lib/bulk_imports/groups/loaders/members_loader.rb +++ /dev/null @@ -1,17 +0,0 @@ -# frozen_string_literal: true - -module BulkImports - module Groups - module Loaders - class MembersLoader - def initialize(*); end - - def load(context, data) - return unless data - - context.group.members.create!(data) - end - end - end - end -end diff --git a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb index 40dab9b444c..9f8b8682751 100644 --- a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb @@ -11,7 +11,9 @@ module BulkImports transformer Common::Transformers::ProhibitedAttributesTransformer - loader BulkImports::Groups::Loaders::LabelsLoader + def load(context, data) + Labels::CreateService.new(data).execute(group: context.group) + end def after_run(extracted_data) context.entity.update_tracker_for( diff --git a/lib/bulk_imports/groups/pipelines/members_pipeline.rb b/lib/bulk_imports/groups/pipelines/members_pipeline.rb index b00c4c1a659..32fc931e8c3 100644 --- a/lib/bulk_imports/groups/pipelines/members_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/members_pipeline.rb @@ -12,7 +12,11 @@ module BulkImports transformer Common::Transformers::ProhibitedAttributesTransformer transformer BulkImports::Groups::Transformers::MemberAttributesTransformer - loader BulkImports::Groups::Loaders::MembersLoader + def load(context, data) + return unless data + + context.group.members.create!(data) + end def after_run(extracted_data) context.entity.update_tracker_for( diff --git a/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb b/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb new file mode 100644 index 00000000000..8497162e0e7 --- /dev/null +++ b/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb @@ -0,0 +1,42 @@ +# frozen_string_literal: true + +module BulkImports + module Groups + module Pipelines + class MilestonesPipeline + include Pipeline + + extractor BulkImports::Common::Extractors::GraphqlExtractor, + query: BulkImports::Groups::Graphql::GetMilestonesQuery + + transformer Common::Transformers::ProhibitedAttributesTransformer + + def load(context, data) + return unless data + + raise ::BulkImports::Pipeline::NotAllowedError unless authorized? + + context.group.milestones.create!(data) + end + + def after_run(extracted_data) + context.entity.update_tracker_for( + relation: :milestones, + has_next_page: extracted_data.has_next_page?, + next_page: extracted_data.next_page + ) + + if extracted_data.has_next_page? + run + end + end + + private + + def authorized? + context.current_user.can?(:admin_milestone, context.group) + end + end + end + end +end diff --git a/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb b/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb index d7e1a118d0b..c47a8bd1daa 100644 --- a/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb @@ -9,7 +9,10 @@ module BulkImports extractor BulkImports::Groups::Extractors::SubgroupsExtractor transformer Common::Transformers::ProhibitedAttributesTransformer transformer BulkImports::Groups::Transformers::SubgroupToEntityTransformer - loader BulkImports::Common::Loaders::EntityLoader + + def load(context, data) + context.bulk_import.entities.create!(data) + end end end end diff --git a/lib/bulk_imports/groups/transformers/group_attributes_transformer.rb b/lib/bulk_imports/groups/transformers/group_attributes_transformer.rb index 7de9a430421..23e898a7bb2 100644 --- a/lib/bulk_imports/groups/transformers/group_attributes_transformer.rb +++ b/lib/bulk_imports/groups/transformers/group_attributes_transformer.rb @@ -4,10 +4,6 @@ module BulkImports module Groups module Transformers class GroupAttributesTransformer - def initialize(options = {}) - @options = options - end - def transform(context, data) import_entity = context.entity @@ -39,12 +35,11 @@ module BulkImports end def transform_parent(context, import_entity, data) - current_user = context.current_user - namespace = Namespace.find_by_full_path(import_entity.destination_namespace) - - return data if namespace == current_user.namespace + unless import_entity.destination_namespace.blank? + namespace = Namespace.find_by_full_path(import_entity.destination_namespace) + data['parent_id'] = namespace.id + end - data['parent_id'] = namespace.id data end diff --git a/lib/bulk_imports/groups/transformers/member_attributes_transformer.rb b/lib/bulk_imports/groups/transformers/member_attributes_transformer.rb index 622f5b60ffe..e92c898171a 100644 --- a/lib/bulk_imports/groups/transformers/member_attributes_transformer.rb +++ b/lib/bulk_imports/groups/transformers/member_attributes_transformer.rb @@ -4,8 +4,6 @@ module BulkImports module Groups module Transformers class MemberAttributesTransformer - def initialize(*); end - def transform(context, data) data .then { |data| add_user(data) } diff --git a/lib/bulk_imports/groups/transformers/subgroup_to_entity_transformer.rb b/lib/bulk_imports/groups/transformers/subgroup_to_entity_transformer.rb index 6c3c299c2d2..676a6ca8d2a 100644 --- a/lib/bulk_imports/groups/transformers/subgroup_to_entity_transformer.rb +++ b/lib/bulk_imports/groups/transformers/subgroup_to_entity_transformer.rb @@ -4,8 +4,6 @@ module BulkImports module Groups module Transformers class SubgroupToEntityTransformer - def initialize(*args); end - def transform(context, entry) { source_type: :group_entity, diff --git a/lib/bulk_imports/importers/group_importer.rb b/lib/bulk_imports/importers/group_importer.rb index f967b7ad7ab..f016b552fd4 100644 --- a/lib/bulk_imports/importers/group_importer.rb +++ b/lib/bulk_imports/importers/group_importer.rb @@ -24,7 +24,8 @@ module BulkImports BulkImports::Groups::Pipelines::GroupPipeline, BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline, BulkImports::Groups::Pipelines::MembersPipeline, - BulkImports::Groups::Pipelines::LabelsPipeline + BulkImports::Groups::Pipelines::LabelsPipeline, + BulkImports::Groups::Pipelines::MilestonesPipeline ] end end diff --git a/lib/bulk_imports/pipeline.rb b/lib/bulk_imports/pipeline.rb index 1d55ad95887..14445162737 100644 --- a/lib/bulk_imports/pipeline.rb +++ b/lib/bulk_imports/pipeline.rb @@ -3,9 +3,14 @@ module BulkImports module Pipeline extend ActiveSupport::Concern + include Gitlab::Utils::StrongMemoize include Gitlab::ClassAttributes include Runner + NotAllowedError = Class.new(StandardError) + + CACHE_KEY_EXPIRATION = 2.hours + def initialize(context) @context = context end @@ -15,16 +20,83 @@ module BulkImports attr_reader :context + # Fetch pipeline extractor. + # An extractor is defined either by instance `#extract(context)` method + # or by using `extractor` DSL. + # + # @example + # class MyPipeline + # extractor MyExtractor, foo: :bar + # end + # + # class MyPipeline + # def extract(context) + # puts 'Fetch some data' + # end + # end + # + # If pipeline implements instance method `extract` - use it + # and ignore class `extractor` method implementation. def extractor - @extractor ||= instantiate(self.class.get_extractor) + @extractor ||= self.respond_to?(:extract) ? self : instantiate(self.class.get_extractor) end + # Fetch pipeline transformers. + # + # A transformer can be defined using: + # - `transformer` class method + # - `transform` instance method + # + # Multiple transformers can be defined within a single + # pipeline and run sequentially for each record in the + # following order: + # - Transformers defined using `transformer` class method + # - Instance method `transform` + # + # Instance method `transform` is always the last to run. + # + # @example + # class MyPipeline + # transformer MyTransformerOne, foo: :bar + # transformer MyTransformerTwo, foo: :bar + # + # def transform(context, data) + # # perform transformation here + # end + # end + # + # In the example above `#transform` is the first to run and + # `MyTransformerTwo` method is the last. def transformers - @transformers ||= self.class.transformers.map(&method(:instantiate)) + strong_memoize(:transformers) do + defined_transformers = self.class.transformers.map(&method(:instantiate)) + + transformers = [] + transformers << self if respond_to?(:transform) + transformers.concat(defined_transformers) + transformers + end end + # Fetch pipeline loader. + # A loader is defined either by instance method `#load(context, data)` + # or by using `loader` DSL. + # + # @example + # class MyPipeline + # loader MyLoader, foo: :bar + # end + # + # class MyPipeline + # def load(context, data) + # puts 'Load some data' + # end + # end + # + # If pipeline implements instance method `load` - use it + # and ignore class `loader` method implementation. def loader - @loaders ||= instantiate(self.class.get_loader) + @loader ||= self.respond_to?(:load) ? self : instantiate(self.class.get_loader) end def pipeline @@ -32,7 +104,13 @@ module BulkImports end def instantiate(class_config) - class_config[:klass].new(class_config[:options]) + options = class_config[:options] + + if options + class_config[:klass].new(class_config[:options]) + else + class_config[:klass].new + end end def abort_on_failure? @@ -58,7 +136,7 @@ module BulkImports end def transformers - class_attributes[:transformers] + class_attributes[:transformers] || [] end def get_loader diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index d39f4121b51..e3535e585cc 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -8,7 +8,7 @@ module BulkImports MarkedAsFailedError = Class.new(StandardError) def run - raise MarkedAsFailedError if marked_as_failed? + raise MarkedAsFailedError if context.entity.failed? info(message: 'Pipeline started') @@ -40,7 +40,7 @@ module BulkImports private # rubocop:disable Lint/UselessAccessModifier def run_pipeline_step(step, class_name = nil) - raise MarkedAsFailedError if marked_as_failed? + raise MarkedAsFailedError if context.entity.failed? info(pipeline_step: step, step_class: class_name) @@ -62,24 +62,13 @@ module BulkImports end def mark_as_failed - warn(message: 'Pipeline failed', pipeline_class: pipeline) + warn(message: 'Pipeline failed') context.entity.fail_op! end - def marked_as_failed? - return true if context.entity.failed? - - false - end - def log_skip(extra = {}) - log = { - message: 'Skipping due to failed pipeline status', - pipeline_class: pipeline - }.merge(extra) - - info(log) + info({ message: 'Skipping due to failed pipeline status' }.merge(extra)) end def log_import_failure(exception, step) @@ -92,25 +81,39 @@ module BulkImports correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id } + error( + pipeline_step: step, + exception_class: exception.class.to_s, + exception_message: exception.message + ) + BulkImports::Failure.create(attributes) end + def info(extra = {}) + logger.info(log_params(extra)) + end + def warn(extra = {}) logger.warn(log_params(extra)) end - def info(extra = {}) - logger.info(log_params(extra)) + def error(extra = {}) + logger.error(log_params(extra)) end def log_params(extra) defaults = { + bulk_import_id: context.bulk_import.id, bulk_import_entity_id: context.entity.id, bulk_import_entity_type: context.entity.source_type, - pipeline_class: pipeline + pipeline_class: pipeline, + context_extra: context.extra } - defaults.merge(extra).compact + defaults + .merge(extra) + .reject { |_key, value| value.blank? } end def logger |