author    Robert Speicher <rspeicher@gmail.com>  2021-01-20 13:34:23 -0600
committer Robert Speicher <rspeicher@gmail.com>  2021-01-20 13:34:23 -0600
commit    6438df3a1e0fb944485cebf07976160184697d72 (patch)
tree      00b09bfd170e77ae9391b1a2f5a93ef6839f2597 /lib/bulk_imports
parent    42bcd54d971da7ef2854b896a7b34f4ef8601067 (diff)
download  gitlab-ce-6438df3a1e0fb944485cebf07976160184697d72.tar.gz

Add latest changes from gitlab-org/gitlab@13-8-stable-ee (tag: v13.8.0-rc42)
Diffstat (limited to 'lib/bulk_imports')
-rw-r--r--  lib/bulk_imports/common/extractors/graphql_extractor.rb  12
-rw-r--r--  lib/bulk_imports/importers/group_importer.rb              14
-rw-r--r--  lib/bulk_imports/importers/groups_importer.rb             36
-rw-r--r--  lib/bulk_imports/pipeline.rb                              20
-rw-r--r--  lib/bulk_imports/pipeline/runner.rb                       30
5 files changed, 37 insertions, 75 deletions
diff --git a/lib/bulk_imports/common/extractors/graphql_extractor.rb b/lib/bulk_imports/common/extractors/graphql_extractor.rb
index c0cef61d2b2..af274ee1299 100644
--- a/lib/bulk_imports/common/extractors/graphql_extractor.rb
+++ b/lib/bulk_imports/common/extractors/graphql_extractor.rb
@@ -11,14 +11,10 @@ module BulkImports
def extract(context)
client = graphql_client(context)
- Enumerator.new do |yielder|
- result = client.execute(
- client.parse(query.to_s),
- query.variables(context.entity)
- )
-
- yielder << result.original_hash.deep_dup
- end
+ client.execute(
+ client.parse(query.to_s),
+ query.variables(context.entity)
+ ).original_hash.deep_dup
end
private
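
The extractor now returns the deep-duped response hash directly instead of yielding it through a one-element Enumerator; the runner (lib/bulk_imports/pipeline/runner.rb below) normalizes the result with Array.wrap. A minimal before/after sketch, where `extractor`, `context`, and `process` are stand-ins for illustration:

    # Before: consumers enumerated a single-element stream.
    extractor.extract(context).each { |data| process(data) }

    # After: the deep-duped hash comes back directly.
    data = extractor.extract(context)   # e.g. { "data" => { "group" => {...} } }
    process(data)
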
diff --git a/lib/bulk_imports/importers/group_importer.rb b/lib/bulk_imports/importers/group_importer.rb
index 82cb1ca03a2..6e1b86e9515 100644
--- a/lib/bulk_imports/importers/group_importer.rb
+++ b/lib/bulk_imports/importers/group_importer.rb
@@ -8,7 +8,6 @@ module BulkImports
end
def execute
- entity.start!
bulk_import = entity.bulk_import
configuration = bulk_import.configuration
@@ -18,9 +17,7 @@ module BulkImports
configuration: configuration
)
- BulkImports::Groups::Pipelines::GroupPipeline.new.run(context)
- 'BulkImports::EE::Groups::Pipelines::EpicsPipeline'.constantize.new.run(context) if Gitlab.ee?
- BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline.new.run(context)
+ pipelines.each { |pipeline| pipeline.new.run(context) }
entity.finish!
end
@@ -28,6 +25,15 @@ module BulkImports
private
attr_reader :entity
+
+ def pipelines
+ [
+ BulkImports::Groups::Pipelines::GroupPipeline,
+ BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline
+ ]
+ end
end
end
end
+
+BulkImports::Importers::GroupImporter.prepend_if_ee('EE::BulkImports::Importers::GroupImporter')
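
The inline `if Gitlab.ee?` constantize call is replaced by `prepend_if_ee`. The EE module itself is outside this diff; based on the removed line, it plausibly overrides `pipelines` along these lines (module shape assumed, not shown in this commit):

    # Assumed shape of the EE extension (not part of this diff):
    module EE
      module BulkImports
        module Importers
          module GroupImporter
            extend ::Gitlab::Utils::Override

            override :pipelines
            def pipelines
              super + [::BulkImports::EE::Groups::Pipelines::EpicsPipeline]
            end
          end
        end
      end
    end
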
diff --git a/lib/bulk_imports/importers/groups_importer.rb b/lib/bulk_imports/importers/groups_importer.rb
deleted file mode 100644
index 8641577ff47..00000000000
--- a/lib/bulk_imports/importers/groups_importer.rb
+++ /dev/null
@@ -1,36 +0,0 @@
-# frozen_string_literal: true
-
-module BulkImports
- module Importers
- class GroupsImporter
- def initialize(bulk_import_id)
- @bulk_import = BulkImport.find(bulk_import_id)
- end
-
- def execute
- bulk_import.start! unless bulk_import.started?
-
- if entities_to_import.empty?
- bulk_import.finish!
- else
- entities_to_import.each do |entity|
- BulkImports::Importers::GroupImporter.new(entity).execute
- end
-
- # A new BulkImportWorker job is enqueued to either
- # - Process the new BulkImports::Entity created for the subgroups
- # - Or to mark the `bulk_import` as finished.
- BulkImportWorker.perform_async(bulk_import.id)
- end
- end
-
- private
-
- attr_reader :bulk_import
-
- def entities_to_import
- @entities_to_import ||= bulk_import.entities.with_status(:created)
- end
- end
- end
-end
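
With GroupsImporter deleted, its enqueue-and-loop orchestration presumably moved into BulkImportWorker (referenced in the deleted comment above but not shown in this diff). A rough sketch of that responsibility, mirroring the deleted code and offered only as an assumption:

    # Hypothetical outline only; the real worker is not part of this diff.
    class BulkImportWorker
      def perform(bulk_import_id)
        bulk_import = BulkImport.find(bulk_import_id)
        bulk_import.start! unless bulk_import.started?

        entities = bulk_import.entities.with_status(:created)
        return bulk_import.finish! if entities.empty?

        entities.each do |entity|
          BulkImports::Importers::GroupImporter.new(entity).execute
        end

        # Re-enqueue to process subgroup entities created during this run,
        # or to mark the bulk_import as finished on the next pass.
        BulkImportWorker.perform_async(bulk_import.id)
      end
    end
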
diff --git a/lib/bulk_imports/pipeline.rb b/lib/bulk_imports/pipeline.rb
index a44f8fc7193..06b81b5da14 100644
--- a/lib/bulk_imports/pipeline.rb
+++ b/lib/bulk_imports/pipeline.rb
@@ -10,16 +10,16 @@ module BulkImports
private
- def extractors
- @extractors ||= self.class.extractors.map(&method(:instantiate))
+ def extractor
+ @extractor ||= instantiate(self.class.get_extractor)
end
def transformers
@transformers ||= self.class.transformers.map(&method(:instantiate))
end
- def loaders
- @loaders ||= self.class.loaders.map(&method(:instantiate))
+ def loader
+ @loader ||= instantiate(self.class.get_loader)
end
def after_run
@@ -41,7 +41,7 @@ module BulkImports
class_methods do
def extractor(klass, options = nil)
- add_attribute(:extractors, klass, options)
+ class_attributes[:extractor] = { klass: klass, options: options }
end
def transformer(klass, options = nil)
@@ -49,23 +49,23 @@ module BulkImports
end
def loader(klass, options = nil)
- add_attribute(:loaders, klass, options)
+ class_attributes[:loader] = { klass: klass, options: options }
end
def after_run(&block)
class_attributes[:after_run] = block
end
- def extractors
- class_attributes[:extractors]
+ def get_extractor
+ class_attributes[:extractor]
end
def transformers
class_attributes[:transformers]
end
- def loaders
- class_attributes[:loaders]
+ def get_loader
+ class_attributes[:loader]
end
def after_run_callback
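
Under the reworked DSL a pipeline declares exactly one extractor and one loader, while transformers still stack. A sketch of a pipeline class using the new class methods (the option hash and the transformer/loader names are hypothetical; GroupPipeline itself appears in this commit):

    module BulkImports
      module Groups
        module Pipelines
          class GroupPipeline
            include BulkImports::Pipeline

            # Stored as a single { klass:, options: } entry, not appended to a list.
            extractor BulkImports::Common::Extractors::GraphqlExtractor, query: SomeGroupQuery # hypothetical option
            transformer SomeTransformer # hypothetical
            loader SomeLoader           # hypothetical
          end
        end
      end
    end
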
diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb
index 88b96f0ab6e..11fb9722173 100644
--- a/lib/bulk_imports/pipeline/runner.rb
+++ b/lib/bulk_imports/pipeline/runner.rb
@@ -12,25 +12,15 @@ module BulkImports
info(context, message: 'Pipeline started', pipeline_class: pipeline)
- extractors.each do |extractor|
- data = run_pipeline_step(:extractor, extractor.class.name, context) do
- extractor.extract(context)
+ Array.wrap(extracted_data_from(context)).each do |entry|
+ transformers.each do |transformer|
+ entry = run_pipeline_step(:transformer, transformer.class.name, context) do
+ transformer.transform(context, entry)
+ end
end
- if data && data.respond_to?(:each)
- data.each do |entry|
- transformers.each do |transformer|
- entry = run_pipeline_step(:transformer, transformer.class.name, context) do
- transformer.transform(context, entry)
- end
- end
-
- loaders.each do |loader|
- run_pipeline_step(:loader, loader.class.name, context) do
- loader.load(context, entry)
- end
- end
- end
+ run_pipeline_step(:loader, loader.class.name, context) do
+ loader.load(context, entry)
end
end
@@ -55,6 +45,12 @@ module BulkImports
mark_as_failed(context) if abort_on_failure?
end
+ def extracted_data_from(context)
+ run_pipeline_step(:extractor, extractor.class.name, context) do
+ extractor.extract(context)
+ end
+ end
+
def mark_as_failed(context)
warn(context, message: 'Pipeline failed', pipeline_class: pipeline)
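
For reference, the Array.wrap call that replaces the removed `data.respond_to?(:each)` guard comes from ActiveSupport and normalizes whatever the single extractor returns:

    Array.wrap({ 'data' => {} })  # => [{ 'data' => {} }]
    Array.wrap([1, 2])            # => [1, 2]
    Array.wrap(nil)               # => []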