summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDouwe Maan <douwe@gitlab.com>2016-12-02 04:32:23 +0000
committerDouwe Maan <douwe@gitlab.com>2016-12-02 04:32:23 +0000
commitec4e7d9a9f7f3526a15905c3fb5d851e4c173349 (patch)
treea67d8d342dc0e317a38014fb1c93de771bb137db
parent71ba28e43967b5eaf0499c2a93c0b1d4f78a35f5 (diff)
parent6b4d33566f5f434cc86381a4a1347e42bbe348ee (diff)
downloadgitlab-ce-ec4e7d9a9f7f3526a15905c3fb5d851e4c173349.tar.gz
Merge branch 'process-commit-worker-improvements' into 'master'
Pass commit data to ProcessCommitWorker This changes `ProcessCommitWorker` so that it takes a Hash containing commit data instead of a commit SHA. This means the worker doesn't have to access Git just to process a commit message (and other data it may use). This in turn should solve the problem of ending up with 15 000-something jobs in the `process_commit` queue that take forever to process. See merge request !7744
-rw-r--r--app/models/commit.rb4
-rw-r--r--app/services/git_push_service.rb2
-rw-r--r--app/workers/process_commit_worker.rb25
-rw-r--r--changelogs/unreleased/process-commit-worker-improvements.yml4
-rw-r--r--db/migrate/20161124141322_migrate_process_commit_worker_jobs.rb92
-rw-r--r--spec/lib/gitlab/cycle_analytics/events_spec.rb2
-rw-r--r--spec/migrations/migrate_process_commit_worker_jobs_spec.rb194
-rw-r--r--spec/models/commit_spec.rb17
-rw-r--r--spec/requests/projects/cycle_analytics_events_spec.rb2
-rw-r--r--spec/services/git_push_service_spec.rb6
-rw-r--r--spec/spec_helper.rb4
-rw-r--r--spec/workers/process_commit_worker_spec.rb29
12 files changed, 356 insertions, 25 deletions
diff --git a/app/models/commit.rb b/app/models/commit.rb
index 9e7fde9503d..176c524cf7b 100644
--- a/app/models/commit.rb
+++ b/app/models/commit.rb
@@ -48,6 +48,10 @@ class Commit
max_lines: DIFF_HARD_LIMIT_LINES,
}
end
+
+ def from_hash(hash, project)
+ new(Gitlab::Git::Commit.new(hash), project)
+ end
end
attr_accessor :raw
diff --git a/app/services/git_push_service.rb b/app/services/git_push_service.rb
index 647930d555c..185556c12cc 100644
--- a/app/services/git_push_service.rb
+++ b/app/services/git_push_service.rb
@@ -135,7 +135,7 @@ class GitPushService < BaseService
@push_commits.each do |commit|
ProcessCommitWorker.
- perform_async(project.id, current_user.id, commit.id, default)
+ perform_async(project.id, current_user.id, commit.to_hash, default)
end
end
diff --git a/app/workers/process_commit_worker.rb b/app/workers/process_commit_worker.rb
index 071741fbacd..e9a5bd7f24e 100644
--- a/app/workers/process_commit_worker.rb
+++ b/app/workers/process_commit_worker.rb
@@ -10,9 +10,10 @@ class ProcessCommitWorker
# project_id - The ID of the project this commit belongs to.
# user_id - The ID of the user that pushed the commit.
- # commit_sha - The SHA1 of the commit to process.
+ # commit_hash - Hash containing commit details to use for constructing a
+ # Commit object without having to use the Git repository.
# default - The data was pushed to the default branch.
- def perform(project_id, user_id, commit_sha, default = false)
+ def perform(project_id, user_id, commit_hash, default = false)
project = Project.find_by(id: project_id)
return unless project
@@ -21,10 +22,7 @@ class ProcessCommitWorker
return unless user
- commit = find_commit(project, commit_sha)
-
- return unless commit
-
+ commit = build_commit(project, commit_hash)
author = commit.author || user
process_commit_message(project, commit, user, author, default)
@@ -59,9 +57,18 @@ class ProcessCommitWorker
update_all(first_mentioned_in_commit_at: commit.committed_date)
end
- private
+ def build_commit(project, hash)
+ date_suffix = '_date'
+
+ # When processing Sidekiq payloads various timestamps are stored as Strings.
+ # Commit in turn expects Time-like instances upon input, so we have to
+ # manually parse these values.
+ hash.each do |key, value|
+ if key.to_s.end_with?(date_suffix) && value.is_a?(String)
+ hash[key] = Time.parse(value)
+ end
+ end
- def find_commit(project, sha)
- project.commit(sha)
+ Commit.from_hash(hash, project)
end
end
diff --git a/changelogs/unreleased/process-commit-worker-improvements.yml b/changelogs/unreleased/process-commit-worker-improvements.yml
new file mode 100644
index 00000000000..0038c6e34e6
--- /dev/null
+++ b/changelogs/unreleased/process-commit-worker-improvements.yml
@@ -0,0 +1,4 @@
+---
+title: Pass commit data to ProcessCommitWorker to reduce Git overhead
+merge_request: 7744
+author:
diff --git a/db/migrate/20161124141322_migrate_process_commit_worker_jobs.rb b/db/migrate/20161124141322_migrate_process_commit_worker_jobs.rb
new file mode 100644
index 00000000000..453a44e271a
--- /dev/null
+++ b/db/migrate/20161124141322_migrate_process_commit_worker_jobs.rb
@@ -0,0 +1,92 @@
+# See http://doc.gitlab.com/ce/development/migration_style_guide.html
+# for more information on how to write migrations for GitLab.
+
+class MigrateProcessCommitWorkerJobs < ActiveRecord::Migration
+ include Gitlab::Database::MigrationHelpers
+
+ class Project < ActiveRecord::Base
+ def self.find_including_path(id)
+ select("projects.*, CONCAT(namespaces.path, '/', projects.path) AS path_with_namespace").
+ joins('INNER JOIN namespaces ON namespaces.id = projects.namespace_id').
+ find_by(id: id)
+ end
+
+ def repository_storage_path
+ Gitlab.config.repositories.storages[repository_storage]
+ end
+
+ def repository_path
+ File.join(repository_storage_path, read_attribute(:path_with_namespace) + '.git')
+ end
+
+ def repository
+ @repository ||= Rugged::Repository.new(repository_path)
+ end
+ end
+
+ DOWNTIME = true
+ DOWNTIME_REASON = 'Existing workers will error until they are using a newer version of the code'
+
+ disable_ddl_transaction!
+
+ def up
+ Sidekiq.redis do |redis|
+ new_jobs = []
+
+ while job = redis.lpop('queue:process_commit')
+ payload = JSON.load(job)
+ project = Project.find_including_path(payload['args'][0])
+
+ next unless project
+
+ begin
+ commit = project.repository.lookup(payload['args'][2])
+ rescue Rugged::OdbError
+ next
+ end
+
+ hash = {
+ id: commit.oid,
+ message: commit.message,
+ parent_ids: commit.parent_ids,
+ authored_date: commit.author[:time],
+ author_name: commit.author[:name],
+ author_email: commit.author[:email],
+ committed_date: commit.committer[:time],
+ committer_email: commit.committer[:email],
+ committer_name: commit.committer[:name]
+ }
+
+ payload['args'][2] = hash
+
+ new_jobs << JSON.dump(payload)
+ end
+
+ redis.multi do |multi|
+ new_jobs.each do |j|
+ multi.lpush('queue:process_commit', j)
+ end
+ end
+ end
+ end
+
+ def down
+ Sidekiq.redis do |redis|
+ new_jobs = []
+
+ while job = redis.lpop('queue:process_commit')
+ payload = JSON.load(job)
+
+ payload['args'][2] = payload['args'][2]['id']
+
+ new_jobs << JSON.dump(payload)
+ end
+
+ redis.multi do |multi|
+ new_jobs.each do |j|
+ multi.lpush('queue:process_commit', j)
+ end
+ end
+ end
+ end
+end
diff --git a/spec/lib/gitlab/cycle_analytics/events_spec.rb b/spec/lib/gitlab/cycle_analytics/events_spec.rb
index 9aeaa6b3ee8..6062e7af4f5 100644
--- a/spec/lib/gitlab/cycle_analytics/events_spec.rb
+++ b/spec/lib/gitlab/cycle_analytics/events_spec.rb
@@ -321,6 +321,6 @@ describe Gitlab::CycleAnalytics::Events do
context.update(milestone: milestone)
mr = create_merge_request_closing_issue(context)
- ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.sha)
+ ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.to_hash)
end
end
diff --git a/spec/migrations/migrate_process_commit_worker_jobs_spec.rb b/spec/migrations/migrate_process_commit_worker_jobs_spec.rb
new file mode 100644
index 00000000000..52428547a9f
--- /dev/null
+++ b/spec/migrations/migrate_process_commit_worker_jobs_spec.rb
@@ -0,0 +1,194 @@
+require 'spec_helper'
+require Rails.root.join('db', 'migrate', '20161124141322_migrate_process_commit_worker_jobs.rb')
+
+describe MigrateProcessCommitWorkerJobs do
+ let(:project) { create(:project) }
+ let(:user) { create(:user) }
+ let(:commit) { project.commit.raw.raw_commit }
+
+ describe 'Project' do
+ describe 'find_including_path' do
+ it 'returns Project instances' do
+ expect(described_class::Project.find_including_path(project.id)).
+ to be_an_instance_of(described_class::Project)
+ end
+
+ it 'selects the full path for every Project' do
+ migration_project = described_class::Project.
+ find_including_path(project.id)
+
+ expect(migration_project[:path_with_namespace]).
+ to eq(project.path_with_namespace)
+ end
+ end
+
+ describe '#repository_storage_path' do
+ it 'returns the storage path for the repository' do
+ migration_project = described_class::Project.
+ find_including_path(project.id)
+
+ expect(File.directory?(migration_project.repository_storage_path)).
+ to eq(true)
+ end
+ end
+
+ describe '#repository_path' do
+ it 'returns the path to the repository' do
+ migration_project = described_class::Project.
+ find_including_path(project.id)
+
+ expect(File.directory?(migration_project.repository_path)).to eq(true)
+ end
+ end
+
+ describe '#repository' do
+ it 'returns a Rugged::Repository' do
+ migration_project = described_class::Project.
+ find_including_path(project.id)
+
+ expect(migration_project.repository).
+ to be_an_instance_of(Rugged::Repository)
+ end
+ end
+ end
+
+ describe '#up', :redis do
+ let(:migration) { described_class.new }
+
+ def job_count
+ Sidekiq.redis { |r| r.llen('queue:process_commit') }
+ end
+
+ before do
+ Sidekiq.redis do |redis|
+ job = JSON.dump(args: [project.id, user.id, commit.oid])
+ redis.lpush('queue:process_commit', job)
+ end
+ end
+
+ it 'skips jobs using a project that no longer exists' do
+ allow(described_class::Project).to receive(:find_including_path).
+ with(project.id).
+ and_return(nil)
+
+ migration.up
+
+ expect(job_count).to eq(0)
+ end
+
+ it 'skips jobs using commits that no longer exist' do
+ allow_any_instance_of(Rugged::Repository).to receive(:lookup).
+ with(commit.oid).
+ and_raise(Rugged::OdbError)
+
+ migration.up
+
+ expect(job_count).to eq(0)
+ end
+
+ it 'inserts migrated jobs back into the queue' do
+ migration.up
+
+ expect(job_count).to eq(1)
+ end
+
+ context 'a migrated job' do
+ let(:job) do
+ migration.up
+
+ JSON.load(Sidekiq.redis { |r| r.lpop('queue:process_commit') })
+ end
+
+ let(:commit_hash) do
+ job['args'][2]
+ end
+
+ it 'includes the project ID' do
+ expect(job['args'][0]).to eq(project.id)
+ end
+
+ it 'includes the user ID' do
+ expect(job['args'][1]).to eq(user.id)
+ end
+
+ it 'includes the commit ID' do
+ expect(commit_hash['id']).to eq(commit.oid)
+ end
+
+ it 'includes the commit message' do
+ expect(commit_hash['message']).to eq(commit.message)
+ end
+
+ it 'includes the parent IDs' do
+ expect(commit_hash['parent_ids']).to eq(commit.parent_ids)
+ end
+
+ it 'includes the author date' do
+ expect(commit_hash['authored_date']).to eq(commit.author[:time].to_s)
+ end
+
+ it 'includes the author name' do
+ expect(commit_hash['author_name']).to eq(commit.author[:name])
+ end
+
+ it 'includes the author Email' do
+ expect(commit_hash['author_email']).to eq(commit.author[:email])
+ end
+
+ it 'includes the commit date' do
+ expect(commit_hash['committed_date']).to eq(commit.committer[:time].to_s)
+ end
+
+ it 'includes the committer name' do
+ expect(commit_hash['committer_name']).to eq(commit.committer[:name])
+ end
+
+ it 'includes the committer Email' do
+ expect(commit_hash['committer_email']).to eq(commit.committer[:email])
+ end
+ end
+ end
+
+ describe '#down', :redis do
+ let(:migration) { described_class.new }
+
+ def job_count
+ Sidekiq.redis { |r| r.llen('queue:process_commit') }
+ end
+
+ before do
+ Sidekiq.redis do |redis|
+ job = JSON.dump(args: [project.id, user.id, commit.oid])
+ redis.lpush('queue:process_commit', job)
+
+ migration.up
+ end
+ end
+
+ it 'pushes migrated jobs back into the queue' do
+ migration.down
+
+ expect(job_count).to eq(1)
+ end
+
+ context 'a migrated job' do
+ let(:job) do
+ migration.down
+
+ JSON.load(Sidekiq.redis { |r| r.lpop('queue:process_commit') })
+ end
+
+ it 'includes the project ID' do
+ expect(job['args'][0]).to eq(project.id)
+ end
+
+ it 'includes the user ID' do
+ expect(job['args'][1]).to eq(user.id)
+ end
+
+ it 'includes the commit SHA' do
+ expect(job['args'][2]).to eq(commit.oid)
+ end
+ end
+ end
+end
diff --git a/spec/models/commit_spec.rb b/spec/models/commit_spec.rb
index e3bb3482d67..7194c20d3bf 100644
--- a/spec/models/commit_spec.rb
+++ b/spec/models/commit_spec.rb
@@ -302,4 +302,21 @@ eos
expect(commit.uri_type('this/path/doesnt/exist')).to be_nil
end
end
+
+ describe '.from_hash' do
+ let(:new_commit) { described_class.from_hash(commit.to_hash, project) }
+
+ it 'returns a Commit' do
+ expect(new_commit).to be_an_instance_of(described_class)
+ end
+
+ it 'wraps a Gitlab::Git::Commit' do
+ expect(new_commit.raw).to be_an_instance_of(Gitlab::Git::Commit)
+ end
+
+ it 'stores the correct commit fields' do
+ expect(new_commit.id).to eq(commit.id)
+ expect(new_commit.message).to eq(commit.message)
+ end
+ end
end
diff --git a/spec/requests/projects/cycle_analytics_events_spec.rb b/spec/requests/projects/cycle_analytics_events_spec.rb
index 5c90fd9bad9..f5e0fdcda2d 100644
--- a/spec/requests/projects/cycle_analytics_events_spec.rb
+++ b/spec/requests/projects/cycle_analytics_events_spec.rb
@@ -135,6 +135,6 @@ describe 'cycle analytics events' do
merge_merge_requests_closing_issue(issue)
- ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.sha)
+ ProcessCommitWorker.new.perform(project.id, user.id, mr.commits.last.to_hash)
end
end
diff --git a/spec/services/git_push_service_spec.rb b/spec/services/git_push_service_spec.rb
index 9d7702f5c96..e7624e70725 100644
--- a/spec/services/git_push_service_spec.rb
+++ b/spec/services/git_push_service_spec.rb
@@ -263,7 +263,7 @@ describe GitPushService, services: true do
author_email: commit_author.email
)
- allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit).
+ allow_any_instance_of(ProcessCommitWorker).to receive(:build_commit).
and_return(commit)
allow(project.repository).to receive(:commits_between).and_return([commit])
@@ -321,7 +321,7 @@ describe GitPushService, services: true do
committed_date: commit_time
)
- allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit).
+ allow_any_instance_of(ProcessCommitWorker).to receive(:build_commit).
and_return(commit)
allow(project.repository).to receive(:commits_between).and_return([commit])
@@ -360,7 +360,7 @@ describe GitPushService, services: true do
allow(project.repository).to receive(:commits_between).
and_return([closing_commit])
- allow_any_instance_of(ProcessCommitWorker).to receive(:find_commit).
+ allow_any_instance_of(ProcessCommitWorker).to receive(:build_commit).
and_return(closing_commit)
project.team << [commit_author, :master]
diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb
index ef33c473d38..6ee3307512d 100644
--- a/spec/spec_helper.rb
+++ b/spec/spec_helper.rb
@@ -55,8 +55,12 @@ RSpec.configure do |config|
config.around(:each, :redis) do |example|
Gitlab::Redis.with(&:flushall)
+ Sidekiq.redis(&:flushall)
+
example.run
+
Gitlab::Redis.with(&:flushall)
+ Sidekiq.redis(&:flushall)
end
end
diff --git a/spec/workers/process_commit_worker_spec.rb b/spec/workers/process_commit_worker_spec.rb
index 3e4fee42240..75c7fc1efd2 100644
--- a/spec/workers/process_commit_worker_spec.rb
+++ b/spec/workers/process_commit_worker_spec.rb
@@ -11,31 +11,25 @@ describe ProcessCommitWorker do
it 'does not process the commit when the project does not exist' do
expect(worker).not_to receive(:close_issues)
- worker.perform(-1, user.id, commit.id)
+ worker.perform(-1, user.id, commit.to_hash)
end
it 'does not process the commit when the user does not exist' do
expect(worker).not_to receive(:close_issues)
- worker.perform(project.id, -1, commit.id)
- end
-
- it 'does not process the commit when the commit no longer exists' do
- expect(worker).not_to receive(:close_issues)
-
- worker.perform(project.id, user.id, 'this-should-does-not-exist')
+ worker.perform(project.id, -1, commit.to_hash)
end
it 'processes the commit message' do
expect(worker).to receive(:process_commit_message).and_call_original
- worker.perform(project.id, user.id, commit.id)
+ worker.perform(project.id, user.id, commit.to_hash)
end
it 'updates the issue metrics' do
expect(worker).to receive(:update_issue_metrics).and_call_original
- worker.perform(project.id, user.id, commit.id)
+ worker.perform(project.id, user.id, commit.to_hash)
end
end
@@ -106,4 +100,19 @@ describe ProcessCommitWorker do
expect(metric.first_mentioned_in_commit_at).to eq(commit.committed_date)
end
end
+
+ describe '#build_commit' do
+ it 'returns a Commit' do
+ commit = worker.build_commit(project, id: '123')
+
+ expect(commit).to be_an_instance_of(Commit)
+ end
+
+ it 'parses date strings into Time instances' do
+ commit = worker.
+ build_commit(project, id: '123', authored_date: Time.now.to_s)
+
+ expect(commit.authored_date).to be_an_instance_of(Time)
+ end
+ end
end