diff options
Diffstat (limited to 'app/services/ci')
8 files changed, 411 insertions, 110 deletions
diff --git a/app/services/ci/create_pipeline_service.rb b/app/services/ci/create_pipeline_service.rb index ce3a9eb0772..2daf3a51235 100644 --- a/app/services/ci/create_pipeline_service.rb +++ b/app/services/ci/create_pipeline_service.rb @@ -23,7 +23,7 @@ module Ci Gitlab::Ci::Pipeline::Chain::Limit::JobActivity].freeze # rubocop: disable Metrics/ParameterLists - def execute(source, ignore_skip_ci: false, save_on_errors: true, trigger_request: nil, schedule: nil, merge_request: nil, external_pull_request: nil, **options, &block) + def execute(source, ignore_skip_ci: false, save_on_errors: true, trigger_request: nil, schedule: nil, merge_request: nil, external_pull_request: nil, bridge: nil, **options, &block) @pipeline = Ci::Pipeline.new command = Gitlab::Ci::Pipeline::Chain::Command.new( @@ -46,6 +46,7 @@ module Ci current_user: current_user, push_options: params[:push_options] || {}, chat_data: params[:chat_data], + bridge: bridge, **extra_options(options)) sequence = Gitlab::Ci::Pipeline::Chain::Sequence @@ -104,14 +105,14 @@ module Ci if Feature.enabled?(:ci_support_interruptible_pipelines, project, default_enabled: true) project.ci_pipelines .where(ref: pipeline.ref) - .where.not(id: pipeline.id) + .where.not(id: pipeline.same_family_pipeline_ids) .where.not(sha: project.commit(pipeline.ref).try(:id)) .alive_or_scheduled .with_only_interruptible_builds else project.ci_pipelines .where(ref: pipeline.ref) - .where.not(id: pipeline.id) + .where.not(id: pipeline.same_family_pipeline_ids) .where.not(sha: project.commit(pipeline.ref).try(:id)) .created_or_pending end diff --git a/app/services/ci/pipeline_processing/atomic_processing_service.rb b/app/services/ci/pipeline_processing/atomic_processing_service.rb new file mode 100644 index 00000000000..1ed295f5950 --- /dev/null +++ b/app/services/ci/pipeline_processing/atomic_processing_service.rb @@ -0,0 +1,118 @@ +# frozen_string_literal: true + +module Ci + module PipelineProcessing + class AtomicProcessingService + include Gitlab::Utils::StrongMemoize + include ExclusiveLeaseGuard + + attr_reader :pipeline + + DEFAULT_LEASE_TIMEOUT = 1.minute + BATCH_SIZE = 20 + + def initialize(pipeline) + @pipeline = pipeline + @collection = AtomicProcessingService::StatusCollection.new(pipeline) + end + + def execute + return unless pipeline.needs_processing? + + success = try_obtain_lease { process! } + + # re-schedule if we need further processing + if success && pipeline.needs_processing? + PipelineProcessWorker.perform_async(pipeline.id) + end + + success + end + + private + + def process! + update_stages! + update_pipeline! + update_statuses_processed! + + true + end + + def update_stages! + pipeline.stages.ordered.each(&method(:update_stage!)) + end + + def update_stage!(stage) + # Update processables for a given stage in bulk/slices + ids = @collection.created_processable_ids_for_stage_position(stage.position) + ids.in_groups_of(BATCH_SIZE, false, &method(:update_processables!)) + + status = @collection.status_for_stage_position(stage.position) + stage.set_status(status) + end + + def update_processables!(ids) + created_processables = pipeline.processables.for_ids(ids) + .with_project_preload + .created + .latest + .ordered_by_stage + .select_with_aggregated_needs(project) + + created_processables.each(&method(:update_processable!)) + end + + def update_pipeline! + pipeline.set_status(@collection.status_of_all) + end + + def update_statuses_processed! + processing = @collection.processing_processables + processing.each_slice(BATCH_SIZE) do |slice| + pipeline.statuses.match_id_and_lock_version(slice) + .update_as_processed! + end + end + + def update_processable!(processable) + status = processable_status(processable) + return unless HasStatus::COMPLETED_STATUSES.include?(status) + + # transition status if possible + Gitlab::OptimisticLocking.retry_lock(processable) do |subject| + Ci::ProcessBuildService.new(project, subject.user) + .execute(subject, status) + + # update internal representation of status + # to make the status change of processable + # to be taken into account during further processing + @collection.set_processable_status( + processable.id, processable.status, processable.lock_version) + end + end + + def processable_status(processable) + if needs_names = processable.aggregated_needs_names + # Processable uses DAG, get status of all dependent needs + @collection.status_for_names(needs_names) + else + # Processable uses Stages, get status of prior stage + @collection.status_for_prior_stage_position(processable.stage_idx.to_i) + end + end + + def project + pipeline.project + end + + def lease_key + "#{super}::pipeline_id:#{pipeline.id}" + end + + def lease_timeout + DEFAULT_LEASE_TIMEOUT + end + end + end +end diff --git a/app/services/ci/pipeline_processing/atomic_processing_service/status_collection.rb b/app/services/ci/pipeline_processing/atomic_processing_service/status_collection.rb new file mode 100644 index 00000000000..42e38a5c80f --- /dev/null +++ b/app/services/ci/pipeline_processing/atomic_processing_service/status_collection.rb @@ -0,0 +1,135 @@ +# frozen_string_literal: true + +module Ci + module PipelineProcessing + class AtomicProcessingService + class StatusCollection + include Gitlab::Utils::StrongMemoize + + attr_reader :pipeline + + # We use these columns to perform an efficient + # calculation of a status + STATUSES_COLUMNS = [ + :id, :name, :status, :allow_failure, + :stage_idx, :processed, :lock_version + ].freeze + + def initialize(pipeline) + @pipeline = pipeline + @stage_statuses = {} + @prior_stage_statuses = {} + end + + # This method updates internal status for given ID + def set_processable_status(id, status, lock_version) + processable = all_statuses_by_id[id] + return unless processable + + processable[:status] = status + processable[:lock_version] = lock_version + end + + # This methods gets composite status of all processables + def status_of_all + status_for_array(all_statuses) + end + + # This methods gets composite status for processables with given names + def status_for_names(names) + name_statuses = all_statuses_by_name.slice(*names) + + status_for_array(name_statuses.values) + end + + # This methods gets composite status for processables before given stage + def status_for_prior_stage_position(position) + strong_memoize("status_for_prior_stage_position_#{position}") do + stage_statuses = all_statuses_grouped_by_stage_position + .select { |stage_position, _| stage_position < position } + + status_for_array(stage_statuses.values.flatten) + end + end + + # This methods gets a list of processables for a given stage + def created_processable_ids_for_stage_position(current_position) + all_statuses_grouped_by_stage_position[current_position] + .to_a + .select { |processable| processable[:status] == 'created' } + .map { |processable| processable[:id] } + end + + # This methods gets composite status for processables at a given stage + def status_for_stage_position(current_position) + strong_memoize("status_for_stage_position_#{current_position}") do + stage_statuses = all_statuses_grouped_by_stage_position[current_position].to_a + + status_for_array(stage_statuses.flatten) + end + end + + # This method returns a list of all processable, that are to be processed + def processing_processables + all_statuses.lazy.reject { |status| status[:processed] } + end + + private + + def status_for_array(statuses) + result = Gitlab::Ci::Status::Composite + .new(statuses) + .status + result || 'success' + end + + def all_statuses_grouped_by_stage_position + strong_memoize(:all_statuses_by_order) do + all_statuses.group_by { |status| status[:stage_idx].to_i } + end + end + + def all_statuses_by_id + strong_memoize(:all_statuses_by_id) do + all_statuses.map do |row| + [row[:id], row] + end.to_h + end + end + + def all_statuses_by_name + strong_memoize(:statuses_by_name) do + all_statuses.map do |row| + [row[:name], row] + end.to_h + end + end + + # rubocop: disable CodeReuse/ActiveRecord + def all_statuses + # We fetch all relevant data in one go. + # + # This is more efficient than relying + # on PostgreSQL to calculate composite status + # for us + # + # Since we need to reprocess everything + # we can fetch all of them and do processing + # ourselves. + strong_memoize(:all_statuses) do + raw_statuses = pipeline + .statuses + .latest + .ordered_by_stage + .pluck(*STATUSES_COLUMNS) + + raw_statuses.map do |row| + STATUSES_COLUMNS.zip(row).to_h + end + end + end + # rubocop: enable CodeReuse/ActiveRecord + end + end + end +end diff --git a/app/services/ci/pipeline_processing/legacy_processing_service.rb b/app/services/ci/pipeline_processing/legacy_processing_service.rb new file mode 100644 index 00000000000..400dc9f0abb --- /dev/null +++ b/app/services/ci/pipeline_processing/legacy_processing_service.rb @@ -0,0 +1,119 @@ +# frozen_string_literal: true + +module Ci + module PipelineProcessing + class LegacyProcessingService + include Gitlab::Utils::StrongMemoize + + attr_reader :pipeline + + def initialize(pipeline) + @pipeline = pipeline + end + + def execute(trigger_build_ids = nil) + success = process_stages_without_needs + + # we evaluate dependent needs, + # only when the another job has finished + success = process_builds_with_needs(trigger_build_ids) || success + + @pipeline.update_legacy_status + + success + end + + private + + def process_stages_without_needs + stage_indexes_of_created_processables_without_needs.flat_map do |index| + process_stage_without_needs(index) + end.any? + end + + def process_stage_without_needs(index) + current_status = status_for_prior_stages(index) + + return unless HasStatus::COMPLETED_STATUSES.include?(current_status) + + created_processables_in_stage_without_needs(index).find_each.select do |build| + process_build(build, current_status) + end.any? + end + + def process_builds_with_needs(trigger_build_ids) + return false unless trigger_build_ids.present? + return false unless Feature.enabled?(:ci_dag_support, project, default_enabled: true) + + # we find processables that are dependent: + # 1. because of current dependency, + trigger_build_names = pipeline.processables.latest + .for_ids(trigger_build_ids).names + + # 2. does not have builds that not yet complete + incomplete_build_names = pipeline.processables.latest + .incomplete.names + + # Each found processable is guaranteed here to have completed status + created_processables + .with_needs(trigger_build_names) + .without_needs(incomplete_build_names) + .find_each + .map(&method(:process_build_with_needs)) + .any? + end + + def process_build_with_needs(build) + current_status = status_for_build_needs(build.needs.map(&:name)) + + return unless HasStatus::COMPLETED_STATUSES.include?(current_status) + + process_build(build, current_status) + end + + def process_build(build, current_status) + Gitlab::OptimisticLocking.retry_lock(build) do |subject| + Ci::ProcessBuildService.new(project, subject.user) + .execute(subject, current_status) + end + end + + def status_for_prior_stages(index) + pipeline.processables.status_for_prior_stages(index) + end + + def status_for_build_needs(needs) + pipeline.processables.status_for_names(needs) + end + + # rubocop: disable CodeReuse/ActiveRecord + def stage_indexes_of_created_processables_without_needs + created_processables_without_needs.order(:stage_idx) + .pluck(Arel.sql('DISTINCT stage_idx')) + end + # rubocop: enable CodeReuse/ActiveRecord + + def created_processables_in_stage_without_needs(index) + created_processables_without_needs + .with_preloads + .for_stage(index) + end + + def created_processables_without_needs + if Feature.enabled?(:ci_dag_support, project, default_enabled: true) + pipeline.processables.created.without_needs + else + pipeline.processables.created + end + end + + def created_processables + pipeline.processables.created + end + + def project + pipeline.project + end + end + end +end diff --git a/app/services/ci/prepare_build_service.rb b/app/services/ci/prepare_build_service.rb index 5d024c45e5f..3f87c711270 100644 --- a/app/services/ci/prepare_build_service.rb +++ b/app/services/ci/prepare_build_service.rb @@ -11,7 +11,7 @@ module Ci def execute prerequisites.each(&:complete!) - build.enqueue! + build.enqueue_preparing! rescue => e Gitlab::ErrorTracking.track_exception(e, build_id: build.id) diff --git a/app/services/ci/process_pipeline_service.rb b/app/services/ci/process_pipeline_service.rb index f33cbf7ab29..1ecef256233 100644 --- a/app/services/ci/process_pipeline_service.rb +++ b/app/services/ci/process_pipeline_service.rb @@ -2,8 +2,6 @@ module Ci class ProcessPipelineService - include Gitlab::Utils::StrongMemoize - attr_reader :pipeline def initialize(pipeline) @@ -13,104 +11,18 @@ module Ci def execute(trigger_build_ids = nil) update_retried - success = process_stages_without_needs - - # we evaluate dependent needs, - # only when the another job has finished - success = process_builds_with_needs(trigger_build_ids) || success - - @pipeline.update_status - - success - end - - private - - def process_stages_without_needs - stage_indexes_of_created_processables_without_needs.flat_map do |index| - process_stage_without_needs(index) - end.any? - end - - def process_stage_without_needs(index) - current_status = status_for_prior_stages(index) - - return unless HasStatus::COMPLETED_STATUSES.include?(current_status) - - created_processables_in_stage_without_needs(index).find_each.select do |build| - process_build(build, current_status) - end.any? - end - - def process_builds_with_needs(trigger_build_ids) - return false unless trigger_build_ids.present? - return false unless Feature.enabled?(:ci_dag_support, project, default_enabled: true) - - # we find processables that are dependent: - # 1. because of current dependency, - trigger_build_names = pipeline.processables.latest - .for_ids(trigger_build_ids).names - - # 2. does not have builds that not yet complete - incomplete_build_names = pipeline.processables.latest - .incomplete.names - - # Each found processable is guaranteed here to have completed status - created_processables - .with_needs(trigger_build_names) - .without_needs(incomplete_build_names) - .find_each - .map(&method(:process_build_with_needs)) - .any? - end - - def process_build_with_needs(build) - current_status = status_for_build_needs(build.needs.map(&:name)) - - return unless HasStatus::COMPLETED_STATUSES.include?(current_status) - - process_build(build, current_status) - end - - def process_build(build, current_status) - Gitlab::OptimisticLocking.retry_lock(build) do |subject| - Ci::ProcessBuildService.new(project, build.user) - .execute(subject, current_status) - end - end - - def status_for_prior_stages(index) - pipeline.processables.status_for_prior_stages(index) - end - - def status_for_build_needs(needs) - pipeline.processables.status_for_names(needs) - end - - # rubocop: disable CodeReuse/ActiveRecord - def stage_indexes_of_created_processables_without_needs - created_processables_without_needs.order(:stage_idx) - .pluck(Arel.sql('DISTINCT stage_idx')) - end - # rubocop: enable CodeReuse/ActiveRecord - - def created_processables_in_stage_without_needs(index) - created_processables_without_needs - .with_preloads - .for_stage(index) - end - - def created_processables_without_needs - if Feature.enabled?(:ci_dag_support, project, default_enabled: true) - pipeline.processables.created.without_needs + if Feature.enabled?(:ci_atomic_processing, pipeline.project) + Ci::PipelineProcessing::AtomicProcessingService + .new(pipeline) + .execute else - pipeline.processables.created + Ci::PipelineProcessing::LegacyProcessingService + .new(pipeline) + .execute(trigger_build_ids) end end - def created_processables - pipeline.processables.created - end + private # This method is for compatibility and data consistency and should be removed with 9.3 version of GitLab # This replicates what is db/post_migrate/20170416103934_upate_retried_for_ci_build.rb @@ -131,9 +43,5 @@ module Ci .update_all(retried: true) if latest_statuses.any? end # rubocop: enable CodeReuse/ActiveRecord - - def project - pipeline.project - end end end diff --git a/app/services/ci/resource_groups/assign_resource_from_resource_group_service.rb b/app/services/ci/resource_groups/assign_resource_from_resource_group_service.rb new file mode 100644 index 00000000000..a4bcca8e8b3 --- /dev/null +++ b/app/services/ci/resource_groups/assign_resource_from_resource_group_service.rb @@ -0,0 +1,17 @@ +# frozen_string_literal: true + +module Ci + module ResourceGroups + class AssignResourceFromResourceGroupService < ::BaseService + # rubocop: disable CodeReuse/ActiveRecord + def execute(resource_group) + free_resources = resource_group.resources.free.count + + resource_group.builds.waiting_for_resource.take(free_resources).each do |build| + build.enqueue_waiting_for_resource + end + end + # rubocop: enable CodeReuse/ActiveRecord + end + end +end diff --git a/app/services/ci/retry_build_service.rb b/app/services/ci/retry_build_service.rb index 7a5e33c61ba..1f00d54b6a7 100644 --- a/app/services/ci/retry_build_service.rb +++ b/app/services/ci/retry_build_service.rb @@ -5,13 +5,13 @@ module Ci CLONE_ACCESSORS = %i[pipeline project ref tag options name allow_failure stage stage_id stage_idx trigger_request yaml_variables when environment coverage_regex - description tag_list protected needs].freeze + description tag_list protected needs resource_group].freeze def execute(build) reprocess!(build).tap do |new_build| build.pipeline.mark_as_processable_after_stage(build.stage_idx) - new_build.enqueue! + Gitlab::OptimisticLocking.retry_lock(new_build, &:enqueue) MergeRequests::AddTodoWhenBuildFailsService .new(project, current_user) @@ -31,15 +31,17 @@ module Ci attributes.push([:user, current_user]) - build.retried = true - Ci::Build.transaction do # mark all other builds of that name as retried build.pipeline.builds.latest .where(name: build.name) - .update_all(retried: true) + .update_all(retried: true, processed: true) - create_build!(attributes) + create_build!(attributes).tap do + # mark existing object as retried/processed without a reload + build.retried = true + build.processed = true + end end end # rubocop: enable CodeReuse/ActiveRecord @@ -49,6 +51,7 @@ module Ci def create_build!(attributes) build = project.builds.new(Hash[attributes]) build.deployment = ::Gitlab::Ci::Pipeline::Seed::Deployment.new(build).to_resource + build.retried = false build.save! build end |