summaryrefslogtreecommitdiff
path: root/app/services/ci
diff options
context:
space:
mode:
Diffstat (limited to 'app/services/ci')
-rw-r--r--app/services/ci/create_pipeline_service.rb7
-rw-r--r--app/services/ci/pipeline_processing/atomic_processing_service.rb118
-rw-r--r--app/services/ci/pipeline_processing/atomic_processing_service/status_collection.rb135
-rw-r--r--app/services/ci/pipeline_processing/legacy_processing_service.rb119
-rw-r--r--app/services/ci/prepare_build_service.rb2
-rw-r--r--app/services/ci/process_pipeline_service.rb108
-rw-r--r--app/services/ci/resource_groups/assign_resource_from_resource_group_service.rb17
-rw-r--r--app/services/ci/retry_build_service.rb15
8 files changed, 411 insertions, 110 deletions
diff --git a/app/services/ci/create_pipeline_service.rb b/app/services/ci/create_pipeline_service.rb
index ce3a9eb0772..2daf3a51235 100644
--- a/app/services/ci/create_pipeline_service.rb
+++ b/app/services/ci/create_pipeline_service.rb
@@ -23,7 +23,7 @@ module Ci
Gitlab::Ci::Pipeline::Chain::Limit::JobActivity].freeze
# rubocop: disable Metrics/ParameterLists
- def execute(source, ignore_skip_ci: false, save_on_errors: true, trigger_request: nil, schedule: nil, merge_request: nil, external_pull_request: nil, **options, &block)
+ def execute(source, ignore_skip_ci: false, save_on_errors: true, trigger_request: nil, schedule: nil, merge_request: nil, external_pull_request: nil, bridge: nil, **options, &block)
@pipeline = Ci::Pipeline.new
command = Gitlab::Ci::Pipeline::Chain::Command.new(
@@ -46,6 +46,7 @@ module Ci
current_user: current_user,
push_options: params[:push_options] || {},
chat_data: params[:chat_data],
+ bridge: bridge,
**extra_options(options))
sequence = Gitlab::Ci::Pipeline::Chain::Sequence
@@ -104,14 +105,14 @@ module Ci
if Feature.enabled?(:ci_support_interruptible_pipelines, project, default_enabled: true)
project.ci_pipelines
.where(ref: pipeline.ref)
- .where.not(id: pipeline.id)
+ .where.not(id: pipeline.same_family_pipeline_ids)
.where.not(sha: project.commit(pipeline.ref).try(:id))
.alive_or_scheduled
.with_only_interruptible_builds
else
project.ci_pipelines
.where(ref: pipeline.ref)
- .where.not(id: pipeline.id)
+ .where.not(id: pipeline.same_family_pipeline_ids)
.where.not(sha: project.commit(pipeline.ref).try(:id))
.created_or_pending
end
diff --git a/app/services/ci/pipeline_processing/atomic_processing_service.rb b/app/services/ci/pipeline_processing/atomic_processing_service.rb
new file mode 100644
index 00000000000..1ed295f5950
--- /dev/null
+++ b/app/services/ci/pipeline_processing/atomic_processing_service.rb
@@ -0,0 +1,118 @@
+# frozen_string_literal: true
+
+module Ci
+ module PipelineProcessing
+ class AtomicProcessingService
+ include Gitlab::Utils::StrongMemoize
+ include ExclusiveLeaseGuard
+
+ attr_reader :pipeline
+
+ DEFAULT_LEASE_TIMEOUT = 1.minute
+ BATCH_SIZE = 20
+
+ def initialize(pipeline)
+ @pipeline = pipeline
+ @collection = AtomicProcessingService::StatusCollection.new(pipeline)
+ end
+
+ def execute
+ return unless pipeline.needs_processing?
+
+ success = try_obtain_lease { process! }
+
+ # re-schedule if we need further processing
+ if success && pipeline.needs_processing?
+ PipelineProcessWorker.perform_async(pipeline.id)
+ end
+
+ success
+ end
+
+ private
+
+ def process!
+ update_stages!
+ update_pipeline!
+ update_statuses_processed!
+
+ true
+ end
+
+ def update_stages!
+ pipeline.stages.ordered.each(&method(:update_stage!))
+ end
+
+ def update_stage!(stage)
+ # Update processables for a given stage in bulk/slices
+ ids = @collection.created_processable_ids_for_stage_position(stage.position)
+ ids.in_groups_of(BATCH_SIZE, false, &method(:update_processables!))
+
+ status = @collection.status_for_stage_position(stage.position)
+ stage.set_status(status)
+ end
+
+ def update_processables!(ids)
+ created_processables = pipeline.processables.for_ids(ids)
+ .with_project_preload
+ .created
+ .latest
+ .ordered_by_stage
+ .select_with_aggregated_needs(project)
+
+ created_processables.each(&method(:update_processable!))
+ end
+
+ def update_pipeline!
+ pipeline.set_status(@collection.status_of_all)
+ end
+
+ def update_statuses_processed!
+ processing = @collection.processing_processables
+ processing.each_slice(BATCH_SIZE) do |slice|
+ pipeline.statuses.match_id_and_lock_version(slice)
+ .update_as_processed!
+ end
+ end
+
+ def update_processable!(processable)
+ status = processable_status(processable)
+ return unless HasStatus::COMPLETED_STATUSES.include?(status)
+
+ # transition status if possible
+ Gitlab::OptimisticLocking.retry_lock(processable) do |subject|
+ Ci::ProcessBuildService.new(project, subject.user)
+ .execute(subject, status)
+
+ # update internal representation of status
+ # to make the status change of processable
+ # to be taken into account during further processing
+ @collection.set_processable_status(
+ processable.id, processable.status, processable.lock_version)
+ end
+ end
+
+ def processable_status(processable)
+ if needs_names = processable.aggregated_needs_names
+ # Processable uses DAG, get status of all dependent needs
+ @collection.status_for_names(needs_names)
+ else
+ # Processable uses Stages, get status of prior stage
+ @collection.status_for_prior_stage_position(processable.stage_idx.to_i)
+ end
+ end
+
+ def project
+ pipeline.project
+ end
+
+ def lease_key
+ "#{super}::pipeline_id:#{pipeline.id}"
+ end
+
+ def lease_timeout
+ DEFAULT_LEASE_TIMEOUT
+ end
+ end
+ end
+end
diff --git a/app/services/ci/pipeline_processing/atomic_processing_service/status_collection.rb b/app/services/ci/pipeline_processing/atomic_processing_service/status_collection.rb
new file mode 100644
index 00000000000..42e38a5c80f
--- /dev/null
+++ b/app/services/ci/pipeline_processing/atomic_processing_service/status_collection.rb
@@ -0,0 +1,135 @@
+# frozen_string_literal: true
+
+module Ci
+ module PipelineProcessing
+ class AtomicProcessingService
+ class StatusCollection
+ include Gitlab::Utils::StrongMemoize
+
+ attr_reader :pipeline
+
+ # We use these columns to perform an efficient
+ # calculation of a status
+ STATUSES_COLUMNS = [
+ :id, :name, :status, :allow_failure,
+ :stage_idx, :processed, :lock_version
+ ].freeze
+
+ def initialize(pipeline)
+ @pipeline = pipeline
+ @stage_statuses = {}
+ @prior_stage_statuses = {}
+ end
+
+ # This method updates internal status for given ID
+ def set_processable_status(id, status, lock_version)
+ processable = all_statuses_by_id[id]
+ return unless processable
+
+ processable[:status] = status
+ processable[:lock_version] = lock_version
+ end
+
+ # This methods gets composite status of all processables
+ def status_of_all
+ status_for_array(all_statuses)
+ end
+
+ # This methods gets composite status for processables with given names
+ def status_for_names(names)
+ name_statuses = all_statuses_by_name.slice(*names)
+
+ status_for_array(name_statuses.values)
+ end
+
+ # This methods gets composite status for processables before given stage
+ def status_for_prior_stage_position(position)
+ strong_memoize("status_for_prior_stage_position_#{position}") do
+ stage_statuses = all_statuses_grouped_by_stage_position
+ .select { |stage_position, _| stage_position < position }
+
+ status_for_array(stage_statuses.values.flatten)
+ end
+ end
+
+ # This methods gets a list of processables for a given stage
+ def created_processable_ids_for_stage_position(current_position)
+ all_statuses_grouped_by_stage_position[current_position]
+ .to_a
+ .select { |processable| processable[:status] == 'created' }
+ .map { |processable| processable[:id] }
+ end
+
+ # This methods gets composite status for processables at a given stage
+ def status_for_stage_position(current_position)
+ strong_memoize("status_for_stage_position_#{current_position}") do
+ stage_statuses = all_statuses_grouped_by_stage_position[current_position].to_a
+
+ status_for_array(stage_statuses.flatten)
+ end
+ end
+
+ # This method returns a list of all processable, that are to be processed
+ def processing_processables
+ all_statuses.lazy.reject { |status| status[:processed] }
+ end
+
+ private
+
+ def status_for_array(statuses)
+ result = Gitlab::Ci::Status::Composite
+ .new(statuses)
+ .status
+ result || 'success'
+ end
+
+ def all_statuses_grouped_by_stage_position
+ strong_memoize(:all_statuses_by_order) do
+ all_statuses.group_by { |status| status[:stage_idx].to_i }
+ end
+ end
+
+ def all_statuses_by_id
+ strong_memoize(:all_statuses_by_id) do
+ all_statuses.map do |row|
+ [row[:id], row]
+ end.to_h
+ end
+ end
+
+ def all_statuses_by_name
+ strong_memoize(:statuses_by_name) do
+ all_statuses.map do |row|
+ [row[:name], row]
+ end.to_h
+ end
+ end
+
+ # rubocop: disable CodeReuse/ActiveRecord
+ def all_statuses
+ # We fetch all relevant data in one go.
+ #
+ # This is more efficient than relying
+ # on PostgreSQL to calculate composite status
+ # for us
+ #
+ # Since we need to reprocess everything
+ # we can fetch all of them and do processing
+ # ourselves.
+ strong_memoize(:all_statuses) do
+ raw_statuses = pipeline
+ .statuses
+ .latest
+ .ordered_by_stage
+ .pluck(*STATUSES_COLUMNS)
+
+ raw_statuses.map do |row|
+ STATUSES_COLUMNS.zip(row).to_h
+ end
+ end
+ end
+ # rubocop: enable CodeReuse/ActiveRecord
+ end
+ end
+ end
+end
diff --git a/app/services/ci/pipeline_processing/legacy_processing_service.rb b/app/services/ci/pipeline_processing/legacy_processing_service.rb
new file mode 100644
index 00000000000..400dc9f0abb
--- /dev/null
+++ b/app/services/ci/pipeline_processing/legacy_processing_service.rb
@@ -0,0 +1,119 @@
+# frozen_string_literal: true
+
+module Ci
+ module PipelineProcessing
+ class LegacyProcessingService
+ include Gitlab::Utils::StrongMemoize
+
+ attr_reader :pipeline
+
+ def initialize(pipeline)
+ @pipeline = pipeline
+ end
+
+ def execute(trigger_build_ids = nil)
+ success = process_stages_without_needs
+
+ # we evaluate dependent needs,
+ # only when the another job has finished
+ success = process_builds_with_needs(trigger_build_ids) || success
+
+ @pipeline.update_legacy_status
+
+ success
+ end
+
+ private
+
+ def process_stages_without_needs
+ stage_indexes_of_created_processables_without_needs.flat_map do |index|
+ process_stage_without_needs(index)
+ end.any?
+ end
+
+ def process_stage_without_needs(index)
+ current_status = status_for_prior_stages(index)
+
+ return unless HasStatus::COMPLETED_STATUSES.include?(current_status)
+
+ created_processables_in_stage_without_needs(index).find_each.select do |build|
+ process_build(build, current_status)
+ end.any?
+ end
+
+ def process_builds_with_needs(trigger_build_ids)
+ return false unless trigger_build_ids.present?
+ return false unless Feature.enabled?(:ci_dag_support, project, default_enabled: true)
+
+ # we find processables that are dependent:
+ # 1. because of current dependency,
+ trigger_build_names = pipeline.processables.latest
+ .for_ids(trigger_build_ids).names
+
+ # 2. does not have builds that not yet complete
+ incomplete_build_names = pipeline.processables.latest
+ .incomplete.names
+
+ # Each found processable is guaranteed here to have completed status
+ created_processables
+ .with_needs(trigger_build_names)
+ .without_needs(incomplete_build_names)
+ .find_each
+ .map(&method(:process_build_with_needs))
+ .any?
+ end
+
+ def process_build_with_needs(build)
+ current_status = status_for_build_needs(build.needs.map(&:name))
+
+ return unless HasStatus::COMPLETED_STATUSES.include?(current_status)
+
+ process_build(build, current_status)
+ end
+
+ def process_build(build, current_status)
+ Gitlab::OptimisticLocking.retry_lock(build) do |subject|
+ Ci::ProcessBuildService.new(project, subject.user)
+ .execute(subject, current_status)
+ end
+ end
+
+ def status_for_prior_stages(index)
+ pipeline.processables.status_for_prior_stages(index)
+ end
+
+ def status_for_build_needs(needs)
+ pipeline.processables.status_for_names(needs)
+ end
+
+ # rubocop: disable CodeReuse/ActiveRecord
+ def stage_indexes_of_created_processables_without_needs
+ created_processables_without_needs.order(:stage_idx)
+ .pluck(Arel.sql('DISTINCT stage_idx'))
+ end
+ # rubocop: enable CodeReuse/ActiveRecord
+
+ def created_processables_in_stage_without_needs(index)
+ created_processables_without_needs
+ .with_preloads
+ .for_stage(index)
+ end
+
+ def created_processables_without_needs
+ if Feature.enabled?(:ci_dag_support, project, default_enabled: true)
+ pipeline.processables.created.without_needs
+ else
+ pipeline.processables.created
+ end
+ end
+
+ def created_processables
+ pipeline.processables.created
+ end
+
+ def project
+ pipeline.project
+ end
+ end
+ end
+end
diff --git a/app/services/ci/prepare_build_service.rb b/app/services/ci/prepare_build_service.rb
index 5d024c45e5f..3f87c711270 100644
--- a/app/services/ci/prepare_build_service.rb
+++ b/app/services/ci/prepare_build_service.rb
@@ -11,7 +11,7 @@ module Ci
def execute
prerequisites.each(&:complete!)
- build.enqueue!
+ build.enqueue_preparing!
rescue => e
Gitlab::ErrorTracking.track_exception(e, build_id: build.id)
diff --git a/app/services/ci/process_pipeline_service.rb b/app/services/ci/process_pipeline_service.rb
index f33cbf7ab29..1ecef256233 100644
--- a/app/services/ci/process_pipeline_service.rb
+++ b/app/services/ci/process_pipeline_service.rb
@@ -2,8 +2,6 @@
module Ci
class ProcessPipelineService
- include Gitlab::Utils::StrongMemoize
-
attr_reader :pipeline
def initialize(pipeline)
@@ -13,104 +11,18 @@ module Ci
def execute(trigger_build_ids = nil)
update_retried
- success = process_stages_without_needs
-
- # we evaluate dependent needs,
- # only when the another job has finished
- success = process_builds_with_needs(trigger_build_ids) || success
-
- @pipeline.update_status
-
- success
- end
-
- private
-
- def process_stages_without_needs
- stage_indexes_of_created_processables_without_needs.flat_map do |index|
- process_stage_without_needs(index)
- end.any?
- end
-
- def process_stage_without_needs(index)
- current_status = status_for_prior_stages(index)
-
- return unless HasStatus::COMPLETED_STATUSES.include?(current_status)
-
- created_processables_in_stage_without_needs(index).find_each.select do |build|
- process_build(build, current_status)
- end.any?
- end
-
- def process_builds_with_needs(trigger_build_ids)
- return false unless trigger_build_ids.present?
- return false unless Feature.enabled?(:ci_dag_support, project, default_enabled: true)
-
- # we find processables that are dependent:
- # 1. because of current dependency,
- trigger_build_names = pipeline.processables.latest
- .for_ids(trigger_build_ids).names
-
- # 2. does not have builds that not yet complete
- incomplete_build_names = pipeline.processables.latest
- .incomplete.names
-
- # Each found processable is guaranteed here to have completed status
- created_processables
- .with_needs(trigger_build_names)
- .without_needs(incomplete_build_names)
- .find_each
- .map(&method(:process_build_with_needs))
- .any?
- end
-
- def process_build_with_needs(build)
- current_status = status_for_build_needs(build.needs.map(&:name))
-
- return unless HasStatus::COMPLETED_STATUSES.include?(current_status)
-
- process_build(build, current_status)
- end
-
- def process_build(build, current_status)
- Gitlab::OptimisticLocking.retry_lock(build) do |subject|
- Ci::ProcessBuildService.new(project, build.user)
- .execute(subject, current_status)
- end
- end
-
- def status_for_prior_stages(index)
- pipeline.processables.status_for_prior_stages(index)
- end
-
- def status_for_build_needs(needs)
- pipeline.processables.status_for_names(needs)
- end
-
- # rubocop: disable CodeReuse/ActiveRecord
- def stage_indexes_of_created_processables_without_needs
- created_processables_without_needs.order(:stage_idx)
- .pluck(Arel.sql('DISTINCT stage_idx'))
- end
- # rubocop: enable CodeReuse/ActiveRecord
-
- def created_processables_in_stage_without_needs(index)
- created_processables_without_needs
- .with_preloads
- .for_stage(index)
- end
-
- def created_processables_without_needs
- if Feature.enabled?(:ci_dag_support, project, default_enabled: true)
- pipeline.processables.created.without_needs
+ if Feature.enabled?(:ci_atomic_processing, pipeline.project)
+ Ci::PipelineProcessing::AtomicProcessingService
+ .new(pipeline)
+ .execute
else
- pipeline.processables.created
+ Ci::PipelineProcessing::LegacyProcessingService
+ .new(pipeline)
+ .execute(trigger_build_ids)
end
end
- def created_processables
- pipeline.processables.created
- end
+ private
# This method is for compatibility and data consistency and should be removed with 9.3 version of GitLab
# This replicates what is db/post_migrate/20170416103934_upate_retried_for_ci_build.rb
@@ -131,9 +43,5 @@ module Ci
.update_all(retried: true) if latest_statuses.any?
end
# rubocop: enable CodeReuse/ActiveRecord
-
- def project
- pipeline.project
- end
end
end
diff --git a/app/services/ci/resource_groups/assign_resource_from_resource_group_service.rb b/app/services/ci/resource_groups/assign_resource_from_resource_group_service.rb
new file mode 100644
index 00000000000..a4bcca8e8b3
--- /dev/null
+++ b/app/services/ci/resource_groups/assign_resource_from_resource_group_service.rb
@@ -0,0 +1,17 @@
+# frozen_string_literal: true
+
+module Ci
+ module ResourceGroups
+ class AssignResourceFromResourceGroupService < ::BaseService
+ # rubocop: disable CodeReuse/ActiveRecord
+ def execute(resource_group)
+ free_resources = resource_group.resources.free.count
+
+ resource_group.builds.waiting_for_resource.take(free_resources).each do |build|
+ build.enqueue_waiting_for_resource
+ end
+ end
+ # rubocop: enable CodeReuse/ActiveRecord
+ end
+ end
+end
diff --git a/app/services/ci/retry_build_service.rb b/app/services/ci/retry_build_service.rb
index 7a5e33c61ba..1f00d54b6a7 100644
--- a/app/services/ci/retry_build_service.rb
+++ b/app/services/ci/retry_build_service.rb
@@ -5,13 +5,13 @@ module Ci
CLONE_ACCESSORS = %i[pipeline project ref tag options name
allow_failure stage stage_id stage_idx trigger_request
yaml_variables when environment coverage_regex
- description tag_list protected needs].freeze
+ description tag_list protected needs resource_group].freeze
def execute(build)
reprocess!(build).tap do |new_build|
build.pipeline.mark_as_processable_after_stage(build.stage_idx)
- new_build.enqueue!
+ Gitlab::OptimisticLocking.retry_lock(new_build, &:enqueue)
MergeRequests::AddTodoWhenBuildFailsService
.new(project, current_user)
@@ -31,15 +31,17 @@ module Ci
attributes.push([:user, current_user])
- build.retried = true
-
Ci::Build.transaction do
# mark all other builds of that name as retried
build.pipeline.builds.latest
.where(name: build.name)
- .update_all(retried: true)
+ .update_all(retried: true, processed: true)
- create_build!(attributes)
+ create_build!(attributes).tap do
+ # mark existing object as retried/processed without a reload
+ build.retried = true
+ build.processed = true
+ end
end
end
# rubocop: enable CodeReuse/ActiveRecord
@@ -49,6 +51,7 @@ module Ci
def create_build!(attributes)
build = project.builds.new(Hash[attributes])
build.deployment = ::Gitlab::Ci::Pipeline::Seed::Deployment.new(build).to_resource
+ build.retried = false
build.save!
build
end