diff options
Diffstat (limited to 'lib/gitlab/github_import/parallel_scheduling.rb')
-rw-r--r-- | lib/gitlab/github_import/parallel_scheduling.rb | 162 |
1 files changed, 162 insertions, 0 deletions
diff --git a/lib/gitlab/github_import/parallel_scheduling.rb b/lib/gitlab/github_import/parallel_scheduling.rb new file mode 100644 index 00000000000..d4d1357f5a3 --- /dev/null +++ b/lib/gitlab/github_import/parallel_scheduling.rb @@ -0,0 +1,162 @@ +# frozen_string_literal: true + +module Gitlab + module GithubImport + module ParallelScheduling + attr_reader :project, :client, :page_counter, :already_imported_cache_key + + # The base cache key to use for tracking already imported objects. + ALREADY_IMPORTED_CACHE_KEY = + 'github-importer/already-imported/%{project}/%{collection}'.freeze + + # project - An instance of `Project`. + # client - An instance of `Gitlab::GithubImport::Client`. + # parallel - When set to true the objects will be imported in parallel. + def initialize(project, client, parallel: true) + @project = project + @client = client + @parallel = parallel + @page_counter = PageCounter.new(project, collection_method) + @already_imported_cache_key = ALREADY_IMPORTED_CACHE_KEY % + { project: project.id, collection: collection_method } + end + + def parallel? + @parallel + end + + def execute + retval = + if parallel? + parallel_import + else + sequential_import + end + + # Once we have completed all work we can remove our "already exists" + # cache so we don't put too much pressure on Redis. + # + # We don't immediately remove it since it's technically possible for + # other instances of this job to still run, instead we set the + # expiration time to a lower value. This prevents the other jobs from + # still scheduling duplicates while. Since all work has already been + # completed those jobs will just cycle through any remaining pages while + # not scheduling anything. + Caching.expire(already_imported_cache_key, 15.minutes.to_i) + + retval + end + + # Imports all the objects in sequence in the current thread. + def sequential_import + each_object_to_import do |object| + repr = representation_class.from_api_response(object) + + importer_class.new(repr, project, client).execute + end + end + + # Imports all objects in parallel by scheduling a Sidekiq job for every + # individual object. + def parallel_import + waiter = JobWaiter.new + + each_object_to_import do |object| + repr = representation_class.from_api_response(object) + + sidekiq_worker_class + .perform_async(project.id, repr.to_hash, waiter.key) + + waiter.jobs_remaining += 1 + end + + waiter + end + + # The method that will be called for traversing through all the objects to + # import, yielding them to the supplied block. + def each_object_to_import + repo = project.import_source + + # We inject the page number here to make sure that all importers always + # start where they left off. Simply starting over wouldn't work for + # repositories with a lot of data (e.g. tens of thousands of comments). + options = collection_options.merge(page: page_counter.current) + + client.each_page(collection_method, repo, options) do |page| + # Technically it's possible that the same work is performed multiple + # times, as Sidekiq doesn't guarantee there will ever only be one + # instance of a job. In such a scenario it's possible for one job to + # have a lower page number (e.g. 5) compared to another (e.g. 10). In + # this case we skip over all the objects until we have caught up, + # reducing the number of duplicate jobs scheduled by the provided + # block. + next unless page_counter.set(page.number) + + page.objects.each do |object| + next if already_imported?(object) + + yield object + + # We mark the object as imported immediately so we don't end up + # scheduling it multiple times. + mark_as_imported(object) + end + end + end + + # Returns true if the given object has already been imported, false + # otherwise. + # + # object - The object to check. + def already_imported?(object) + id = id_for_already_imported_cache(object) + + Caching.set_includes?(already_imported_cache_key, id) + end + + # Marks the given object as "already imported". + def mark_as_imported(object) + id = id_for_already_imported_cache(object) + + Caching.set_add(already_imported_cache_key, id) + end + + # Returns the ID to use for the cache used for checking if an object has + # already been imported or not. + # + # object - The object we may want to import. + def id_for_already_imported_cache(object) + raise NotImplementedError + end + + # The class used for converting API responses to Hashes when performing + # the import. + def representation_class + raise NotImplementedError + end + + # The class to use for importing objects when importing them sequentially. + def importer_class + raise NotImplementedError + end + + # The Sidekiq worker class used for scheduling the importing of objects in + # parallel. + def sidekiq_worker_class + raise NotImplementedError + end + + # The name of the method to call to retrieve the data to import. + def collection_method + raise NotImplementedError + end + + # Any options to be passed to the method used for retrieving the data to + # import. + def collection_options + {} + end + end + end +end |