summaryrefslogtreecommitdiff
path: root/lib/bulk_imports
diff options
context:
space:
mode:
authorGitLab Bot <gitlab-bot@gitlab.com>2020-11-19 08:27:35 +0000
committerGitLab Bot <gitlab-bot@gitlab.com>2020-11-19 08:27:35 +0000
commit7e9c479f7de77702622631cff2628a9c8dcbc627 (patch)
treec8f718a08e110ad7e1894510980d2155a6549197 /lib/bulk_imports
parente852b0ae16db4052c1c567d9efa4facc81146e88 (diff)
downloadgitlab-ce-7e9c479f7de77702622631cff2628a9c8dcbc627.tar.gz
Add latest changes from gitlab-org/gitlab@13-6-stable-eev13.6.0-rc42
Diffstat (limited to 'lib/bulk_imports')
-rw-r--r--lib/bulk_imports/clients/graphql.rb49
-rw-r--r--lib/bulk_imports/clients/http.rb86
-rw-r--r--lib/bulk_imports/common/extractors/graphql_extractor.rb46
-rw-r--r--lib/bulk_imports/common/loaders/entity_loader.rb15
-rw-r--r--lib/bulk_imports/common/transformers/graphql_cleaner_transformer.rb54
-rw-r--r--lib/bulk_imports/common/transformers/underscorify_keys_transformer.rb19
-rw-r--r--lib/bulk_imports/groups/extractors/subgroups_extractor.rb29
-rw-r--r--lib/bulk_imports/groups/graphql/get_group_query.rb38
-rw-r--r--lib/bulk_imports/groups/loaders/group_loader.rb35
-rw-r--r--lib/bulk_imports/groups/pipelines/group_pipeline.rb19
-rw-r--r--lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb15
-rw-r--r--lib/bulk_imports/groups/transformers/group_attributes_transformer.rb81
-rw-r--r--lib/bulk_imports/groups/transformers/subgroup_to_entity_transformer.rb21
-rw-r--r--lib/bulk_imports/importers/group_importer.rb32
-rw-r--r--lib/bulk_imports/importers/groups_importer.rb36
-rw-r--r--lib/bulk_imports/pipeline.rb12
-rw-r--r--lib/bulk_imports/pipeline/attributes.rb41
-rw-r--r--lib/bulk_imports/pipeline/context.rb33
-rw-r--r--lib/bulk_imports/pipeline/runner.rb66
19 files changed, 727 insertions, 0 deletions
diff --git a/lib/bulk_imports/clients/graphql.rb b/lib/bulk_imports/clients/graphql.rb
new file mode 100644
index 00000000000..b067431aeae
--- /dev/null
+++ b/lib/bulk_imports/clients/graphql.rb
@@ -0,0 +1,49 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Clients
+ class Graphql
+ class HTTP < Graphlient::Adapters::HTTP::Adapter
+ def execute(document:, operation_name: nil, variables: {}, context: {})
+ response = ::Gitlab::HTTP.post(
+ url,
+ headers: headers,
+ follow_redirects: false,
+ body: {
+ query: document.to_query_string,
+ operationName: operation_name,
+ variables: variables
+ }.to_json
+ )
+
+ ::Gitlab::Json.parse(response.body)
+ end
+ end
+ private_constant :HTTP
+
+ attr_reader :client
+
+ delegate :query, :parse, :execute, to: :client
+
+ def initialize(url: Gitlab::COM_URL, token: nil)
+ @url = Gitlab::Utils.append_path(url, '/api/graphql')
+ @token = token
+ @client = Graphlient::Client.new(
+ @url,
+ options(http: HTTP)
+ )
+ end
+
+ def options(extra = {})
+ return extra unless @token
+
+ {
+ headers: {
+ 'Content-Type' => 'application/json',
+ 'Authorization' => "Bearer #{@token}"
+ }
+ }.merge(extra)
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/clients/http.rb b/lib/bulk_imports/clients/http.rb
new file mode 100644
index 00000000000..2e81863e53a
--- /dev/null
+++ b/lib/bulk_imports/clients/http.rb
@@ -0,0 +1,86 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Clients
+ class Http
+ API_VERSION = 'v4'.freeze
+ DEFAULT_PAGE = 1.freeze
+ DEFAULT_PER_PAGE = 30.freeze
+
+ ConnectionError = Class.new(StandardError)
+
+ def initialize(uri:, token:, page: DEFAULT_PAGE, per_page: DEFAULT_PER_PAGE, api_version: API_VERSION)
+ @uri = URI.parse(uri)
+ @token = token&.strip
+ @page = page
+ @per_page = per_page
+ @api_version = api_version
+ end
+
+ def get(resource, query = {})
+ with_error_handling do
+ Gitlab::HTTP.get(
+ resource_url(resource),
+ headers: request_headers,
+ follow_redirects: false,
+ query: query.merge(request_query)
+ )
+ end
+ end
+
+ def each_page(method, resource, query = {}, &block)
+ return to_enum(__method__, method, resource, query) unless block_given?
+
+ next_page = @page
+
+ while next_page
+ @page = next_page.to_i
+
+ response = self.public_send(method, resource, query) # rubocop: disable GitlabSecurity/PublicSend
+ collection = response.parsed_response
+ next_page = response.headers['x-next-page'].presence
+
+ yield collection
+ end
+ end
+
+ private
+
+ def request_query
+ {
+ page: @page,
+ per_page: @per_page
+ }
+ end
+
+ def request_headers
+ {
+ 'Content-Type' => 'application/json',
+ 'Authorization' => "Bearer #{@token}"
+ }
+ end
+
+ def with_error_handling
+ response = yield
+
+ raise ConnectionError.new("Error #{response.code}") unless response.success?
+
+ response
+ rescue *Gitlab::HTTP::HTTP_ERRORS => e
+ raise ConnectionError, e
+ end
+
+ def base_uri
+ @base_uri ||= "#{@uri.scheme}://#{@uri.host}:#{@uri.port}"
+ end
+
+ def api_url
+ Gitlab::Utils.append_path(base_uri, "/api/#{@api_version}")
+ end
+
+ def resource_url(resource)
+ Gitlab::Utils.append_path(api_url, resource)
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/common/extractors/graphql_extractor.rb b/lib/bulk_imports/common/extractors/graphql_extractor.rb
new file mode 100644
index 00000000000..7d58032cfcc
--- /dev/null
+++ b/lib/bulk_imports/common/extractors/graphql_extractor.rb
@@ -0,0 +1,46 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Common
+ module Extractors
+ class GraphqlExtractor
+ def initialize(query)
+ @query = query[:query]
+ @query_string = @query.to_s
+ @variables = @query.variables
+ end
+
+ def extract(context)
+ @context = context
+
+ Enumerator.new do |yielder|
+ result = graphql_client.execute(parsed_query, query_variables(context.entity))
+
+ yielder << result.original_hash.deep_dup
+ end
+ end
+
+ private
+
+ def graphql_client
+ @graphql_client ||= BulkImports::Clients::Graphql.new(
+ url: @context.configuration.url,
+ token: @context.configuration.access_token
+ )
+ end
+
+ def parsed_query
+ @parsed_query ||= graphql_client.parse(@query.to_s)
+ end
+
+ def query_variables(entity)
+ return unless @variables
+
+ @variables.transform_values do |entity_attribute|
+ entity.public_send(entity_attribute) # rubocop:disable GitlabSecurity/PublicSend
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/common/loaders/entity_loader.rb b/lib/bulk_imports/common/loaders/entity_loader.rb
new file mode 100644
index 00000000000..4540b892c88
--- /dev/null
+++ b/lib/bulk_imports/common/loaders/entity_loader.rb
@@ -0,0 +1,15 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Common
+ module Loaders
+ class EntityLoader
+ def initialize(*args); end
+
+ def load(context, entity)
+ context.entity.bulk_import.entities.create!(entity)
+ end
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/common/transformers/graphql_cleaner_transformer.rb b/lib/bulk_imports/common/transformers/graphql_cleaner_transformer.rb
new file mode 100644
index 00000000000..dce0fac6999
--- /dev/null
+++ b/lib/bulk_imports/common/transformers/graphql_cleaner_transformer.rb
@@ -0,0 +1,54 @@
+# frozen_string_literal: true
+
+# Cleanup GraphQL original response hash from unnecessary nesting
+# 1. Remove ['data']['group'] or ['data']['project'] hash nesting
+# 2. Remove ['edges'] & ['nodes'] array wrappings
+# 3. Remove ['node'] hash wrapping
+#
+# @example
+# data = {"data"=>{"group"=> {
+# "name"=>"test",
+# "fullName"=>"test",
+# "description"=>"test",
+# "labels"=>{"edges"=>[{"node"=>{"title"=>"label1"}}, {"node"=>{"title"=>"label2"}}, {"node"=>{"title"=>"label3"}}]}}}}
+#
+# BulkImports::Common::Transformers::GraphqlCleanerTransformer.new.transform(nil, data)
+#
+# {"name"=>"test", "fullName"=>"test", "description"=>"test", "labels"=>[{"title"=>"label1"}, {"title"=>"label2"}, {"title"=>"label3"}]}
+module BulkImports
+ module Common
+ module Transformers
+ class GraphqlCleanerTransformer
+ EDGES = 'edges'
+ NODE = 'node'
+
+ def initialize(options = {})
+ @options = options
+ end
+
+ def transform(_, data)
+ return data unless data.is_a?(Hash)
+
+ data = data.dig('data', 'group') || data.dig('data', 'project') || data
+
+ clean_edges_and_nodes(data)
+ end
+
+ def clean_edges_and_nodes(data)
+ case data
+ when Array
+ data.map(&method(:clean_edges_and_nodes))
+ when Hash
+ if data.key?(NODE)
+ clean_edges_and_nodes(data[NODE])
+ else
+ data.transform_values { |value| clean_edges_and_nodes(value.try(:fetch, EDGES, value) || value) }
+ end
+ else
+ data
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/common/transformers/underscorify_keys_transformer.rb b/lib/bulk_imports/common/transformers/underscorify_keys_transformer.rb
new file mode 100644
index 00000000000..b32ab28fdbb
--- /dev/null
+++ b/lib/bulk_imports/common/transformers/underscorify_keys_transformer.rb
@@ -0,0 +1,19 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Common
+ module Transformers
+ class UnderscorifyKeysTransformer
+ def initialize(options = {})
+ @options = options
+ end
+
+ def transform(_, data)
+ data.deep_transform_keys do |key|
+ key.to_s.underscore
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/groups/extractors/subgroups_extractor.rb b/lib/bulk_imports/groups/extractors/subgroups_extractor.rb
new file mode 100644
index 00000000000..5c5e686cec5
--- /dev/null
+++ b/lib/bulk_imports/groups/extractors/subgroups_extractor.rb
@@ -0,0 +1,29 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Groups
+ module Extractors
+ class SubgroupsExtractor
+ def initialize(*args); end
+
+ def extract(context)
+ encoded_parent_path = ERB::Util.url_encode(context.entity.source_full_path)
+
+ http_client(context.entity.bulk_import.configuration)
+ .each_page(:get, "groups/#{encoded_parent_path}/subgroups")
+ .flat_map(&:itself)
+ end
+
+ private
+
+ def http_client(configuration)
+ @http_client ||= BulkImports::Clients::Http.new(
+ uri: configuration.url,
+ token: configuration.access_token,
+ per_page: 100
+ )
+ end
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/groups/graphql/get_group_query.rb b/lib/bulk_imports/groups/graphql/get_group_query.rb
new file mode 100644
index 00000000000..c50b99aae4e
--- /dev/null
+++ b/lib/bulk_imports/groups/graphql/get_group_query.rb
@@ -0,0 +1,38 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Groups
+ module Graphql
+ module GetGroupQuery
+ extend self
+
+ def to_s
+ <<-'GRAPHQL'
+ query($full_path: ID!) {
+ group(fullPath: $full_path) {
+ name
+ path
+ fullPath
+ description
+ visibility
+ emailsDisabled
+ lfsEnabled
+ mentionsDisabled
+ projectCreationLevel
+ requestAccessEnabled
+ requireTwoFactorAuthentication
+ shareWithGroupLock
+ subgroupCreationLevel
+ twoFactorGracePeriod
+ }
+ }
+ GRAPHQL
+ end
+
+ def variables
+ { full_path: :source_full_path }
+ end
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/groups/loaders/group_loader.rb b/lib/bulk_imports/groups/loaders/group_loader.rb
new file mode 100644
index 00000000000..386fc695182
--- /dev/null
+++ b/lib/bulk_imports/groups/loaders/group_loader.rb
@@ -0,0 +1,35 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Groups
+ module Loaders
+ class GroupLoader
+ def initialize(options = {})
+ @options = options
+ end
+
+ def load(context, data)
+ return unless user_can_create_group?(context.current_user, data)
+
+ group = ::Groups::CreateService.new(context.current_user, data).execute
+
+ context.entity.update!(group: group)
+
+ group
+ end
+
+ private
+
+ def user_can_create_group?(current_user, data)
+ if data['parent_id']
+ parent = Namespace.find_by_id(data['parent_id'])
+
+ Ability.allowed?(current_user, :create_subgroup, parent)
+ else
+ Ability.allowed?(current_user, :create_group)
+ end
+ end
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/groups/pipelines/group_pipeline.rb b/lib/bulk_imports/groups/pipelines/group_pipeline.rb
new file mode 100644
index 00000000000..2b7d0ef7658
--- /dev/null
+++ b/lib/bulk_imports/groups/pipelines/group_pipeline.rb
@@ -0,0 +1,19 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Groups
+ module Pipelines
+ class GroupPipeline
+ include Pipeline
+
+ extractor Common::Extractors::GraphqlExtractor, query: Graphql::GetGroupQuery
+
+ transformer Common::Transformers::GraphqlCleanerTransformer
+ transformer Common::Transformers::UnderscorifyKeysTransformer
+ transformer Groups::Transformers::GroupAttributesTransformer
+
+ loader Groups::Loaders::GroupLoader
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb b/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb
new file mode 100644
index 00000000000..6384e9d5972
--- /dev/null
+++ b/lib/bulk_imports/groups/pipelines/subgroup_entities_pipeline.rb
@@ -0,0 +1,15 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Groups
+ module Pipelines
+ class SubgroupEntitiesPipeline
+ include Pipeline
+
+ extractor BulkImports::Groups::Extractors::SubgroupsExtractor
+ transformer BulkImports::Groups::Transformers::SubgroupToEntityTransformer
+ loader BulkImports::Common::Loaders::EntityLoader
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/groups/transformers/group_attributes_transformer.rb b/lib/bulk_imports/groups/transformers/group_attributes_transformer.rb
new file mode 100644
index 00000000000..7de9a430421
--- /dev/null
+++ b/lib/bulk_imports/groups/transformers/group_attributes_transformer.rb
@@ -0,0 +1,81 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Groups
+ module Transformers
+ class GroupAttributesTransformer
+ def initialize(options = {})
+ @options = options
+ end
+
+ def transform(context, data)
+ import_entity = context.entity
+
+ data
+ .then { |data| transform_name(import_entity, data) }
+ .then { |data| transform_path(import_entity, data) }
+ .then { |data| transform_full_path(data) }
+ .then { |data| transform_parent(context, import_entity, data) }
+ .then { |data| transform_visibility_level(data) }
+ .then { |data| transform_project_creation_level(data) }
+ .then { |data| transform_subgroup_creation_level(data) }
+ end
+
+ private
+
+ def transform_name(import_entity, data)
+ data['name'] = import_entity.destination_name
+ data
+ end
+
+ def transform_path(import_entity, data)
+ data['path'] = import_entity.destination_name.parameterize
+ data
+ end
+
+ def transform_full_path(data)
+ data.delete('full_path')
+ data
+ end
+
+ def transform_parent(context, import_entity, data)
+ current_user = context.current_user
+ namespace = Namespace.find_by_full_path(import_entity.destination_namespace)
+
+ return data if namespace == current_user.namespace
+
+ data['parent_id'] = namespace.id
+ data
+ end
+
+ def transform_visibility_level(data)
+ visibility = data['visibility']
+
+ return data unless visibility.present?
+
+ data['visibility_level'] = Gitlab::VisibilityLevel.string_options[visibility]
+ data.delete('visibility')
+ data
+ end
+
+ def transform_project_creation_level(data)
+ project_creation_level = data['project_creation_level']
+
+ return data unless project_creation_level.present?
+
+ data['project_creation_level'] = Gitlab::Access.project_creation_string_options[project_creation_level]
+ data
+ end
+
+ def transform_subgroup_creation_level(data)
+ subgroup_creation_level = data['subgroup_creation_level']
+
+ return data unless subgroup_creation_level.present?
+
+ data['subgroup_creation_level'] = Gitlab::Access.subgroup_creation_string_options[subgroup_creation_level]
+ data
+ end
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/groups/transformers/subgroup_to_entity_transformer.rb b/lib/bulk_imports/groups/transformers/subgroup_to_entity_transformer.rb
new file mode 100644
index 00000000000..6c3c299c2d2
--- /dev/null
+++ b/lib/bulk_imports/groups/transformers/subgroup_to_entity_transformer.rb
@@ -0,0 +1,21 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Groups
+ module Transformers
+ class SubgroupToEntityTransformer
+ def initialize(*args); end
+
+ def transform(context, entry)
+ {
+ source_type: :group_entity,
+ source_full_path: entry['full_path'],
+ destination_name: entry['name'],
+ destination_namespace: context.entity.group.full_path,
+ parent_id: context.entity.id
+ }
+ end
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/importers/group_importer.rb b/lib/bulk_imports/importers/group_importer.rb
new file mode 100644
index 00000000000..c7253590c87
--- /dev/null
+++ b/lib/bulk_imports/importers/group_importer.rb
@@ -0,0 +1,32 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Importers
+ class GroupImporter
+ def initialize(entity)
+ @entity = entity
+ end
+
+ def execute
+ entity.start!
+ bulk_import = entity.bulk_import
+ configuration = bulk_import.configuration
+
+ context = BulkImports::Pipeline::Context.new(
+ current_user: bulk_import.user,
+ entity: entity,
+ configuration: configuration
+ )
+
+ BulkImports::Groups::Pipelines::GroupPipeline.new.run(context)
+ BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline.new.run(context)
+
+ entity.finish!
+ end
+
+ private
+
+ attr_reader :entity
+ end
+ end
+end
diff --git a/lib/bulk_imports/importers/groups_importer.rb b/lib/bulk_imports/importers/groups_importer.rb
new file mode 100644
index 00000000000..8641577ff47
--- /dev/null
+++ b/lib/bulk_imports/importers/groups_importer.rb
@@ -0,0 +1,36 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Importers
+ class GroupsImporter
+ def initialize(bulk_import_id)
+ @bulk_import = BulkImport.find(bulk_import_id)
+ end
+
+ def execute
+ bulk_import.start! unless bulk_import.started?
+
+ if entities_to_import.empty?
+ bulk_import.finish!
+ else
+ entities_to_import.each do |entity|
+ BulkImports::Importers::GroupImporter.new(entity).execute
+ end
+
+ # A new BulkImportWorker job is enqueued to either
+ # - Process the new BulkImports::Entity created for the subgroups
+ # - Or to mark the `bulk_import` as finished.
+ BulkImportWorker.perform_async(bulk_import.id)
+ end
+ end
+
+ private
+
+ attr_reader :bulk_import
+
+ def entities_to_import
+ @entities_to_import ||= bulk_import.entities.with_status(:created)
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/pipeline.rb b/lib/bulk_imports/pipeline.rb
new file mode 100644
index 00000000000..70e6030ea2c
--- /dev/null
+++ b/lib/bulk_imports/pipeline.rb
@@ -0,0 +1,12 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Pipeline
+ extend ActiveSupport::Concern
+
+ included do
+ include Attributes
+ include Runner
+ end
+ end
+end
diff --git a/lib/bulk_imports/pipeline/attributes.rb b/lib/bulk_imports/pipeline/attributes.rb
new file mode 100644
index 00000000000..ebfbaf6f6ba
--- /dev/null
+++ b/lib/bulk_imports/pipeline/attributes.rb
@@ -0,0 +1,41 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Pipeline
+ module Attributes
+ extend ActiveSupport::Concern
+ include Gitlab::ClassAttributes
+
+ class_methods do
+ def extractor(klass, options = nil)
+ add_attribute(:extractors, klass, options)
+ end
+
+ def transformer(klass, options = nil)
+ add_attribute(:transformers, klass, options)
+ end
+
+ def loader(klass, options = nil)
+ add_attribute(:loaders, klass, options)
+ end
+
+ def add_attribute(sym, klass, options)
+ class_attributes[sym] ||= []
+ class_attributes[sym] << { klass: klass, options: options }
+ end
+
+ def extractors
+ class_attributes[:extractors]
+ end
+
+ def transformers
+ class_attributes[:transformers]
+ end
+
+ def loaders
+ class_attributes[:loaders]
+ end
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/pipeline/context.rb b/lib/bulk_imports/pipeline/context.rb
new file mode 100644
index 00000000000..ad19f5cad7d
--- /dev/null
+++ b/lib/bulk_imports/pipeline/context.rb
@@ -0,0 +1,33 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Pipeline
+ class Context
+ include Gitlab::Utils::LazyAttributes
+
+ Attribute = Struct.new(:name, :type)
+
+ PIPELINE_ATTRIBUTES = [
+ Attribute.new(:current_user, User),
+ Attribute.new(:entity, ::BulkImports::Entity),
+ Attribute.new(:configuration, ::BulkImports::Configuration)
+ ].freeze
+
+ def initialize(args)
+ assign_attributes(args)
+ end
+
+ private
+
+ PIPELINE_ATTRIBUTES.each do |attr|
+ lazy_attr_reader attr.name, type: attr.type
+ end
+
+ def assign_attributes(values)
+ values.slice(*PIPELINE_ATTRIBUTES.map(&:name)).each do |name, value|
+ instance_variable_set("@#{name}", value)
+ end
+ end
+ end
+ end
+end
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb
new file mode 100644
index 00000000000..04038e50399
--- /dev/null
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -0,0 +1,66 @@
+# frozen_string_literal: true
+
+module BulkImports
+ module Pipeline
+ module Runner
+ extend ActiveSupport::Concern
+
+ included do
+ private
+
+ def extractors
+ @extractors ||= self.class.extractors.map(&method(:instantiate))
+ end
+
+ def transformers
+ @transformers ||= self.class.transformers.map(&method(:instantiate))
+ end
+
+ def loaders
+ @loaders ||= self.class.loaders.map(&method(:instantiate))
+ end
+
+ def pipeline_name
+ @pipeline ||= self.class.name
+ end
+
+ def instantiate(class_config)
+ class_config[:klass].new(class_config[:options])
+ end
+ end
+
+ def run(context)
+ info(context, message: "Pipeline started", pipeline: pipeline_name)
+
+ extractors.each do |extractor|
+ extractor.extract(context).each do |entry|
+ info(context, extractor: extractor.class.name)
+
+ transformers.each do |transformer|
+ info(context, transformer: transformer.class.name)
+ entry = transformer.transform(context, entry)
+ end
+
+ loaders.each do |loader|
+ info(context, loader: loader.class.name)
+ loader.load(context, entry)
+ end
+ end
+ end
+ end
+
+ private # rubocop:disable Lint/UselessAccessModifier
+
+ def info(context, extra = {})
+ logger.info({
+ entity: context.entity.id,
+ entity_type: context.entity.source_type
+ }.merge(extra))
+ end
+
+ def logger
+ @logger ||= Gitlab::Import::Logger.build
+ end
+ end
+ end
+end