diff options
author | Kamil Trzciński <ayufan@ayufan.eu> | 2019-06-07 15:42:39 +0200 |
---|---|---|
committer | Kamil Trzciński <ayufan@ayufan.eu> | 2019-06-07 15:42:39 +0200 |
commit | 3146322c5718d5c0cb628b026265942c20a4d038 (patch) | |
tree | 14a7b8b785cb73b2df5cedcd12a7fc7efc9179bd | |
parent | e19c620c1fc92798ead536ab3415fdbd403cb39e (diff) | |
download | gitlab-ce-add-merge-train-auto-merge-strategy-split-service.tar.gz |
Refactor `MergeTrainService` into multiple smaller servicesadd-merge-train-auto-merge-strategy-split-service
-rw-r--r-- | app/services/merge_requests/create_pipeline_service.rb | 2 | ||||
-rw-r--r-- | ee/app/models/merge_train.rb | 12 | ||||
-rw-r--r-- | ee/app/services/auto_merge/merge_train_service.rb | 96 | ||||
-rw-r--r-- | ee/app/services/merge_train/process_merge_train_service.rb | 107 | ||||
-rw-r--r-- | ee/app/services/merge_train/process_merge_trains_service.rb | 37 | ||||
-rw-r--r-- | ee/app/workers/merge_train/process_merge_trains_worker.rb | 15 |
6 files changed, 184 insertions, 85 deletions
diff --git a/app/services/merge_requests/create_pipeline_service.rb b/app/services/merge_requests/create_pipeline_service.rb index 58b924c148b..2d67dfb33f0 100644 --- a/app/services/merge_requests/create_pipeline_service.rb +++ b/app/services/merge_requests/create_pipeline_service.rb @@ -9,6 +9,8 @@ module MergeRequests end def create_detached_merge_request_pipeline(merge_request) + raise MismatchingProjectError unless merge_request.target_project == project + if can_use_merge_request_ref?(merge_request) Ci::CreatePipelineService.new(merge_request.source_project, current_user, ref: merge_request.ref_path) diff --git a/ee/app/models/merge_train.rb b/ee/app/models/merge_train.rb index de323c9f677..1ea8efdb53e 100644 --- a/ee/app/models/merge_train.rb +++ b/ee/app/models/merge_train.rb @@ -7,19 +7,13 @@ class MergeTrain < ApplicationRecord belongs_to :user belongs_to :pipeline, class_name: 'Ci::Pipeline' - after_create do |merge_train| - run_after_commit { AutoMergeProcessWorker.perform_async(merge_train.merge_request_id) } - end - - after_destroy do |merge_train| - run_after_commit { AutoMergeProcessWorker.perform_async(merge_train.merge_request_id) } - end - delegate :project, to: :merge_request + scope :ordered -> { order('merge_trains.id ASC') } + class << self def all_in_train(merge_request) - joined_merge_requests(merge_request).order('merge_trains.id ASC') + joined_merge_requests(merge_request).ordered end def first_in_train(merge_request) diff --git a/ee/app/services/auto_merge/merge_train_service.rb b/ee/app/services/auto_merge/merge_train_service.rb index 2049cdfac20..cc50fcc4d25 100644 --- a/ee/app/services/auto_merge/merge_train_service.rb +++ b/ee/app/services/auto_merge/merge_train_service.rb @@ -7,26 +7,38 @@ module AutoMerge ProcessError = Class.new(StandardError) StalePipelineError = Class.new(StandardError) + # enqueue to the merge train def execute(merge_request) - merge_request.build_merge_train(user: current_user) unless merge_request.merge_train + unless merge_request.merge_train + merge_request.build_merge_train( + user: current_user, + target_project: merge_request.target_project, + target_branch: merge_request.target_branch) + end super do if merge_request.saved_change_to_auto_merge_enabled? SystemNoteService.merge_train(merge_request, project, current_user, merge_request.merge_train) end + + refresh_train(merge_request) end end + # process is validation of merge request status as part of train def process(merge_request) - in_lock("merge_train:#{merge_request.target_project_id}-#{merge_request.target_branch}") do - unsafe_process(merge_request) - end + refresh_train(merge_request) end + # cancel of merge train, and refresh of merge train def cancel(merge_request, reason: nil) super(merge_request) do - SystemNoteService.cancel_merge_train(merge_request, project, current_user, reason: reason) + SystemNoteService.cancel_merge_train(merge_request, project, current_user, + reason: reason) + merge_request.merge_train.destroy! + + refresh_train(merge_request) end end @@ -41,77 +53,9 @@ module AutoMerge private - def unsafe_process(merge_request) - MergeTrain.all_in_train(merge_request).each do |merge_request| - process_train(merge_request.merge_train) - end - end - - def process_train(merge_train) - ensure_pipeline!(merge_train) - validate!(merge_train) - merge!(merge_train) if should_merge?(merge_train) - rescue ProcessError => e - cancel(merge_train.merge_request, reason: e.message) - rescue StalePipelineError - reset_pipeline(merge_train) - end - - def ensure_pipeline!(merge_train) - # NOTE: We will remove this line for running pipelines in parallel in the next iteration. - return unless merge_train.first_in_train? - return if merge_train.pipeline_id.present? - - pipeline = MergeRequests::CreatePipelineService - .new(project, current_user, allow_duplicate: true).execute(merge_train.merge_request) - - unless pipeline&.latest_merge_request_pipeline? - raise ProcessError, 'failed to create the latest pipeline for merged results' - end - - merge_train.update!(pipeline: pipeline) - end - - def validate!(merge_train) - unless merge_train.project.merge_trains_enabled? - raise ProcessError, 'project disabled merge trains' - end - - unless merge_train.merge_request.mergeable?(skip_ci_check: true) - raise ProcessError, 'merge request is not mergeable' - end - - if merge_train.pipeline - if merge_train.pipeline.complete? && !merge_train.pipeline.success? - raise ProcessError, 'pipeline did not succeed' - end - - unless merge_train.pipeline.latest_merge_request_pipeline? - raise StalePipelineError, 'pipeline for merged results is stale' - end - end - end - - def merge!(merge_train) - merge_request = merge_train.merge_request - - MergeRequests::MergeService.new(project, current_user, merge_request.merge_params) - .execute(merge_request) - - raise ProcessError, 'failed to merge' unless merge_request.merged? - - merge_train.destroy! - end - - def should_merge?(merge_train) - merge_train.pipeline&.success? && merge_train.first_in_train? - end - - def reset_pipeline(merge_train) - merge_train.pipeline_id = nil - ensure_pipeline!(merge_train) - rescue ProcessError => e - cancel(merge_train.merge_request, reason: e.message) + def refresh_train(merge_request) + ProcessMergeTrainsWorker.perform_async( + merge_request.target_project_id, merge_request.target_branch) end end end diff --git a/ee/app/services/merge_train/process_merge_train_service.rb b/ee/app/services/merge_train/process_merge_train_service.rb new file mode 100644 index 00000000000..ae2904d10f9 --- /dev/null +++ b/ee/app/services/merge_train/process_merge_train_service.rb @@ -0,0 +1,107 @@ +module MergeTrain + class ProcessMergeTrainService < ::BaseService + include ::Gitlab::ExclusiveLeaseHelpers + + attr_reader :merge_train + + ValidationError = Class.new(StandardError) + + def initalize(merge_train) + @merge_train = merge_train + end + + def execute + validate! + drop_stale_pipeline + ensure_pipeline + merge! if pipeline.success? && merge_train.first_in_train? + success + rescue ValidationError => e + error(e.message) + end + + private + + def validate! + unless project.merge_trains_enabled? + raise ValidationError, "merge trains not enabled" + end + + unless merge_request.mergeable?(skip_ci_check: true) + raise ValidationError, "merge not mergeable" + end + + unless merge_request.target_project == merge_train.target_project + raise ValidationError, "invalid target project" + end + + unless merge_request.target_branch == merge_train.target_branch + raise ValidationError, "invalid target branch" + end + + if pipeline&.latest_merge_request_pipeline? && + pipeline.complete? && + !pipeline.success? + raise ValidationError, "pipeline did not succeed" + end + end + + def merge! + MergeRequests::MergeService + .new(merge_request.target_project, merge_train.user, merge_request.merge_params) + .execute(merge_request) + end + + def drop_stale_pipeline + return unless merge_train&.pipeline&.latest_merge_request_pipeline? + + # TODO: should we execute cancel on `pipeline_id` + + merge_train.pipeline_id = nil + end + + def ensure_pipeline!(merge_train) + return unless merge_train.pipeline + + # TODO: this should create `refs/merge-request/train` + # TODO: this should call directly + # Ci::CreatePipelineService.new(merge_request.source_project, current_user, + # ref: merge_request.ref_path) + # .execute(:merge_request_event, merge_request: merge_request) + + pipeline = MergeRequests::CreatePipelineService + .new(target_project, merge_user, allow_duplicate: true) + .execute(merge_request) + + # TODO: is that needed? + unless pipeline&.latest_merge_request_pipeline? + raise ValidationError, 'failed to create the latest pipeline for merged results' + end + + merge_train.update!(pipeline: pipeline) + end + + def error(reason, params = {}) + AutoMerge::MergeTrainService.new(target_project, merge_user) + .cancel(merge_request, reason: reason) + + super + end + + def merge_user + merge_train.user + end + + def merge_request + merge_train.merge_request + end + + def target_project + merge_request.target_project + end + + def pipeline + merge_train.pipeline + end + end +end diff --git a/ee/app/services/merge_train/process_merge_trains_service.rb b/ee/app/services/merge_train/process_merge_trains_service.rb new file mode 100644 index 00000000000..0a15f1cabe0 --- /dev/null +++ b/ee/app/services/merge_train/process_merge_trains_service.rb @@ -0,0 +1,37 @@ +module MergeTrain + class ProcessMergeTrainsService + include ::Gitlab::ExclusiveLeaseHelpers + + attr_reader :project, :ref + + MAX_PARALLEL_IN_TRAIN = 1 + + def initalize(project, ref) + @project, @ref = project, ref + end + + def execute + in_lock(lock_key) do + all_in_train.find_each do |merge_train| + # TODO: it costs one additional query, up to MAX_PARALLEL_IN_TRAIN + break if merge_train.index > MAX_PARALLEL_IN_TRAIN + + ProcessMergeTrainService.new(merge_train) + .execute + end + end + end + + private + + def all_in_train + MergeTrain.where(merge_request: + MergeRequest.where(target_project: project, target_branch: ref)) + .ordered + end + + def lock_key + "merge_train:#{project.id}-#{ref}" + end + end +end diff --git a/ee/app/workers/merge_train/process_merge_trains_worker.rb b/ee/app/workers/merge_train/process_merge_trains_worker.rb new file mode 100644 index 00000000000..4a42f68d932 --- /dev/null +++ b/ee/app/workers/merge_train/process_merge_trains_worker.rb @@ -0,0 +1,15 @@ +# frozen_string_literal: true + +module MergeTrain + class ProcessMergeTrainsWorker + include ApplicationWorker + + def perform(project_id, ref) + project = Project.find_by(id: project_id) + return unless project + + MergeTrain::ProcessMergeTrainsService.new( + project, ref).execute + end + end +end |