diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2021-04-20 23:50:22 +0000 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2021-04-20 23:50:22 +0000 |
commit | 9dc93a4519d9d5d7be48ff274127136236a3adb3 (patch) | |
tree | 70467ae3692a0e35e5ea56bcb803eb512a10bedb /lib/bulk_imports | |
parent | 4b0f34b6d759d6299322b3a54453e930c6121ff0 (diff) | |
download | gitlab-ce-9dc93a4519d9d5d7be48ff274127136236a3adb3.tar.gz |
Add latest changes from gitlab-org/gitlab@13-11-stable-ee (v13.11.0-rc43)
Diffstat (limited to 'lib/bulk_imports')
17 files changed, 208 insertions, 110 deletions
diff --git a/lib/bulk_imports/clients/http.rb b/lib/bulk_imports/clients/http.rb index 2e81863e53a..ef99122cdfd 100644 --- a/lib/bulk_imports/clients/http.rb +++ b/lib/bulk_imports/clients/http.rb @@ -3,9 +3,9 @@ module BulkImports module Clients class Http - API_VERSION = 'v4'.freeze - DEFAULT_PAGE = 1.freeze - DEFAULT_PER_PAGE = 30.freeze + API_VERSION = 'v4' + DEFAULT_PAGE = 1 + DEFAULT_PER_PAGE = 30 ConnectionError = Class.new(StandardError) @@ -23,7 +23,7 @@ module BulkImports resource_url(resource), headers: request_headers, follow_redirects: false, - query: query.merge(request_query) + query: query.reverse_merge(request_query) ) end end diff --git a/lib/bulk_imports/common/extractors/rest_extractor.rb b/lib/bulk_imports/common/extractors/rest_extractor.rb new file mode 100644 index 00000000000..b18e27fd475 --- /dev/null +++ b/lib/bulk_imports/common/extractors/rest_extractor.rb @@ -0,0 +1,45 @@ +# frozen_string_literal: true + +module BulkImports + module Common + module Extractors + class RestExtractor + def initialize(options = {}) + @query = options[:query] + end + + def extract(context) + client = http_client(context.configuration) + params = query.to_h(context) + response = client.get(params[:resource], params[:query]) + + BulkImports::Pipeline::ExtractedData.new( + data: response.parsed_response, + page_info: page_info(response.headers) + ) + end + + private + + attr_reader :query + + def http_client(configuration) + @http_client ||= BulkImports::Clients::Http.new( + uri: configuration.url, + token: configuration.access_token, + per_page: 100 + ) + end + + def page_info(headers) + next_page = headers['x-next-page'] + + { + 'has_next_page' => next_page.present?, + 'next_page' => next_page + } + end + end + end + end +end diff --git a/lib/bulk_imports/common/transformers/user_reference_transformer.rb b/lib/bulk_imports/common/transformers/user_reference_transformer.rb index ca077b4ef43..c330ea59113 100644 --- 
a/lib/bulk_imports/common/transformers/user_reference_transformer.rb +++ b/lib/bulk_imports/common/transformers/user_reference_transformer.rb @@ -12,7 +12,7 @@ module BulkImports DEFAULT_REFERENCE = 'user' def initialize(options = {}) - @reference = options[:reference] || DEFAULT_REFERENCE + @reference = options[:reference].to_s.presence || DEFAULT_REFERENCE @suffixed_reference = "#{@reference}_id" end diff --git a/lib/bulk_imports/groups/graphql/get_labels_query.rb b/lib/bulk_imports/groups/graphql/get_labels_query.rb index 23efbc33581..f957cf0be52 100644 --- a/lib/bulk_imports/groups/graphql/get_labels_query.rb +++ b/lib/bulk_imports/groups/graphql/get_labels_query.rb @@ -8,11 +8,11 @@ module BulkImports def to_s <<-'GRAPHQL' - query ($full_path: ID!, $cursor: String) { + query ($full_path: ID!, $cursor: String, $per_page: Int) { group(fullPath: $full_path) { - labels(first: 100, after: $cursor, onlyGroupLabels: true) { + labels(first: $per_page, after: $cursor, onlyGroupLabels: true) { page_info: pageInfo { - end_cursor: endCursor + next_page: endCursor has_next_page: hasNextPage } nodes { @@ -31,7 +31,8 @@ module BulkImports def variables(context) { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(:labels) + cursor: context.tracker.next_page, + per_page: ::BulkImports::Tracker::DEFAULT_PAGE_SIZE } end diff --git a/lib/bulk_imports/groups/graphql/get_members_query.rb b/lib/bulk_imports/groups/graphql/get_members_query.rb index e3a78124a47..e44d3c5aa9b 100644 --- a/lib/bulk_imports/groups/graphql/get_members_query.rb +++ b/lib/bulk_imports/groups/graphql/get_members_query.rb @@ -7,11 +7,11 @@ module BulkImports extend self def to_s <<-'GRAPHQL' - query($full_path: ID!, $cursor: String) { + query($full_path: ID!, $cursor: String, $per_page: Int) { group(fullPath: $full_path) { - group_members: groupMembers(relations: DIRECT, first: 100, after: $cursor) { + group_members: groupMembers(relations: DIRECT, first: $per_page, after: 
$cursor) { page_info: pageInfo { - end_cursor: endCursor + next_page: endCursor has_next_page: hasNextPage } nodes { @@ -34,7 +34,8 @@ module BulkImports def variables(context) { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(:group_members) + cursor: context.tracker.next_page, + per_page: ::BulkImports::Tracker::DEFAULT_PAGE_SIZE } end diff --git a/lib/bulk_imports/groups/graphql/get_milestones_query.rb b/lib/bulk_imports/groups/graphql/get_milestones_query.rb index 2ade87e6fa0..5dd5b31cf0e 100644 --- a/lib/bulk_imports/groups/graphql/get_milestones_query.rb +++ b/lib/bulk_imports/groups/graphql/get_milestones_query.rb @@ -8,14 +8,15 @@ module BulkImports def to_s <<-'GRAPHQL' - query ($full_path: ID!, $cursor: String) { + query ($full_path: ID!, $cursor: String, $per_page: Int) { group(fullPath: $full_path) { - milestones(first: 100, after: $cursor, includeDescendants: false) { + milestones(first: $per_page, after: $cursor, includeDescendants: false) { page_info: pageInfo { - end_cursor: endCursor + next_page: endCursor has_next_page: hasNextPage } nodes { + iid title description state @@ -33,7 +34,8 @@ module BulkImports def variables(context) { full_path: context.entity.source_full_path, - cursor: context.entity.next_page_for(:milestones) + cursor: context.tracker.next_page, + per_page: ::BulkImports::Tracker::DEFAULT_PAGE_SIZE } end diff --git a/lib/bulk_imports/groups/pipelines/badges_pipeline.rb b/lib/bulk_imports/groups/pipelines/badges_pipeline.rb new file mode 100644 index 00000000000..8569ff3f77a --- /dev/null +++ b/lib/bulk_imports/groups/pipelines/badges_pipeline.rb @@ -0,0 +1,32 @@ +# frozen_string_literal: true + +module BulkImports + module Groups + module Pipelines + class BadgesPipeline + include Pipeline + + extractor BulkImports::Common::Extractors::RestExtractor, + query: BulkImports::Groups::Rest::GetBadgesQuery + + transformer Common::Transformers::ProhibitedAttributesTransformer + + def transform(_, data) 
+ return if data.blank? + + { + name: data['name'], + link_url: data['link_url'], + image_url: data['image_url'] + } + end + + def load(context, data) + return if data.blank? + + context.group.badges.create!(data) + end + end + end + end +end diff --git a/lib/bulk_imports/groups/pipelines/entity_finisher.rb b/lib/bulk_imports/groups/pipelines/entity_finisher.rb new file mode 100644 index 00000000000..1d237bc0f7f --- /dev/null +++ b/lib/bulk_imports/groups/pipelines/entity_finisher.rb @@ -0,0 +1,35 @@ +# frozen_string_literal: true + +module BulkImports + module Groups + module Pipelines + class EntityFinisher + def initialize(context) + @context = context + end + + def run + return if context.entity.finished? + + context.entity.finish! + + logger.info( + bulk_import_id: context.bulk_import.id, + bulk_import_entity_id: context.entity.id, + bulk_import_entity_type: context.entity.source_type, + pipeline_class: self.class.name, + message: 'Entity finished' + ) + end + + private + + attr_reader :context + + def logger + @logger ||= Gitlab::Import::Logger.build + end + end + end + end +end diff --git a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb index 9f8b8682751..0dc4a968b84 100644 --- a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb @@ -14,18 +14,6 @@ module BulkImports def load(context, data) Labels::CreateService.new(data).execute(group: context.group) end - - def after_run(extracted_data) - context.entity.update_tracker_for( - relation: :labels, - has_next_page: extracted_data.has_next_page?, - next_page: extracted_data.next_page - ) - - if extracted_data.has_next_page? 
- run - end - end end end end diff --git a/lib/bulk_imports/groups/pipelines/members_pipeline.rb b/lib/bulk_imports/groups/pipelines/members_pipeline.rb index 32fc931e8c3..5e4293d2c06 100644 --- a/lib/bulk_imports/groups/pipelines/members_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/members_pipeline.rb @@ -17,18 +17,6 @@ module BulkImports context.group.members.create!(data) end - - def after_run(extracted_data) - context.entity.update_tracker_for( - relation: :group_members, - has_next_page: extracted_data.has_next_page?, - next_page: extracted_data.next_page - ) - - if extracted_data.has_next_page? - run - end - end end end end diff --git a/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb b/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb index 8497162e0e7..9b2be30735c 100644 --- a/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb @@ -19,18 +19,6 @@ module BulkImports context.group.milestones.create!(data) end - def after_run(extracted_data) - context.entity.update_tracker_for( - relation: :milestones, - has_next_page: extracted_data.has_next_page?, - next_page: extracted_data.next_page - ) - - if extracted_data.has_next_page? - run - end - end - private def authorized? 
diff --git a/lib/bulk_imports/groups/rest/get_badges_query.rb b/lib/bulk_imports/groups/rest/get_badges_query.rb new file mode 100644 index 00000000000..79ffdd9a1f6 --- /dev/null +++ b/lib/bulk_imports/groups/rest/get_badges_query.rb @@ -0,0 +1,22 @@ +# frozen_string_literal: true + +module BulkImports + module Groups + module Rest + module GetBadgesQuery + extend self + + def to_h(context) + encoded_full_path = ERB::Util.url_encode(context.entity.source_full_path) + + { + resource: ['groups', encoded_full_path, 'badges'].join('/'), + query: { + page: context.tracker.next_page + } + } + end + end + end + end +end diff --git a/lib/bulk_imports/importers/group_importer.rb b/lib/bulk_imports/importers/group_importer.rb deleted file mode 100644 index f016b552fd4..00000000000 --- a/lib/bulk_imports/importers/group_importer.rb +++ /dev/null @@ -1,35 +0,0 @@ -# frozen_string_literal: true - -module BulkImports - module Importers - class GroupImporter - def initialize(entity) - @entity = entity - end - - def execute - context = BulkImports::Pipeline::Context.new(entity) - - pipelines.each { |pipeline| pipeline.new(context).run } - - entity.finish! 
- end - - private - - attr_reader :entity - - def pipelines - [ - BulkImports::Groups::Pipelines::GroupPipeline, - BulkImports::Groups::Pipelines::SubgroupEntitiesPipeline, - BulkImports::Groups::Pipelines::MembersPipeline, - BulkImports::Groups::Pipelines::LabelsPipeline, - BulkImports::Groups::Pipelines::MilestonesPipeline - ] - end - end - end -end - -BulkImports::Importers::GroupImporter.prepend_if_ee('EE::BulkImports::Importers::GroupImporter') diff --git a/lib/bulk_imports/pipeline.rb b/lib/bulk_imports/pipeline.rb index 14445162737..df4f020d6b2 100644 --- a/lib/bulk_imports/pipeline.rb +++ b/lib/bulk_imports/pipeline.rb @@ -15,6 +15,10 @@ module BulkImports @context = context end + def tracker + @tracker ||= context.tracker + end + included do private diff --git a/lib/bulk_imports/pipeline/context.rb b/lib/bulk_imports/pipeline/context.rb index dd121b2dbed..3c69c729f36 100644 --- a/lib/bulk_imports/pipeline/context.rb +++ b/lib/bulk_imports/pipeline/context.rb @@ -3,25 +3,33 @@ module BulkImports module Pipeline class Context - attr_reader :entity, :bulk_import attr_accessor :extra - def initialize(entity, extra = {}) - @entity = entity - @bulk_import = entity.bulk_import + attr_reader :tracker + + def initialize(tracker, extra = {}) + @tracker = tracker @extra = extra end + def entity + @entity ||= tracker.entity + end + def group - entity.group + @group ||= entity.group + end + + def bulk_import + @bulk_import ||= entity.bulk_import end def current_user - bulk_import.user + @current_user ||= bulk_import.user end def configuration - bulk_import.configuration + @configuration ||= bulk_import.configuration end end end diff --git a/lib/bulk_imports/pipeline/extracted_data.rb b/lib/bulk_imports/pipeline/extracted_data.rb index 685a91a4afe..c9e54b61dd3 100644 --- a/lib/bulk_imports/pipeline/extracted_data.rb +++ b/lib/bulk_imports/pipeline/extracted_data.rb @@ -11,11 +11,14 @@ module BulkImports end def has_next_page? 
- @page_info['has_next_page'] + Gitlab::Utils.to_boolean( + @page_info&.dig('has_next_page'), + default: false + ) end def next_page - @page_info['end_cursor'] + @page_info&.dig('next_page') end def each(&block) diff --git a/lib/bulk_imports/pipeline/runner.rb b/lib/bulk_imports/pipeline/runner.rb index e3535e585cc..b756fba3bee 100644 --- a/lib/bulk_imports/pipeline/runner.rb +++ b/lib/bulk_imports/pipeline/runner.rb @@ -14,19 +14,24 @@ module BulkImports extracted_data = extracted_data_from - extracted_data&.each do |entry| - transformers.each do |transformer| - entry = run_pipeline_step(:transformer, transformer.class.name) do - transformer.transform(context, entry) + if extracted_data + extracted_data.each do |entry| + transformers.each do |transformer| + entry = run_pipeline_step(:transformer, transformer.class.name) do + transformer.transform(context, entry) + end end - end - run_pipeline_step(:loader, loader.class.name) do - loader.load(context, entry) + run_pipeline_step(:loader, loader.class.name) do + loader.load(context, entry) + end end - end - if respond_to?(:after_run) + tracker.update!( + has_next_page: extracted_data.has_next_page?, + next_page: extracted_data.next_page + ) + run_pipeline_step(:after_run) do after_run(extracted_data) end @@ -34,7 +39,7 @@ module BulkImports info(message: 'Pipeline finished') rescue MarkedAsFailedError - log_skip + skip!('Skipping pipeline due to failed entity') end private # rubocop:disable Lint/UselessAccessModifier @@ -46,7 +51,11 @@ module BulkImports yield rescue MarkedAsFailedError - log_skip(step => class_name) + skip!( + 'Skipping pipeline due to failed entity', + pipeline_step: step, + step_class: class_name + ) rescue => e log_import_failure(e, step) @@ -61,14 +70,21 @@ module BulkImports end end + def after_run(extracted_data) + run if extracted_data.has_next_page? + end + def mark_as_failed warn(message: 'Pipeline failed') context.entity.fail_op! + tracker.fail_op! 
end - def log_skip(extra = {}) - info({ message: 'Skipping due to failed pipeline status' }.merge(extra)) + def skip!(message, extra = {}) + warn({ message: message }.merge(extra)) + + tracker.skip! end def log_import_failure(exception, step) |