summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKamil Trzciński <ayufan@ayufan.eu>2019-06-07 15:42:39 +0200
committerKamil Trzciński <ayufan@ayufan.eu>2019-06-07 15:42:39 +0200
commit3146322c5718d5c0cb628b026265942c20a4d038 (patch)
tree14a7b8b785cb73b2df5cedcd12a7fc7efc9179bd
parente19c620c1fc92798ead536ab3415fdbd403cb39e (diff)
downloadgitlab-ce-add-merge-train-auto-merge-strategy-split-service.tar.gz
Refactor `MergeTrainService` into multiple smaller servicesadd-merge-train-auto-merge-strategy-split-service
-rw-r--r--app/services/merge_requests/create_pipeline_service.rb2
-rw-r--r--ee/app/models/merge_train.rb12
-rw-r--r--ee/app/services/auto_merge/merge_train_service.rb96
-rw-r--r--ee/app/services/merge_train/process_merge_train_service.rb107
-rw-r--r--ee/app/services/merge_train/process_merge_trains_service.rb37
-rw-r--r--ee/app/workers/merge_train/process_merge_trains_worker.rb15
6 files changed, 184 insertions, 85 deletions
diff --git a/app/services/merge_requests/create_pipeline_service.rb b/app/services/merge_requests/create_pipeline_service.rb
index 58b924c148b..2d67dfb33f0 100644
--- a/app/services/merge_requests/create_pipeline_service.rb
+++ b/app/services/merge_requests/create_pipeline_service.rb
@@ -9,6 +9,8 @@ module MergeRequests
end
def create_detached_merge_request_pipeline(merge_request)
+ raise MismatchingProjectError unless merge_request.target_project == project
+
if can_use_merge_request_ref?(merge_request)
Ci::CreatePipelineService.new(merge_request.source_project, current_user,
ref: merge_request.ref_path)
diff --git a/ee/app/models/merge_train.rb b/ee/app/models/merge_train.rb
index de323c9f677..1ea8efdb53e 100644
--- a/ee/app/models/merge_train.rb
+++ b/ee/app/models/merge_train.rb
@@ -7,19 +7,13 @@ class MergeTrain < ApplicationRecord
belongs_to :user
belongs_to :pipeline, class_name: 'Ci::Pipeline'
- after_create do |merge_train|
- run_after_commit { AutoMergeProcessWorker.perform_async(merge_train.merge_request_id) }
- end
-
- after_destroy do |merge_train|
- run_after_commit { AutoMergeProcessWorker.perform_async(merge_train.merge_request_id) }
- end
-
delegate :project, to: :merge_request
+ scope :ordered -> { order('merge_trains.id ASC') }
+
class << self
def all_in_train(merge_request)
- joined_merge_requests(merge_request).order('merge_trains.id ASC')
+ joined_merge_requests(merge_request).ordered
end
def first_in_train(merge_request)
diff --git a/ee/app/services/auto_merge/merge_train_service.rb b/ee/app/services/auto_merge/merge_train_service.rb
index 2049cdfac20..cc50fcc4d25 100644
--- a/ee/app/services/auto_merge/merge_train_service.rb
+++ b/ee/app/services/auto_merge/merge_train_service.rb
@@ -7,26 +7,38 @@ module AutoMerge
ProcessError = Class.new(StandardError)
StalePipelineError = Class.new(StandardError)
+ # enqueue to the merge train
def execute(merge_request)
- merge_request.build_merge_train(user: current_user) unless merge_request.merge_train
+ unless merge_request.merge_train
+ merge_request.build_merge_train(
+ user: current_user,
+ target_project: merge_request.target_project,
+ target_branch: merge_request.target_branch)
+ end
super do
if merge_request.saved_change_to_auto_merge_enabled?
SystemNoteService.merge_train(merge_request, project, current_user, merge_request.merge_train)
end
+
+ refresh_train(merge_request)
end
end
+ # process is validation of merge request status as part of train
def process(merge_request)
- in_lock("merge_train:#{merge_request.target_project_id}-#{merge_request.target_branch}") do
- unsafe_process(merge_request)
- end
+ refresh_train(merge_request)
end
+ # cancel of merge train, and refresh of merge train
def cancel(merge_request, reason: nil)
super(merge_request) do
- SystemNoteService.cancel_merge_train(merge_request, project, current_user, reason: reason)
+ SystemNoteService.cancel_merge_train(merge_request, project, current_user,
+ reason: reason)
+
merge_request.merge_train.destroy!
+
+ refresh_train(merge_request)
end
end
@@ -41,77 +53,9 @@ module AutoMerge
private
- def unsafe_process(merge_request)
- MergeTrain.all_in_train(merge_request).each do |merge_request|
- process_train(merge_request.merge_train)
- end
- end
-
- def process_train(merge_train)
- ensure_pipeline!(merge_train)
- validate!(merge_train)
- merge!(merge_train) if should_merge?(merge_train)
- rescue ProcessError => e
- cancel(merge_train.merge_request, reason: e.message)
- rescue StalePipelineError
- reset_pipeline(merge_train)
- end
-
- def ensure_pipeline!(merge_train)
- # NOTE: We will remove this line for running pipelines in parallel in the next iteration.
- return unless merge_train.first_in_train?
- return if merge_train.pipeline_id.present?
-
- pipeline = MergeRequests::CreatePipelineService
- .new(project, current_user, allow_duplicate: true).execute(merge_train.merge_request)
-
- unless pipeline&.latest_merge_request_pipeline?
- raise ProcessError, 'failed to create the latest pipeline for merged results'
- end
-
- merge_train.update!(pipeline: pipeline)
- end
-
- def validate!(merge_train)
- unless merge_train.project.merge_trains_enabled?
- raise ProcessError, 'project disabled merge trains'
- end
-
- unless merge_train.merge_request.mergeable?(skip_ci_check: true)
- raise ProcessError, 'merge request is not mergeable'
- end
-
- if merge_train.pipeline
- if merge_train.pipeline.complete? && !merge_train.pipeline.success?
- raise ProcessError, 'pipeline did not succeed'
- end
-
- unless merge_train.pipeline.latest_merge_request_pipeline?
- raise StalePipelineError, 'pipeline for merged results is stale'
- end
- end
- end
-
- def merge!(merge_train)
- merge_request = merge_train.merge_request
-
- MergeRequests::MergeService.new(project, current_user, merge_request.merge_params)
- .execute(merge_request)
-
- raise ProcessError, 'failed to merge' unless merge_request.merged?
-
- merge_train.destroy!
- end
-
- def should_merge?(merge_train)
- merge_train.pipeline&.success? && merge_train.first_in_train?
- end
-
- def reset_pipeline(merge_train)
- merge_train.pipeline_id = nil
- ensure_pipeline!(merge_train)
- rescue ProcessError => e
- cancel(merge_train.merge_request, reason: e.message)
+ def refresh_train(merge_request)
+ ProcessMergeTrainsWorker.perform_async(
+ merge_request.target_project_id, merge_request.target_branch)
end
end
end
diff --git a/ee/app/services/merge_train/process_merge_train_service.rb b/ee/app/services/merge_train/process_merge_train_service.rb
new file mode 100644
index 00000000000..ae2904d10f9
--- /dev/null
+++ b/ee/app/services/merge_train/process_merge_train_service.rb
@@ -0,0 +1,107 @@
+module MergeTrain
+ class ProcessMergeTrainService < ::BaseService
+ include ::Gitlab::ExclusiveLeaseHelpers
+
+ attr_reader :merge_train
+
+ ValidationError = Class.new(StandardError)
+
+ def initalize(merge_train)
+ @merge_train = merge_train
+ end
+
+ def execute
+ validate!
+ drop_stale_pipeline
+ ensure_pipeline
+ merge! if pipeline.success? && merge_train.first_in_train?
+ success
+ rescue ValidationError => e
+ error(e.message)
+ end
+
+ private
+
+ def validate!
+ unless project.merge_trains_enabled?
+ raise ValidationError, "merge trains not enabled"
+ end
+
+ unless merge_request.mergeable?(skip_ci_check: true)
+ raise ValidationError, "merge not mergeable"
+ end
+
+ unless merge_request.target_project == merge_train.target_project
+ raise ValidationError, "invalid target project"
+ end
+
+ unless merge_request.target_branch == merge_train.target_branch
+ raise ValidationError, "invalid target branch"
+ end
+
+ if pipeline&.latest_merge_request_pipeline? &&
+ pipeline.complete? &&
+ !pipeline.success?
+ raise ValidationError, "pipeline did not succeed"
+ end
+ end
+
+ def merge!
+ MergeRequests::MergeService
+ .new(merge_request.target_project, merge_train.user, merge_request.merge_params)
+ .execute(merge_request)
+ end
+
+ def drop_stale_pipeline
+ return unless merge_train&.pipeline&.latest_merge_request_pipeline?
+
+ # TODO: should we execute cancel on `pipeline_id`
+
+ merge_train.pipeline_id = nil
+ end
+
+ def ensure_pipeline!(merge_train)
+ return unless merge_train.pipeline
+
+ # TODO: this should create `refs/merge-request/train`
+ # TODO: this should call directly
+ # Ci::CreatePipelineService.new(merge_request.source_project, current_user,
+ # ref: merge_request.ref_path)
+ # .execute(:merge_request_event, merge_request: merge_request)
+
+ pipeline = MergeRequests::CreatePipelineService
+ .new(target_project, merge_user, allow_duplicate: true)
+ .execute(merge_request)
+
+ # TODO: is that needed?
+ unless pipeline&.latest_merge_request_pipeline?
+ raise ValidationError, 'failed to create the latest pipeline for merged results'
+ end
+
+ merge_train.update!(pipeline: pipeline)
+ end
+
+ def error(reason, params = {})
+ AutoMerge::MergeTrainService.new(target_project, merge_user)
+ .cancel(merge_request, reason: reason)
+
+ super
+ end
+
+ def merge_user
+ merge_train.user
+ end
+
+ def merge_request
+ merge_train.merge_request
+ end
+
+ def target_project
+ merge_request.target_project
+ end
+
+ def pipeline
+ merge_train.pipeline
+ end
+ end
+end
diff --git a/ee/app/services/merge_train/process_merge_trains_service.rb b/ee/app/services/merge_train/process_merge_trains_service.rb
new file mode 100644
index 00000000000..0a15f1cabe0
--- /dev/null
+++ b/ee/app/services/merge_train/process_merge_trains_service.rb
@@ -0,0 +1,37 @@
+module MergeTrain
+ class ProcessMergeTrainsService
+ include ::Gitlab::ExclusiveLeaseHelpers
+
+ attr_reader :project, :ref
+
+ MAX_PARALLEL_IN_TRAIN = 1
+
+ def initalize(project, ref)
+ @project, @ref = project, ref
+ end
+
+ def execute
+ in_lock(lock_key) do
+ all_in_train.find_each do |merge_train|
+ # TODO: it costs one additional query, up to MAX_PARALLEL_IN_TRAIN
+ break if merge_train.index > MAX_PARALLEL_IN_TRAIN
+
+ ProcessMergeTrainService.new(merge_train)
+ .execute
+ end
+ end
+ end
+
+ private
+
+ def all_in_train
+ MergeTrain.where(merge_request:
+ MergeRequest.where(target_project: project, target_branch: ref))
+ .ordered
+ end
+
+ def lock_key
+ "merge_train:#{project.id}-#{ref}"
+ end
+ end
+end
diff --git a/ee/app/workers/merge_train/process_merge_trains_worker.rb b/ee/app/workers/merge_train/process_merge_trains_worker.rb
new file mode 100644
index 00000000000..4a42f68d932
--- /dev/null
+++ b/ee/app/workers/merge_train/process_merge_trains_worker.rb
@@ -0,0 +1,15 @@
+# frozen_string_literal: true
+
+module MergeTrain
+ class ProcessMergeTrainsWorker
+ include ApplicationWorker
+
+ def perform(project_id, ref)
+ project = Project.find_by(id: project_id)
+ return unless project
+
+ MergeTrain::ProcessMergeTrainsService.new(
+ project, ref).execute
+ end
+ end
+end