summaryrefslogtreecommitdiff
path: root/lib/bulk_imports
diff options
context:
space:
mode:
Diffstat (limited to 'lib/bulk_imports')
-rw-r--r--lib/bulk_imports/common/loaders/entity_loader.rb15
-rw-r--r--lib/bulk_imports/common/transformers/award_emoji_transformer.rb27
-rw-r--r--lib/bulk_imports/common/transformers/prohibited_attributes_transformer.rb6
-rw-r--r--lib/bulk_imports/common/transformers/user_reference_transformer.rb39
-rw-r--r--lib/bulk_imports/groups/extractors/subgroups_extractor.rb2
-rw-r--r--lib/bulk_imports/groups/graphql/get_labels_query.rb4
-rw-r--r--lib/bulk_imports/groups/graphql/get_milestones_query.rb54
-rw-r--r--lib/bulk_imports/groups/loaders/group_loader.rb4
-rw-r--r--lib/bulk_imports/groups/loaders/labels_loader.rb15
-rw-r--r--lib/bulk_imports/groups/loaders/members_loader.rb17
-rw-r--r--lib/bulk_imports/groups/pipelines/labels_pipeline.rb4
-rw-r--r--lib/bulk_imports/groups/pipelines/members_pipeline.rb6
-rw-r--r--lib/bulk_imports/groups/pipelines/milestones_pipeline.rb42
-rw-r--r--lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb5
-rw-r--r--lib/bulk_imports/groups/transformers/group_attributes_transformer.rb13
-rw-r--r--lib/bulk_imports/groups/transformers/member_attributes_transformer.rb2
-rw-r--r--lib/bulk_imports/groups/transformers/subgroup_to_entity_transformer.rb2
-rw-r--r--lib/bulk_imports/importers/group_importer.rb3
-rw-r--r--lib/bulk_imports/pipeline.rb88
-rw-r--r--lib/bulk_imports/pipeline/runner.rb41
20 files changed, 263 insertions, 126 deletions
diff --git a/lib/bulk_imports/common/loaders/entity_loader.rb b/lib/bulk_imports/common/loaders/entity_loader.rb
deleted file mode 100644
index 8644f3c9dcb..00000000000
--- a/lib/bulk_imports/common/loaders/entity_loader.rb
+++ /dev/null
@@ -1,15 +0,0 @@
-# frozen_string_literal: true
-
-module BulkImports
- module Common
- module Loaders
- class EntityLoader
- def initialize(*args); end
-
- def load(context, entity)
- context.bulk_import.entities.create!(entity)
- end
- end
- end
- end
-end
diff --git a/lib/bulk_imports/common/transformers/award_emoji_transformer.rb b/lib/bulk_imports/common/transformers/award_emoji_transformer.rb
deleted file mode 100644
index 260b47ab917..00000000000
--- a/lib/bulk_imports/common/transformers/award_emoji_transformer.rb
+++ /dev/null
@@ -1,27 +0,0 @@
-# frozen_string_literal: true
-
-module BulkImports
- module Common
- module Transformers
- class AwardEmojiTransformer
- def initialize(*args); end
-
- def transform(context, data)
- user = find_user(context, data&.dig('user', 'public_email')) || context.current_user
-
- data
- .except('user')
- .merge('user_id' => user.id)
- end
-
- private
-
- def find_user(context, email)
- return if email.blank?
-
- context.group.users.find_by_any_email(email, confirmed: true) # rubocop: disable CodeReuse/ActiveRecord
- end
- end
- end
- end
-end
diff --git a/lib/bulk_imports/common/transformers/prohibited_attributes_transformer.rb b/lib/bulk_imports/common/transformers/prohibited_attributes_transformer.rb
index 858c4c8976b..38e2fc0b1b9 100644
--- a/lib/bulk_imports/common/transformers/prohibited_attributes_transformer.rb
+++ b/lib/bulk_imports/common/transformers/prohibited_attributes_transformer.rb
@@ -14,11 +14,9 @@ module BulkImports
/\Aremote_\w+_(url|urls|request_header)\Z/ # carrierwave automatically creates these attribute methods for uploads
).freeze
- def initialize(options = {})
- @options = options
- end
-
def transform(context, data)
+ return unless data
+
data.each_with_object({}) do |(key, value), result|
prohibited = prohibited_key?(key)
diff --git a/lib/bulk_imports/common/transformers/user_reference_transformer.rb b/lib/bulk_imports/common/transformers/user_reference_transformer.rb
new file mode 100644
index 00000000000..ca077b4ef43
--- /dev/null
+++ b/lib/bulk_imports/common/transformers/user_reference_transformer.rb
@@ -0,0 +1,39 @@
+# frozen_string_literal: true
+
+# UserReferenceTransformer replaces specified user
+# reference key with a user id being either:
+# - A user id found by `public_email` in the group
+# - Current user id
+# under a new key `"#{@reference}_id"`.
+module BulkImports
+ module Common
+ module Transformers
+ class UserReferenceTransformer
+ DEFAULT_REFERENCE = 'user'
+
+ def initialize(options = {})
+ @reference = options[:reference] || DEFAULT_REFERENCE
+ @suffixed_reference = "#{@reference}_id"
+ end
+
+ def transform(context, data)
+ return unless data
+
+ user = find_user(context, data&.dig(@reference, 'public_email')) || context.current_user
+
+ data
+ .except(@reference)
+ .merge(@suffixed_reference => user.id)
+ end
+
+ private
+
+ def find_user(context, email)
+ return if email.blank?
+
+ context.group.users.find_by_any_email(email, confirmed: true) # rubocop: disable CodeReuse/ActiveRecord
+ end
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/groups/extractors/subgroups_extractor.rb b/lib/bulk_imports/groups/extractors/subgroups_extractor.rb
index b01fb6f68ac..e5e2b9fdbd4 100644
--- a/lib/bulk_imports/groups/extractors/subgroups_extractor.rb
+++ b/lib/bulk_imports/groups/extractors/subgroups_extractor.rb
@@ -4,8 +4,6 @@ module BulkImports
module Groups
module Extractors
class SubgroupsExtractor
- def initialize(*args); end
-
def extract(context)
encoded_parent_path = ERB::Util.url_encode(context.entity.source_full_path)
diff --git a/lib/bulk_imports/groups/graphql/get_labels_query.rb b/lib/bulk_imports/groups/graphql/get_labels_query.rb
index d1fe791c2ce..23efbc33581 100644
--- a/lib/bulk_imports/groups/graphql/get_labels_query.rb
+++ b/lib/bulk_imports/groups/graphql/get_labels_query.rb
@@ -10,7 +10,7 @@ module BulkImports
<<-'GRAPHQL'
query ($full_path: ID!, $cursor: String) {
group(fullPath: $full_path) {
- labels(first: 100, after: $cursor) {
+ labels(first: 100, after: $cursor, onlyGroupLabels: true) {
page_info: pageInfo {
end_cursor: endCursor
has_next_page: hasNextPage
@@ -19,6 +19,8 @@ module BulkImports
title
description
color
+ created_at: createdAt
+ updated_at: updatedAt
}
}
}
diff --git a/lib/bulk_imports/groups/graphql/get_milestones_query.rb b/lib/bulk_imports/groups/graphql/get_milestones_query.rb
new file mode 100644
index 00000000000..2ade87e6fa0
--- /dev/null
+++ b/lib/bulk_imports/groups/graphql/get_milestones_query.rb
@@ -0,0 +1,54 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Groups
+ module Graphql
+ module GetMilestonesQuery
+ extend self
+
+ def to_s
+ <<-'GRAPHQL'
+ query ($full_path: ID!, $cursor: String) {
+ group(fullPath: $full_path) {
+ milestones(first: 100, after: $cursor, includeDescendants: false) {
+ page_info: pageInfo {
+ end_cursor: endCursor
+ has_next_page: hasNextPage
+ }
+ nodes {
+ title
+ description
+ state
+ start_date: startDate
+ due_date: dueDate
+ created_at: createdAt
+ updated_at: updatedAt
+ }
+ }
+ }
+ }
+ GRAPHQL
+ end
+
+ def variables(context)
+ {
+ full_path: context.entity.source_full_path,
+ cursor: context.entity.next_page_for(:milestones)
+ }
+ end
+
+ def base_path
+ %w[data group milestones]
+ end
+
+ def data_path
+ base_path << 'nodes'
+ end
+
+ def page_info_path
+ base_path << 'page_info'
+ end
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/groups/loaders/group_loader.rb b/lib/bulk_imports/groups/loaders/group_loader.rb
index 386fc695182..a631685c2ad 100644
--- a/lib/bulk_imports/groups/loaders/group_loader.rb
+++ b/lib/bulk_imports/groups/loaders/group_loader.rb
@@ -4,10 +4,6 @@ module BulkImports
module Groups
module Loaders
class GroupLoader
- def initialize(options = {})
- @options = options
- end
-
def load(context, data)
return unless user_can_create_group?(context.current_user, data)
diff --git a/lib/bulk_imports/groups/loaders/labels_loader.rb b/lib/bulk_imports/groups/loaders/labels_loader.rb
deleted file mode 100644
index b8c9ba9609c..00000000000
--- a/lib/bulk_imports/groups/loaders/labels_loader.rb
+++ /dev/null
@@ -1,15 +0,0 @@
-# frozen_string_literal: true
-
-module BulkImports
- module Groups
- module Loaders
- class LabelsLoader
- def initialize(*); end
-
- def load(context, data)
- Labels::CreateService.new(data).execute(group: context.group)
- end
- end
- end
- end
-end
diff --git a/lib/bulk_imports/groups/loaders/members_loader.rb b/lib/bulk_imports/groups/loaders/members_loader.rb
deleted file mode 100644
index ccf44b31aee..00000000000
--- a/lib/bulk_imports/groups/loaders/members_loader.rb
+++ /dev/null
@@ -1,17 +0,0 @@
-# frozen_string_literal: true
-
-module BulkImports
- module Groups
- module Loaders
- class MembersLoader
- def initialize(*); end
-
- def load(context, data)
- return unless data
-
- context.group.members.create!(data)
- end
- end
- end
- end
-end
diff --git a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb
index 40dab9b444c..9f8b8682751 100644
--- a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb
+++ b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb
@@ -11,7 +11,9 @@ module BulkImports
transformer Common::Transformers::ProhibitedAttributesTransformer
- loader BulkImports::Groups::Loaders::LabelsLoader
+ def load(context, data)
+ Labels::CreateService.new(data).execute(group: context.group)
+ end
def after_run(extracted_data)
context.entity.update_tracker_for(
diff --git a/lib/bulk_imports/groups/pipelines/members_pipeline.rb b/lib/bulk_imports/groups/pipelines/members_pipeline.rb
index b00c4c1a659..32fc931e8c3 100644
--- a/lib/bulk_imports/groups/pipelines/members_pipeline.rb
+++ b/lib/bulk_imports/groups/pipelines/members_pipeline.rb
@@ -12,7 +12,11 @@ module BulkImports
transformer Common::Transformers::ProhibitedAttributesTransformer
transformer BulkImports::Groups::Transformers::MemberAttributesTransformer
- loader BulkImports::Groups::Loaders::MembersLoader
+ def load(context, data)
+ return unless data
+
+ context.group.members.create!(data)
+ end
def after_run(extracted_data)
context.entity.update_tracker_for(
diff --git a/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb b/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb
new file mode 100644
index 00000000000..8497162e0e7
--- /dev/null
+++ b/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb
@@ -0,0 +1,42 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Groups
+ module Pipelines
+ class MilestonesPipeline
+ include Pipeline
+
+ extractor BulkImports::Common::Extractors::GraphqlExtractor,
+ query: BulkImports::Groups::Graphql::GetMilestonesQuery
+
+ transformer Common::Transformers::ProhibitedAttributesTransformer
+
+ def load(context, data)
+ return unless data
+
+ raise ::BulkImports::Pipeline::NotAllowedError unless authorized?
+
+ context.group.milestones.create!(data)
+ end
+
+ def after_run(extracted_data)
+ context.entity.update_tracker_for(
+ relation: :milestones,
+ has_next_page: extracted_data.has_next_page?,
+ next_page: extracted_data.next_page
+ )
+
+ if extracted_data.has_next_page?
+ run
+ end
+ end
+
+ private
+
+ def authorized?
+ context.current_user.can?(:admin_milestone, context.group)
+ end
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb b/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb
index d7e1a118d0b..c47a8bd1daa 100644
--- a/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb
+++ b/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb
@@ -9,7 +9,10 @@ module BulkImports
extractor BulkImports::Groups::Extractors::SubgroupsExtractor
transformer Common::Transformers::ProhibitedAttributesTransformer
transformer BulkImports::Groups::Transformers::SubgroupToEntityTransformer
- loader BulkImports::Common::Loaders::EntityLoader
+
+ def load(context, data)
+ context.bulk_import.entities.create!(data)
+ end
end
end
end
diff --git a/lib/bulk_imports/groups/transformers/group_attributes_transformer.rb b/lib/bulk_imports/groups/transformers/group_attributes_transformer.rb
index 7de9a430421..23e898a7bb2 100644
--- a/lib/bulk_imports/groups/transformers/group_attributes_transformer.rb
+++ b/lib/bulk_imports/groups/transformers/group_attributes_transformer.rb
@@ -4,10 +4,6 @@ module BulkImports
module Groups
module Transformers
class GroupAttributesTransformer
- def initialize(options = {})
- @options = options
- end
-
def transform(context, data)
import_entity = context.entity
@@ -39,12 +35,11 @@ module BulkImports
end
def transform_parent(context, import_entity, data)
- current_user = context.current_user
- namespace = Namespace.find_by_full_path(import_entity.destination_namespace)
-
- return data if namespace == current_user.namespace
+ unless import_entity.destination_namespace.blank?
+ namespace = Namespace.find_by_full_path(import_entity.destination_namespace)
+ data['parent_id'] = namespace.id
+ end
- data['parent_id'] = namespace.id
data
end
diff --git a/lib/bulk_imports/groups/transformers/member_attributes_transformer.rb b/lib/bulk_imports/groups/transformers/member_attributes_transformer.rb
index 622f5b60ffe..e92c898171a 100644
--- a/lib/bulk_imports/groups/transformers/member_attributes_transformer.rb
+++ b/lib/bulk_imports/groups/transformers/member_attributes_transformer.rb
@@ -4,8 +4,6 @@ module BulkImports
module Groups
module Transformers
class MemberAttributesTransformer
- def initialize(*); end
-
def transform(context, data)
data
.then { |data| add_user(data) }
diff --git a/lib/bulk_imports/groups/transformers/subgroup_to_entity_transformer.rb b/lib/bulk_imports/groups/transformers/subgroup_to_entity_transformer.rb
index 6c3c299c2d2..676a6ca8d2a 100644
--- a/lib/bulk_imports/groups/transformers/subgroup_to_entity_transformer.rb
+++ b/lib/bulk_imports/groups/transformers/subgroup_to_entity_transformer.rb
@@ -4,8 +4,6 @@ module BulkImports
module Groups
module Transformers
class SubgroupToEntityTransformer
- def initialize(*args); end
-
def transform(context, entry)
{
source_type: :group_entity,
diff --git a/lib/bulk_imports/importers/group_importer.rb b/lib/bulk_imports/importers/group_importer.rb
index f967b7ad7ab..f016b552fd4 100644
--- a/lib/bulk_imports/importers/group_importer.rb
+++ b/lib/bulk_imports/importers/group_importer.rb
@@ -24,7 +24,8 @@ module BulkImports
BulkImports::Groups::Pipelines::GroupPipeline,
BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline,
BulkImports::Groups::Pipelines::MembersPipeline,
- BulkImports::Groups::Pipelines::LabelsPipeline
+ BulkImports::Groups::Pipelines::LabelsPipeline,
+ BulkImports::Groups::Pipelines::MilestonesPipeline
]
end
end
diff --git a/lib/bulk_imports/pipeline.rb b/lib/bulk_imports/pipeline.rb
index 1d55ad95887..14445162737 100644
--- a/lib/bulk_imports/pipeline.rb
+++ b/lib/bulk_imports/pipeline.rb
@@ -3,9 +3,14 @@
module BulkImports
module Pipeline
extend ActiveSupport::Concern
+ include Gitlab::Utils::StrongMemoize
include Gitlab::ClassAttributes
include Runner
+ NotAllowedError = Class.new(StandardError)
+
+ CACHE_KEY_EXPIRATION = 2.hours
+
def initialize(context)
@context = context
end
@@ -15,16 +20,83 @@ module BulkImports
attr_reader :context
+ # Fetch pipeline extractor.
+ # An extractor is defined either by instance `#extract(context)` method
+ # or by using `extractor` DSL.
+ #
+ # @example
+ # class MyPipeline
+ # extractor MyExtractor, foo: :bar
+ # end
+ #
+ # class MyPipeline
+ # def extract(context)
+ # puts 'Fetch some data'
+ # end
+ # end
+ #
+ # If pipeline implements instance method `extract` - use it
+ # and ignore class `extractor` method implementation.
def extractor
- @extractor ||= instantiate(self.class.get_extractor)
+ @extractor ||= self.respond_to?(:extract) ? self : instantiate(self.class.get_extractor)
end
+ # Fetch pipeline transformers.
+ #
+ # A transformer can be defined using:
+ # - `transformer` class method
+ # - `transform` instance method
+ #
+ # Multiple transformers can be defined within a single
+ # pipeline and run sequentially for each record in the
+ # following order:
+ # - Instance method `transform`
+ # - Transformers defined using `transformer` class method
+ #
+ # Instance method `transform` is always the first to run.
+ #
+ # @example
+ # class MyPipeline
+ # transformer MyTransformerOne, foo: :bar
+ # transformer MyTransformerTwo, foo: :bar
+ #
+ # def transform(context, data)
+ # # perform transformation here
+ # end
+ # end
+ #
+ # In the example above the instance `#transform` method is the first
+ # to run and `MyTransformerTwo` is the last.
def transformers
- @transformers ||= self.class.transformers.map(&method(:instantiate))
+ strong_memoize(:transformers) do
+ defined_transformers = self.class.transformers.map(&method(:instantiate))
+
+ transformers = []
+ transformers << self if respond_to?(:transform)
+ transformers.concat(defined_transformers)
+ transformers
+ end
end
+ # Fetch pipeline loader.
+ # A loader is defined either by instance method `#load(context, data)`
+ # or by using `loader` DSL.
+ #
+ # @example
+ # class MyPipeline
+ # loader MyLoader, foo: :bar
+ # end
+ #
+ # class MyPipeline
+ # def load(context, data)
+ # puts 'Load some data'
+ # end
+ # end
+ #
+ # If pipeline implements instance method `load` - use it
+ # and ignore class `loader` method implementation.
def loader
- @loaders ||= instantiate(self.class.get_loader)
+ @loader ||= self.respond_to?(:load) ? self : instantiate(self.class.get_loader)
end
def pipeline
@@ -32,7 +104,13 @@ module BulkImports
end
def instantiate(class_config)
- class_config[:klass].new(class_config[:options])
+ options = class_config[:options]
+
+ if options
+ class_config[:klass].new(class_config[:options])
+ else
+ class_config[:klass].new
+ end
end
def abort_on_failure?
@@ -58,7 +136,7 @@ module BulkImports
end
def transformers
- class_attributes[:transformers]
+ class_attributes[:transformers] || []
end
def get_loader
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb
index d39f4121b51..e3535e585cc 100644
--- a/lib/bulk_imports/pipeline/runner.rb
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -8,7 +8,7 @@ module BulkImports
MarkedAsFailedError = Class.new(StandardError)
def run
- raise MarkedAsFailedError if marked_as_failed?
+ raise MarkedAsFailedError if context.entity.failed?
info(message: 'Pipeline started')
@@ -40,7 +40,7 @@ module BulkImports
private # rubocop:disable Lint/UselessAccessModifier
def run_pipeline_step(step, class_name = nil)
- raise MarkedAsFailedError if marked_as_failed?
+ raise MarkedAsFailedError if context.entity.failed?
info(pipeline_step: step, step_class: class_name)
@@ -62,24 +62,13 @@ module BulkImports
end
def mark_as_failed
- warn(message: 'Pipeline failed', pipeline_class: pipeline)
+ warn(message: 'Pipeline failed')
context.entity.fail_op!
end
- def marked_as_failed?
- return true if context.entity.failed?
-
- false
- end
-
def log_skip(extra = {})
- log = {
- message: 'Skipping due to failed pipeline status',
- pipeline_class: pipeline
- }.merge(extra)
-
- info(log)
+ info({ message: 'Skipping due to failed pipeline status' }.merge(extra))
end
def log_import_failure(exception, step)
@@ -92,25 +81,39 @@ module BulkImports
correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
}
+ error(
+ pipeline_step: step,
+ exception_class: exception.class.to_s,
+ exception_message: exception.message
+ )
+
BulkImports::Failure.create(attributes)
end
+ def info(extra = {})
+ logger.info(log_params(extra))
+ end
+
def warn(extra = {})
logger.warn(log_params(extra))
end
- def info(extra = {})
- logger.info(log_params(extra))
+ def error(extra = {})
+ logger.error(log_params(extra))
end
def log_params(extra)
defaults = {
+ bulk_import_id: context.bulk_import.id,
bulk_import_entity_id: context.entity.id,
bulk_import_entity_type: context.entity.source_type,
- pipeline_class: pipeline
+ pipeline_class: pipeline,
+ context_extra: context.extra
}
- defaults.merge(extra).compact
+ defaults
+ .merge(extra)
+ .reject { |_key, value| value.blank? }
end
def logger