diff options
Diffstat (limited to 'lib/bulk_imports')
-rw-r--r-- | lib/bulk_imports/clients/graphql.rb | 2 | ||||
-rw-r--r-- | lib/bulk_imports/clients/http.rb | 60 | ||||
-rw-r--r-- | lib/bulk_imports/common/extractors/ndjson_extractor.rb | 68 | ||||
-rw-r--r-- | lib/bulk_imports/common/extractors/rest_extractor.rb | 2 | ||||
-rw-r--r-- | lib/bulk_imports/groups/extractors/subgroups_extractor.rb | 2 | ||||
-rw-r--r-- | lib/bulk_imports/groups/graphql/get_labels_query.rb | 53 | ||||
-rw-r--r-- | lib/bulk_imports/groups/pipelines/boards_pipeline.rb | 15 | ||||
-rw-r--r-- | lib/bulk_imports/groups/pipelines/entity_finisher.rb | 22 | ||||
-rw-r--r-- | lib/bulk_imports/groups/pipelines/labels_pipeline.rb | 11 | ||||
-rw-r--r-- | lib/bulk_imports/groups/pipelines/milestones_pipeline.rb | 21 | ||||
-rw-r--r-- | lib/bulk_imports/ndjson_pipeline.rb | 99 | ||||
-rw-r--r-- | lib/bulk_imports/pipeline.rb | 33 | ||||
-rw-r--r-- | lib/bulk_imports/pipeline/context.rb | 8 | ||||
-rw-r--r-- | lib/bulk_imports/pipeline/extracted_data.rb | 2 | ||||
-rw-r--r-- | lib/bulk_imports/stage.rb | 6 |
15 files changed, 294 insertions, 110 deletions
diff --git a/lib/bulk_imports/clients/graphql.rb b/lib/bulk_imports/clients/graphql.rb index b067431aeae..ca549c4be14 100644 --- a/lib/bulk_imports/clients/graphql.rb +++ b/lib/bulk_imports/clients/graphql.rb @@ -25,7 +25,7 @@ module BulkImports delegate :query, :parse, :execute, to: :client - def initialize(url: Gitlab::COM_URL, token: nil) + def initialize(url: Gitlab::Saas.com_url, token: nil) @url = Gitlab::Utils.append_path(url, '/api/graphql') @token = token @client = Graphlient::Client.new( diff --git a/lib/bulk_imports/clients/http.rb b/lib/bulk_imports/clients/http.rb index c89679f63b5..c5f12d8c2ba 100644 --- a/lib/bulk_imports/clients/http.rb +++ b/lib/bulk_imports/clients/http.rb @@ -2,7 +2,7 @@ module BulkImports module Clients - class Http + class HTTP API_VERSION = 'v4' DEFAULT_PAGE = 1 DEFAULT_PER_PAGE = 30 @@ -18,25 +18,19 @@ module BulkImports end def get(resource, query = {}) - with_error_handling do - Gitlab::HTTP.get( - resource_url(resource), - headers: request_headers, - follow_redirects: false, - query: query.reverse_merge(request_query) - ) - end + request(:get, resource, query: query.reverse_merge(request_query)) end def post(resource, body = {}) - with_error_handling do - Gitlab::HTTP.post( - resource_url(resource), - headers: request_headers, - follow_redirects: false, - body: body - ) - end + request(:post, resource, body: body) + end + + def head(resource) + request(:head, resource) + end + + def stream(resource, &block) + request(:get, resource, stream_body: true, &block) end def each_page(method, resource, query = {}, &block) @@ -55,8 +49,36 @@ module BulkImports end end + def resource_url(resource) + Gitlab::Utils.append_path(api_url, resource) + end + private + # rubocop:disable GitlabSecurity/PublicSend + def request(method, resource, options = {}, &block) + with_error_handling do + Gitlab::HTTP.public_send( + method, + resource_url(resource), + request_options(options), + &block + ) + end + end + # rubocop:enable GitlabSecurity/PublicSend + + def request_options(options) + default_options.merge(options) + end + + def default_options + { + headers: request_headers, + follow_redirects: false + } + end + def request_query { page: @page, @@ -88,10 +110,6 @@ module BulkImports def api_url Gitlab::Utils.append_path(base_uri, "/api/#{@api_version}") end - - def resource_url(resource) - Gitlab::Utils.append_path(api_url, resource) - end end end end diff --git a/lib/bulk_imports/common/extractors/ndjson_extractor.rb b/lib/bulk_imports/common/extractors/ndjson_extractor.rb new file mode 100644 index 00000000000..79d626001a0 --- /dev/null +++ b/lib/bulk_imports/common/extractors/ndjson_extractor.rb @@ -0,0 +1,68 @@ +# frozen_string_literal: true + +module BulkImports + module Common + module Extractors + class NdjsonExtractor + include Gitlab::ImportExport::CommandLineUtil + include Gitlab::Utils::StrongMemoize + + EXPORT_DOWNLOAD_URL_PATH = "/%{resource}/%{full_path}/export_relations/download?relation=%{relation}" + + def initialize(relation:) + @relation = relation + @tmp_dir = Dir.mktmpdir + end + + def extract(context) + download_service(tmp_dir, context).execute + decompression_service(tmp_dir).execute + relations = ndjson_reader(tmp_dir).consume_relation('', relation) + + BulkImports::Pipeline::ExtractedData.new(data: relations) + end + + def remove_tmp_dir + FileUtils.remove_entry(tmp_dir) + end + + private + + attr_reader :relation, :tmp_dir + + def filename + @filename ||= "#{relation}.ndjson.gz" + end + + def download_service(tmp_dir, context) + @download_service ||= BulkImports::FileDownloadService.new( + configuration: context.configuration, + relative_url: relative_resource_url(context), + dir: tmp_dir, + filename: filename + ) + end + + def decompression_service(tmp_dir) + @decompression_service ||= BulkImports::FileDecompressionService.new( + dir: tmp_dir, + filename: filename + ) + end + + def ndjson_reader(tmp_dir) + @ndjson_reader ||= Gitlab::ImportExport::Json::NdjsonReader.new(tmp_dir) + end + + def relative_resource_url(context) + strong_memoize(:relative_resource_url) do + resource = context.portable.class.name.downcase.pluralize + encoded_full_path = context.entity.encoded_source_full_path + + EXPORT_DOWNLOAD_URL_PATH % { resource: resource, full_path: encoded_full_path, relation: relation } + end + end + end + end + end +end diff --git a/lib/bulk_imports/common/extractors/rest_extractor.rb b/lib/bulk_imports/common/extractors/rest_extractor.rb index b18e27fd475..2179e0575c5 100644 --- a/lib/bulk_imports/common/extractors/rest_extractor.rb +++ b/lib/bulk_imports/common/extractors/rest_extractor.rb @@ -24,7 +24,7 @@ module BulkImports attr_reader :query def http_client(configuration) - @http_client ||= BulkImports::Clients::Http.new( + @http_client ||= BulkImports::Clients::HTTP.new( uri: configuration.url, token: configuration.access_token, per_page: 100 diff --git a/lib/bulk_imports/groups/extractors/subgroups_extractor.rb b/lib/bulk_imports/groups/extractors/subgroups_extractor.rb index e5e2b9fdbd4..db5882d49a9 100644 --- a/lib/bulk_imports/groups/extractors/subgroups_extractor.rb +++ b/lib/bulk_imports/groups/extractors/subgroups_extractor.rb @@ -17,7 +17,7 @@ module BulkImports private def http_client(configuration) - @http_client ||= BulkImports::Clients::Http.new( + @http_client ||= BulkImports::Clients::HTTP.new( uri: configuration.url, token: configuration.access_token, per_page: 100 diff --git a/lib/bulk_imports/groups/graphql/get_labels_query.rb b/lib/bulk_imports/groups/graphql/get_labels_query.rb deleted file mode 100644 index f957cf0be52..00000000000 --- a/lib/bulk_imports/groups/graphql/get_labels_query.rb +++ /dev/null @@ -1,53 +0,0 @@ -# frozen_string_literal: true - -module BulkImports - module Groups - module Graphql - module GetLabelsQuery - extend self - - def to_s - <<-'GRAPHQL' - query ($full_path: ID!, $cursor: String, $per_page: Int) { - group(fullPath: $full_path) { - labels(first: $per_page, after: $cursor, onlyGroupLabels: true) { - page_info: pageInfo { - next_page: endCursor - has_next_page: hasNextPage - } - nodes { - title - description - color - created_at: createdAt - updated_at: updatedAt - } - } - } - } - GRAPHQL - end - - def variables(context) - { - full_path: context.entity.source_full_path, - cursor: context.tracker.next_page, - per_page: ::BulkImports::Tracker::DEFAULT_PAGE_SIZE - } - end - - def base_path - %w[data group labels] - end - - def data_path - base_path << 'nodes' - end - - def page_info_path - base_path << 'page_info' - end - end - end - end -end diff --git a/lib/bulk_imports/groups/pipelines/boards_pipeline.rb b/lib/bulk_imports/groups/pipelines/boards_pipeline.rb new file mode 100644 index 00000000000..08a0a4abc9f --- /dev/null +++ b/lib/bulk_imports/groups/pipelines/boards_pipeline.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +module BulkImports + module Groups + module Pipelines + class BoardsPipeline + include NdjsonPipeline + + relation_name 'boards' + + extractor ::BulkImports::Common::Extractors::NdjsonExtractor, relation: relation + end + end + end +end diff --git a/lib/bulk_imports/groups/pipelines/entity_finisher.rb b/lib/bulk_imports/groups/pipelines/entity_finisher.rb index 1d237bc0f7f..1a709179bf9 100644 --- a/lib/bulk_imports/groups/pipelines/entity_finisher.rb +++ b/lib/bulk_imports/groups/pipelines/entity_finisher.rb @@ -4,31 +4,45 @@ module BulkImports module Groups module Pipelines class EntityFinisher + def self.ndjson_pipeline? + false + end + def initialize(context) @context = context + @entity = @context.entity + @trackers = @entity.trackers end def run - return if context.entity.finished? + return if entity.finished? || entity.failed? - context.entity.finish! + if all_other_trackers_failed? + entity.fail_op! + else + entity.finish! + end logger.info( bulk_import_id: context.bulk_import.id, bulk_import_entity_id: context.entity.id, bulk_import_entity_type: context.entity.source_type, pipeline_class: self.class.name, - message: 'Entity finished' + message: "Entity #{entity.status_name}" ) end private - attr_reader :context + attr_reader :context, :entity, :trackers def logger @logger ||= Gitlab::Import::Logger.build end + + def all_other_trackers_failed? + trackers.where.not(relation: self.class.name).all? { |tracker| tracker.failed? } # rubocop: disable CodeReuse/ActiveRecord + end end end end diff --git a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb index 0dc4a968b84..1dd74c10b65 100644 --- a/lib/bulk_imports/groups/pipelines/labels_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/labels_pipeline.rb @@ -4,16 +4,11 @@ module BulkImports module Groups module Pipelines class LabelsPipeline - include Pipeline + include NdjsonPipeline - extractor BulkImports::Common::Extractors::GraphqlExtractor, - query: BulkImports::Groups::Graphql::GetLabelsQuery + relation_name 'labels' - transformer Common::Transformers::ProhibitedAttributesTransformer - - def load(context, data) - Labels::CreateService.new(data).execute(group: context.group) - end + extractor ::BulkImports::Common::Extractors::NdjsonExtractor, relation: relation end end end diff --git a/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb b/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb index 9b2be30735c..b2bd14952e7 100644 --- a/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb +++ b/lib/bulk_imports/groups/pipelines/milestones_pipeline.rb @@ -4,26 +4,11 @@ module BulkImports module Groups module Pipelines class MilestonesPipeline - include Pipeline + include NdjsonPipeline - extractor BulkImports::Common::Extractors::GraphqlExtractor, - query: BulkImports::Groups::Graphql::GetMilestonesQuery + relation_name 'milestones' - transformer Common::Transformers::ProhibitedAttributesTransformer - - def load(context, data) - return unless data - - raise ::BulkImports::Pipeline::NotAllowedError unless authorized? - - context.group.milestones.create!(data) - end - - private - - def authorized? - context.current_user.can?(:admin_milestone, context.group) - end + extractor ::BulkImports::Common::Extractors::NdjsonExtractor, relation: relation end end end diff --git a/lib/bulk_imports/ndjson_pipeline.rb b/lib/bulk_imports/ndjson_pipeline.rb new file mode 100644 index 00000000000..2de06bbcb88 --- /dev/null +++ b/lib/bulk_imports/ndjson_pipeline.rb @@ -0,0 +1,99 @@ +# frozen_string_literal: true + +module BulkImports + module NdjsonPipeline + extend ActiveSupport::Concern + + include Pipeline + + included do + ndjson_pipeline! + + def transform(context, data) + relation_hash, relation_index = data + relation_definition = import_export_config.top_relation_tree(relation) + + deep_transform_relation!(relation_hash, relation, relation_definition) do |key, hash| + Gitlab::ImportExport::Group::RelationFactory.create( + relation_index: relation_index, + relation_sym: key.to_sym, + relation_hash: hash, + importable: context.portable, + members_mapper: members_mapper, + object_builder: object_builder, + user: context.current_user, + excluded_keys: import_export_config.relation_excluded_keys(key) + ) + end + end + + def load(_, object) + return unless object + + object.save! unless object.persisted? + end + + def deep_transform_relation!(relation_hash, relation_key, relation_definition, &block) + relation_key = relation_key_override(relation_key) + + relation_definition.each do |sub_relation_key, sub_relation_definition| + sub_relation = relation_hash[sub_relation_key] + + next unless sub_relation + + current_item = + if sub_relation.is_a?(Array) + sub_relation + .map { |entry| deep_transform_relation!(entry, sub_relation_key, sub_relation_definition, &block) } + .tap { |entry| entry.compact! } + .presence + else + deep_transform_relation!(sub_relation, sub_relation_key, sub_relation_definition, &block) + end + + if current_item + relation_hash[sub_relation_key] = current_item + else + relation_hash.delete(sub_relation_key) + end + end + + yield(relation_key, relation_hash) + end + + def after_run(_) + extractor.remove_tmp_dir if extractor.respond_to?(:remove_tmp_dir) + end + + def relation_class(relation_key) + relation_key.to_s.classify.constantize + rescue NameError + relation_key.to_s.constantize + end + + def relation_key_override(relation_key) + relation_key_overrides[relation_key.to_sym]&.to_s || relation_key + end + + def relation_key_overrides + "Gitlab::ImportExport::#{portable.class}::RelationFactory::OVERRIDES".constantize + end + + def object_builder + "Gitlab::ImportExport::#{portable.class}::ObjectBuilder".constantize + end + + def relation + self.class.relation + end + + def members_mapper + @members_mapper ||= Gitlab::ImportExport::MembersMapper.new( + exported_members: [], + user: current_user, + importable: portable + ) + end + end + end +end diff --git a/lib/bulk_imports/pipeline.rb b/lib/bulk_imports/pipeline.rb index df4f020d6b2..f27818dae18 100644 --- a/lib/bulk_imports/pipeline.rb +++ b/lib/bulk_imports/pipeline.rb @@ -8,8 +8,11 @@ module BulkImports include Runner NotAllowedError = Class.new(StandardError) + ExpiredError = Class.new(StandardError) + FailedError = Class.new(StandardError) CACHE_KEY_EXPIRATION = 2.hours + NDJSON_EXPORT_TIMEOUT = 30.minutes def initialize(context) @context = context @@ -19,6 +22,18 @@ module BulkImports @tracker ||= context.tracker end + def portable + @portable ||= context.portable + end + + def import_export_config + @import_export_config ||= context.import_export_config + end + + def current_user + @current_user ||= context.current_user + end + included do private @@ -111,7 +126,7 @@ module BulkImports options = class_config[:options] if options - class_config[:klass].new(class_config[:options]) + class_config[:klass].new(**class_config[:options]) else class_config[:klass].new end @@ -155,6 +170,22 @@ module BulkImports class_attributes[:abort_on_failure] end + def ndjson_pipeline! + class_attributes[:ndjson_pipeline] = true + end + + def ndjson_pipeline? + class_attributes[:ndjson_pipeline] + end + + def relation_name(name) + class_attributes[:relation_name] = name + end + + def relation + class_attributes[:relation_name] + end + private def add_attribute(sym, klass, options) diff --git a/lib/bulk_imports/pipeline/context.rb b/lib/bulk_imports/pipeline/context.rb index 3c69c729f36..d753f888671 100644 --- a/lib/bulk_imports/pipeline/context.rb +++ b/lib/bulk_imports/pipeline/context.rb @@ -16,6 +16,14 @@ module BulkImports @entity ||= tracker.entity end + def portable + @portable ||= entity.group || entity.project + end + + def import_export_config + @import_export_config ||= ::BulkImports::FileTransfer.config_for(portable) + end + def group @group ||= entity.group end diff --git a/lib/bulk_imports/pipeline/extracted_data.rb b/lib/bulk_imports/pipeline/extracted_data.rb index c9e54b61dd3..0b36c068298 100644 --- a/lib/bulk_imports/pipeline/extracted_data.rb +++ b/lib/bulk_imports/pipeline/extracted_data.rb @@ -6,7 +6,7 @@ module BulkImports attr_reader :data def initialize(data: nil, page_info: {}) - @data = Array.wrap(data) + @data = data.is_a?(Enumerator) ? data : Array.wrap(data) @page_info = page_info end diff --git a/lib/bulk_imports/stage.rb b/lib/bulk_imports/stage.rb index 35b77240ea7..bc7fc14b5a0 100644 --- a/lib/bulk_imports/stage.rb +++ b/lib/bulk_imports/stage.rb @@ -29,9 +29,13 @@ module BulkImports pipeline: BulkImports::Groups::Pipelines::BadgesPipeline, stage: 1 }, + boards: { + pipeline: BulkImports::Groups::Pipelines::BoardsPipeline, + stage: 2 + }, finisher: { pipeline: BulkImports::Groups::Pipelines::EntityFinisher, - stage: 2 + stage: 3 } }.freeze |