summaryrefslogtreecommitdiff
path: root/app/workers/bulk_imports/export_request_worker.rb
diff options
context:
space:
mode:
Diffstat (limited to 'app/workers/bulk_imports/export_request_worker.rb')
-rw-r--r--app/workers/bulk_imports/export_request_worker.rb89
1 files changed, 78 insertions, 11 deletions
diff --git a/app/workers/bulk_imports/export_request_worker.rb b/app/workers/bulk_imports/export_request_worker.rb
index 0d3e4f013dd..a57071ddcf1 100644
--- a/app/workers/bulk_imports/export_request_worker.rb
+++ b/app/workers/bulk_imports/export_request_worker.rb
@@ -13,45 +13,112 @@ module BulkImports
def perform(entity_id)
entity = BulkImports::Entity.find(entity_id)
+ entity.update!(source_xid: entity_source_xid(entity)) if entity.source_xid.nil?
+
request_export(entity)
+
+ BulkImports::EntityWorker.perform_async(entity_id)
rescue BulkImports::NetworkError => e
- log_export_failure(e, entity)
+ if e.retriable?(entity)
+ retry_request(e, entity)
+ else
+ log_export_failure(e, entity)
- entity.fail_op!
+ entity.fail_op!
+ end
end
private
def request_export(entity)
- http_client(entity.bulk_import.configuration).post(entity.export_relations_url_path)
+ http_client(entity).post(entity.export_relations_url_path)
end
- def http_client(configuration)
+ def http_client(entity)
@client ||= Clients::HTTP.new(
- url: configuration.url,
- token: configuration.access_token
+ url: entity.bulk_import.configuration.url,
+ token: entity.bulk_import.configuration.access_token
)
end
def log_export_failure(exception, entity)
- attributes = {
+ Gitlab::Import::Logger.error(
+ structured_payload(
+ log_attributes(exception, entity).merge(
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ message: "Request to export #{entity.source_type} failed",
+ importer: 'gitlab_migration'
+ )
+ )
+ )
+
+ BulkImports::Failure.create(log_attributes(exception, entity))
+ end
+
+ def log_attributes(exception, entity)
+ {
bulk_import_entity_id: entity.id,
pipeline_class: 'ExportRequestWorker',
exception_class: exception.class.to_s,
exception_message: exception.message.truncate(255),
correlation_id_value: Labkit::Correlation::CorrelationId.current_or_new_id
}
+ end
+
+ def graphql_client(entity)
+ @graphql_client ||= BulkImports::Clients::Graphql.new(
+ url: entity.bulk_import.configuration.url,
+ token: entity.bulk_import.configuration.access_token
+ )
+ end
+
+ def entity_source_xid(entity)
+ query = entity_query(entity)
+ client = graphql_client(entity)
+
+ response = client.execute(
+ client.parse(query.to_s),
+ { full_path: entity.source_full_path }
+ ).original_hash
+
+ ::GlobalID.parse(response.dig(*query.data_path, 'id')).model_id
+ rescue StandardError => e
+ Gitlab::Import::Logger.error(
+ structured_payload(
+ log_attributes(e, entity).merge(
+ message: 'Failed to fetch source entity id',
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ importer: 'gitlab_migration'
+ )
+ )
+ )
+
+ nil
+ end
+
+ def entity_query(entity)
+ if entity.group?
+ BulkImports::Groups::Graphql::GetGroupQuery.new(context: nil)
+ else
+ BulkImports::Projects::Graphql::GetProjectQuery.new(context: nil)
+ end
+ end
+ def retry_request(exception, entity)
Gitlab::Import::Logger.error(
structured_payload(
- attributes.merge(
- bulk_import_id: entity.bulk_import.id,
- bulk_import_entity_type: entity.source_type
+ log_attributes(exception, entity).merge(
+ message: 'Retrying export request',
+ bulk_import_id: entity.bulk_import_id,
+ bulk_import_entity_type: entity.source_type,
+ importer: 'gitlab_migration'
)
)
)
- BulkImports::Failure.create(attributes)
+ self.class.perform_in(2.seconds, entity.id)
end
end
end