summaryrefslogtreecommitdiff
path: root/db/migrate/20161124141322_migrate_process_commit_worker_jobs.rb
blob: 0772821210c94e7cda44eb16f8955105c61537cc (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
# Rewrites the jobs waiting in the Redis list `queue:process_commit`.
#
# `up` replaces the third job argument (a commit revision) with a Hash of
# commit details looked up through Gitaly; `down` replaces that Hash with the
# value stored under its `id` key again.
class MigrateProcessCommitWorkerJobs < ActiveRecord::Migration[4.2]
  include Gitlab::Database::MigrationHelpers

  # Pairs a storage name with a relative path, which is all that is needed
  # here to build a Gitaly::Repository message.
  class Repository
    attr_reader :storage

    def initialize(storage, relative_path)
      @storage = storage
      @relative_path = relative_path
    end

    # Builds the Gitaly::Repository message for this storage/path pair.
    def gitaly_repository
      Gitaly::Repository.new(storage_name: @storage, relative_path: @relative_path)
    end
  end

  # Migration-local model used to locate a project's repository.
  class Project < ActiveRecord::Base
    # Finds the project with the given id and selects an additional
    # `path_with_namespace` attribute, computed in SQL as
    # "<namespaces.path>/<projects.path>". Returns nil when no row matches.
    def self.find_including_path(id)
      select("projects.*, CONCAT(namespaces.path, '/', projects.path) AS path_with_namespace")
        .joins('INNER JOIN namespaces ON namespaces.id = projects.namespace_id')
        .find_by(id: id)
    end

    # Looks up `rev` in this project's repository through the Gitaly commit
    # service. Callers treat a falsy result as "commit not found".
    def commit(rev)
      Gitlab::GitalyClient::CommitService.new(repository).find_commit(rev)
    end

    # Memoized Repository for this project. It relies on the
    # `path_with_namespace` attribute selected by `find_including_path`.
    # NOTE(review): `repository_storage` is presumably a column of `projects`
    # -- confirm against the schema.
    def repository
      @repository ||= Repository.new(repository_storage, read_attribute(:path_with_namespace) + '.git')
    end
  end

  DOWNTIME = true
  DOWNTIME_REASON = 'Existing workers will error until they are using a newer version of the code'

  # Redis list that holds the queued jobs rewritten by this migration.
  QUEUE_KEY = 'queue:process_commit'

  disable_ddl_transaction!

  # Converts every queued job so that args[2] is a Hash of commit details.
  #
  # Jobs whose project or commit cannot be found are dropped. Jobs whose
  # args[2] is already a Hash are kept untouched, so the migration can be
  # re-run after a partial failure.
  def up
    rewrite_queued_jobs do |payload|
      args = payload['args']
      revision = args[2]

      # Already in the new format; there is nothing to look up.
      next payload if revision.is_a?(Hash)

      project = Project.find_including_path(args[0])
      next unless project

      commit = project.commit(revision)
      next unless commit

      args[2] = commit_attributes(commit)
      payload
    end
  end

  # Converts every queued job so that args[2] is the commit id again.
  #
  # Jobs whose args[2] is not a Hash are kept untouched: indexing a String
  # with 'id' would silently turn the revision into nil.
  def down
    rewrite_queued_jobs do |payload|
      commit = payload['args'][2]
      payload['args'][2] = commit['id'] if commit.is_a?(Hash)
      payload
    end
  end

  # Returns `data` as a String that is valid UTF-8.
  #
  # * UTF-8 strings are returned as-is when valid; otherwise invalid byte
  #   sequences are replaced, because invalid UTF-8 cannot be dumped as JSON.
  # * Binary (ASCII-8BIT) strings are reinterpreted as UTF-8 rather than
  #   transcoded: transcoding from binary treats every non-ASCII byte as
  #   undefined and would replace valid multi-byte characters.
  #   NOTE(review): this assumes binary strings coming from Gitaly carry
  #   UTF-8 text -- confirm.
  # * Strings in any other encoding are transcoded, replacing characters
  #   that cannot be converted.
  def encode(data)
    utf8 = Encoding::UTF_8

    case data.encoding
    when utf8
      data.valid_encoding? ? data : data.scrub
    when Encoding::BINARY
      data.dup.force_encoding(utf8).scrub
    else
      data.encode(utf8, invalid: :replace, undef: :replace)
    end
  end

  private

  # Builds the Hash stored as the job's third argument from a commit object
  # returned by `Project#commit`.
  def commit_attributes(commit)
    {
      id: commit.id,
      message: encode(commit.body),
      parent_ids: commit.parent_ids.to_a,
      authored_date: Time.at(commit.author.date.seconds).utc,
      author_name: encode(commit.author.name),
      author_email: encode(commit.author.email),
      committed_date: Time.at(commit.committer.date.seconds).utc,
      committer_email: encode(commit.committer.email),
      committer_name: encode(commit.committer.name)
    }
  end

  # Pops every job off the queue, yields its parsed payload, and puts the
  # payload returned by the block back on the queue. A nil/false block result
  # drops the job.
  #
  # The jobs are re-inserted at the head of the list in the order they were
  # popped, so the queue order is preserved. Re-insertion happens in an
  # `ensure`: if anything raises part-way through, the jobs rewritten so far
  # and the job being processed (in its original form) are put back instead
  # of being lost, and the error then propagates.
  def rewrite_queued_jobs
    Sidekiq.redis do |redis|
      pending = []

      begin
        while (raw = redis.lpop(QUEUE_KEY))
          # Keep the raw job until its replacement has been fully built.
          pending << raw

          payload = yield(JSON.parse(raw))
          rewritten = payload && JSON.dump(payload)

          pending.pop
          pending << rewritten if rewritten
        end
      ensure
        unless pending.empty?
          redis.multi do |multi|
            # LPUSH inserts at the head, so push in reverse to end up with
            # pending.first at the head of the list.
            pending.reverse_each { |job| multi.lpush(QUEUE_KEY, job) }
          end
        end
      end
    end
  end
end