# frozen_string_literal: true

module Gitlab
  module GithubImport
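    # Shared logic for importers that fetch a collection of objects from the
    # GitHub API and import them either sequentially or in parallel.
    #
    # Including classes must implement the methods below that raise
    # NotImplementedError. A minimal sketch of an including class (the class
    # and worker names here are illustrative, not actual importers):
    #
    #   class ExampleNotesImporter
    #     include ParallelScheduling
    #
    #     def importer_class
    #       ExampleNoteImporter
    #     end
    #
    #     def representation_class
    #       Representation::Note
    #     end
    #
    #     def sidekiq_worker_class
    #       ExampleImportNoteWorker
    #     end
    #
    #     def object_type
    #       :note
    #     end
    #
    #     def collection_method
    #       :issues_comments
    #     end
    #
    #     def id_for_already_imported_cache(note)
    #       note[:id]
    #     end
    #   end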
    module ParallelScheduling
      attr_reader :project, :client, :page_counter, :already_imported_cache_key

      # The base cache key to use for tracking already imported objects.
      ALREADY_IMPORTED_CACHE_KEY =
        'github-importer/already-imported/%{project}/%{collection}'

      # project - An instance of `Project`.
      # client - An instance of `Gitlab::GithubImport::Client`.
      # parallel - When set to true the objects will be imported in parallel.
      def initialize(project, client, parallel: true)
        @project = project
        @client = client
        @parallel = parallel
        @page_counter = PageCounter.new(project, collection_method)
        @already_imported_cache_key = ALREADY_IMPORTED_CACHE_KEY %
          { project: project.id, collection: collection_method }
      end

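      # Returns true if the objects should be imported in parallel.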
      def parallel?
        @parallel
      end

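      # Imports the objects, either in parallel or sequentially, and expires
      # the "already imported" cache once all work has been scheduled. Any
      # raised error is tracked via Gitlab::Import::ImportFailureService
      # before being re-raised.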
      def execute
        info(project.id, message: "starting importer")

        retval =
          if parallel?
            parallel_import
          else
            sequential_import
          end

        # Once we have completed all work we can remove our "already exists"
        # cache so we don't put too much pressure on Redis.
        #
        # We don't immediately remove it since it's technically possible for
        # other instances of this job to still be running; instead we set the
        # expiration time to a lower value. This prevents the other jobs from
        # scheduling duplicates. Since all work has already been completed,
        # those jobs will just cycle through any remaining pages without
        # scheduling anything.
        Gitlab::Cache::Import::Caching.expire(already_imported_cache_key, Gitlab::Cache::Import::Caching::SHORTER_TIMEOUT)
        info(project.id, message: "importer finished")

        retval
      rescue StandardError => e
        Gitlab::Import::ImportFailureService.track(
          project_id: project.id,
          error_source: self.class.name,
          exception: e,
          fail_import: abort_on_failure,
          metrics: true
        )

        raise(e)
      end

      # Imports all the objects in sequence in the current thread.
      def sequential_import
        each_object_to_import do |object|
          repr = representation_class.from_api_response(object, additional_object_data)

          importer_class.new(repr, project, client).execute
        end
      end

      # Imports all objects in parallel by scheduling a Sidekiq job for every
      # individual object.
      def parallel_import
        raise 'Batch settings must be defined for parallel import' if parallel_import_batch.blank?

        spread_parallel_import
      end

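      # Builds the job arguments for every object to import, then schedules
      # them in bulk via the Sidekiq worker class. Returns a JobWaiter that
      # can be used to wait for the scheduled jobs to complete.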
      def spread_parallel_import
        waiter = JobWaiter.new

        import_arguments = []

        each_object_to_import do |object|
          repr = representation_class.from_api_response(object, additional_object_data)

          import_arguments << [project.id, repr.to_hash, waiter.key]

          waiter.jobs_remaining += 1
        end

        # rubocop:disable Scalability/BulkPerformWithContext
        Gitlab::ApplicationContext.with_context(project: project) do
          sidekiq_worker_class.bulk_perform_in(
            1.second,
            import_arguments,
            batch_size: parallel_import_batch[:size],
            batch_delay: parallel_import_batch[:delay]
          )
        end
        # rubocop:enable Scalability/BulkPerformWithContext

        waiter
      end

      # Iterates over all the objects to import, yielding each of them to the
      # supplied block.
      def each_object_to_import
        repo = project.import_source

        # We inject the page number here to make sure that all importers always
        # start where they left off. Simply starting over wouldn't work for
        # repositories with a lot of data (e.g. tens of thousands of comments).
        options = collection_options.merge(page: page_counter.current)

        client.each_page(collection_method, repo, options) do |page|
          # Technically it's possible that the same work is performed multiple
          # times, as Sidekiq doesn't guarantee that only one instance of a
          # job will ever run. In such a scenario it's possible for one job to
          # have a lower page number (e.g. 5) compared to another (e.g. 10). In
          # this case we skip over all the objects until we have caught up,
          # reducing the number of duplicate jobs scheduled by the provided
          # block.
          next unless page_counter.set(page.number)

          page.objects.each do |object|
            next if already_imported?(object)

            Gitlab::GithubImport::ObjectCounter.increment(project, object_type, :fetched)

            yield object

            # We mark the object as imported immediately so we don't end up
            # scheduling it multiple times.
            mark_as_imported(object)
          end
        end
      end

      # Returns true if the given object has already been imported, false
      # otherwise.
      #
      # object - The object to check.
      def already_imported?(object)
        id = id_for_already_imported_cache(object)

        Gitlab::Cache::Import::Caching.set_includes?(already_imported_cache_key, id)
      end

      # Marks the given object as "already imported".
      def mark_as_imported(object)
        id = id_for_already_imported_cache(object)

        Gitlab::Cache::Import::Caching.set_add(already_imported_cache_key, id)
      end

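      # The type of the objects being imported, used when incrementing the
      # Gitlab::GithubImport::ObjectCounter.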
      def object_type
        raise NotImplementedError
      end

      # Returns the ID to use for the cache used for checking if an object has
      # already been imported or not.
      #
      # object - The object we may want to import.
      def id_for_already_imported_cache(object)
        raise NotImplementedError
      end

      # The class used for converting API responses into representation
      # objects, which are serialised to Hashes when importing in parallel.
      def representation_class
        raise NotImplementedError
      end

      # The class to use for importing objects when importing them sequentially.
      def importer_class
        raise NotImplementedError
      end

      # The Sidekiq worker class used for scheduling the importing of objects in
      # parallel.
      def sidekiq_worker_class
        raise NotImplementedError
      end

      # The name of the method to call to retrieve the data to import.
      def collection_method
        raise NotImplementedError
      end

      # Default batch settings for parallel import (can be redefined in Importer classes)
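      # e.g. an importer that needs to throttle scheduling harder might
      # define:
      #
      #   def parallel_import_batch
      #     { size: 100, delay: 5.minutes }
      #   end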
      def parallel_import_batch
        { size: 1000, delay: 1.minute }
      end

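      # When set to true in an importer, a failure to import a single object
      # marks the entire import as failed.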
      def abort_on_failure
        false
      end

      # Any options to be passed to the method used for retrieving the data to
      # import.
      def collection_options
        {}
      end

      private

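      # Additional data passed to the representation class when building a
      # representation from an API response.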
      def additional_object_data
        {}
      end

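      # Logs an informational message for the given project, merging in the
      # default logging attributes.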
      def info(project_id, extra = {})
        Logger.info(log_attributes(project_id, extra))
      end

      def log_attributes(project_id, extra = {})
        extra.merge(
          project_id: project_id,
          importer: importer_class.name,
          parallel: parallel?
        )
      end
    end
  end
end