# frozen_string_literal: true module Gitlab module GithubImport module ParallelScheduling attr_reader :project, :client, :page_counter, :already_imported_cache_key # The base cache key to use for tracking already imported objects. ALREADY_IMPORTED_CACHE_KEY = 'github-importer/already-imported/%{project}/%{collection}'.freeze # project - An instance of `Project`. # client - An instance of `Gitlab::GithubImport::Client`. # parallel - When set to true the objects will be imported in parallel. def initialize(project, client, parallel: true) @project = project @client = client @parallel = parallel @page_counter = PageCounter.new(project, collection_method) @already_imported_cache_key = ALREADY_IMPORTED_CACHE_KEY % { project: project.id, collection: collection_method } end def parallel? @parallel end def execute retval = if parallel? parallel_import else sequential_import end # Once we have completed all work we can remove our "already exists" # cache so we don't put too much pressure on Redis. # # We don't immediately remove it since it's technically possible for # other instances of this job to still run, instead we set the # expiration time to a lower value. This prevents the other jobs from # still scheduling duplicates while. Since all work has already been # completed those jobs will just cycle through any remaining pages while # not scheduling anything. Caching.expire(already_imported_cache_key, 15.minutes.to_i) retval end # Imports all the objects in sequence in the current thread. def sequential_import each_object_to_import do |object| repr = representation_class.from_api_response(object) importer_class.new(repr, project, client).execute end end # Imports all objects in parallel by scheduling a Sidekiq job for every # individual object. def parallel_import waiter = JobWaiter.new each_object_to_import do |object| repr = representation_class.from_api_response(object) sidekiq_worker_class .perform_async(project.id, repr.to_hash, waiter.key) waiter.jobs_remaining += 1 end waiter end # The method that will be called for traversing through all the objects to # import, yielding them to the supplied block. def each_object_to_import repo = project.import_source # We inject the page number here to make sure that all importers always # start where they left off. Simply starting over wouldn't work for # repositories with a lot of data (e.g. tens of thousands of comments). options = collection_options.merge(page: page_counter.current) client.each_page(collection_method, repo, options) do |page| # Technically it's possible that the same work is performed multiple # times, as Sidekiq doesn't guarantee there will ever only be one # instance of a job. In such a scenario it's possible for one job to # have a lower page number (e.g. 5) compared to another (e.g. 10). In # this case we skip over all the objects until we have caught up, # reducing the number of duplicate jobs scheduled by the provided # block. next unless page_counter.set(page.number) page.objects.each do |object| next if already_imported?(object) yield object # We mark the object as imported immediately so we don't end up # scheduling it multiple times. mark_as_imported(object) end end end # Returns true if the given object has already been imported, false # otherwise. # # object - The object to check. def already_imported?(object) id = id_for_already_imported_cache(object) Caching.set_includes?(already_imported_cache_key, id) end # Marks the given object as "already imported". def mark_as_imported(object) id = id_for_already_imported_cache(object) Caching.set_add(already_imported_cache_key, id) end # Returns the ID to use for the cache used for checking if an object has # already been imported or not. # # object - The object we may want to import. def id_for_already_imported_cache(object) raise NotImplementedError end # The class used for converting API responses to Hashes when performing # the import. def representation_class raise NotImplementedError end # The class to use for importing objects when importing them sequentially. def importer_class raise NotImplementedError end # The Sidekiq worker class used for scheduling the importing of objects in # parallel. def sidekiq_worker_class raise NotImplementedError end # The name of the method to call to retrieve the data to import. def collection_method raise NotImplementedError end # Any options to be passed to the method used for retrieving the data to # import. def collection_options {} end end end end