diff options
Diffstat (limited to 'lib/bulk_imports/common')
4 files changed, 264 insertions, 3 deletions
diff --git a/lib/bulk_imports/common/extractors/graphql_extractor.rb b/lib/bulk_imports/common/extractors/graphql_extractor.rb index cde3d1cad5b..bfdc0b13603 100644 --- a/lib/bulk_imports/common/extractors/graphql_extractor.rb +++ b/lib/bulk_imports/common/extractors/graphql_extractor.rb @@ -5,15 +5,16 @@ module BulkImports module Extractors class GraphqlExtractor def initialize(options = {}) - @query = options[:query] + @query_klass = options[:query] end def extract(context) client = graphql_client(context) + query = query_klass.new(context: context) response = client.execute( client.parse(query.to_s), - query.variables(context) + query.variables ).original_hash.deep_dup BulkImports::Pipeline::ExtractedData.new( @@ -24,7 +25,7 @@ module BulkImports private - attr_reader :query + attr_reader :query_klass def graphql_client(context) @graphql_client ||= BulkImports::Clients::Graphql.new( diff --git a/lib/bulk_imports/common/graphql/get_members_query.rb b/lib/bulk_imports/common/graphql/get_members_query.rb new file mode 100644 index 00000000000..00977f694d7 --- /dev/null +++ b/lib/bulk_imports/common/graphql/get_members_query.rb @@ -0,0 +1,72 @@ +# frozen_string_literal: true + +module BulkImports + module Common + module Graphql + class GetMembersQuery + attr_reader :context + + def initialize(context:) + @context = context + end + + def to_s + <<-GRAPHQL + query($full_path: ID!, $cursor: String, $per_page: Int) { + portable: #{context.entity.entity_type}(fullPath: $full_path) { + members: #{members_type}(relations: [DIRECT, INHERITED], first: $per_page, after: $cursor) { + page_info: pageInfo { + next_page: endCursor + has_next_page: hasNextPage + } + nodes { + created_at: createdAt + updated_at: updatedAt + expires_at: expiresAt + access_level: accessLevel { + integer_value: integerValue + } + user { + user_gid: id + public_email: publicEmail + } + } + } + } + } + GRAPHQL + end + + def variables + { + full_path: context.entity.source_full_path, + cursor: context.tracker.next_page, + per_page: ::BulkImports::Tracker::DEFAULT_PAGE_SIZE + } + end + + def data_path + base_path << 'nodes' + end + + def page_info_path + base_path << 'page_info' + end + + private + + def base_path + %w[data portable members] + end + + def members_type + if context.entity.group? + 'groupMembers' + else + 'projectMembers' + end + end + end + end + end +end diff --git a/lib/bulk_imports/common/pipelines/lfs_objects_pipeline.rb b/lib/bulk_imports/common/pipelines/lfs_objects_pipeline.rb new file mode 100644 index 00000000000..2e6a29f4738 --- /dev/null +++ b/lib/bulk_imports/common/pipelines/lfs_objects_pipeline.rb @@ -0,0 +1,134 @@ +# frozen_string_literal: true + +module BulkImports + module Common + module Pipelines + class LfsObjectsPipeline + include Pipeline + + def extract(_context) + download_service.execute + decompression_service.execute + extraction_service.execute + + file_paths = Dir.glob(File.join(tmpdir, '*')) + + BulkImports::Pipeline::ExtractedData.new(data: file_paths) + end + + # rubocop: disable CodeReuse/ActiveRecord + def load(_context, file_path) + Gitlab::Utils.check_path_traversal!(file_path) + Gitlab::Utils.check_allowed_absolute_path!(file_path, [Dir.tmpdir]) + + return if tar_filepath?(file_path) + return if lfs_json_filepath?(file_path) + return if File.directory?(file_path) + return if File.lstat(file_path).symlink? + + size = File.size(file_path) + oid = LfsObject.calculate_oid(file_path) + + lfs_object = LfsObject.find_or_initialize_by(oid: oid, size: size) + lfs_object.file = File.open(file_path) unless lfs_object.file&.exists? + lfs_object.save! if lfs_object.changed? + + repository_types(oid)&.each do |type| + create_lfs_objects_project(lfs_object, type) + end + end + # rubocop: enable CodeReuse/ActiveRecord + + def after_run(_) + FileUtils.remove_entry(tmpdir) if Dir.exist?(tmpdir) + end + + private + + def download_service + BulkImports::FileDownloadService.new( + configuration: context.configuration, + relative_url: context.entity.relation_download_url_path(relation), + tmpdir: tmpdir, + filename: targz_filename + ) + end + + def decompression_service + BulkImports::FileDecompressionService.new(tmpdir: tmpdir, filename: targz_filename) + end + + def extraction_service + BulkImports::ArchiveExtractionService.new(tmpdir: tmpdir, filename: tar_filename) + end + + def lfs_json + @lfs_json ||= Gitlab::Json.parse(File.read(lfs_json_filepath)) + rescue StandardError + raise BulkImports::Error, 'LFS Objects JSON read failed' + end + + def tmpdir + @tmpdir ||= Dir.mktmpdir('bulk_imports') + end + + def relation + BulkImports::FileTransfer::ProjectConfig::LFS_OBJECTS_RELATION + end + + def tar_filename + "#{relation}.tar" + end + + def targz_filename + "#{tar_filename}.gz" + end + + def lfs_json_filepath?(file_path) + file_path == lfs_json_filepath + end + + def tar_filepath?(file_path) + File.join(tmpdir, tar_filename) == file_path + end + + def lfs_json_filepath + File.join(tmpdir, "#{relation}.json") + end + + def create_lfs_objects_project(lfs_object, repository_type) + return unless allowed_repository_types.include?(repository_type) + + lfs_objects_project = LfsObjectsProject.create( + project: portable, + lfs_object: lfs_object, + repository_type: repository_type + ) + + return if lfs_objects_project.persisted? + + logger.warn( + project_id: portable.id, + message: 'Failed to save lfs objects project', + errors: lfs_objects_project.errors.full_messages.to_sentence, + **Gitlab::ApplicationContext.current + ) + end + + def repository_types(oid) + types = lfs_json[oid] + + return [] unless types + return [] unless types.is_a?(Array) + + # only return allowed repository types + types.uniq & allowed_repository_types + end + + def allowed_repository_types + @allowed_repository_types ||= LfsObjectsProject.repository_types.values.push(nil) + end + end + end + end +end diff --git a/lib/bulk_imports/common/pipelines/members_pipeline.rb b/lib/bulk_imports/common/pipelines/members_pipeline.rb new file mode 100644 index 00000000000..f35eb5ccf5e --- /dev/null +++ b/lib/bulk_imports/common/pipelines/members_pipeline.rb @@ -0,0 +1,54 @@ +# frozen_string_literal: true + +module BulkImports + module Common + module Pipelines + class MembersPipeline + include Pipeline + + transformer Common::Transformers::ProhibitedAttributesTransformer + transformer BulkImports::Groups::Transformers::MemberAttributesTransformer + + def extract(context) + graphql_extractor.extract(context) + end + + def load(_context, data) + return unless data + + user_id = data[:user_id] + + # Current user is already a member + return if user_id == current_user.id + + user_membership = existing_user_membership(user_id) + + # User is already a member with higher existing (inherited) membership + return if user_membership && user_membership[:access_level] >= data[:access_level] + + # Create new membership for any other access level + portable.members.create!(data) + end + + private + + def graphql_extractor + @graphql_extractor ||= BulkImports::Common::Extractors::GraphqlExtractor + .new(query: BulkImports::Common::Graphql::GetMembersQuery) + end + + def existing_user_membership(user_id) + members_finder.execute.find_by_user_id(user_id) + end + + def members_finder + @members_finder ||= if context.entity.group? + ::GroupMembersFinder.new(portable, current_user) + else + ::MembersFinder.new(portable, current_user) + end + end + end + end + end +end |