diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2020-01-17 15:08:37 +0000 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2020-01-17 15:08:37 +0000 |
commit | 37eff29d5ce44899e34c7c2ac319b314f2f26d15 (patch) | |
tree | b74e1632fdb58ea10972f270bfec70a4e6ee07b0 /app | |
parent | 9411a664118a3247d0a56baf7e7ef4549c1201c3 (diff) | |
download | gitlab-ce-37eff29d5ce44899e34c7c2ac319b314f2f26d15.tar.gz |
Add latest changes from gitlab-org/gitlab@master
Diffstat (limited to 'app')
-rw-r--r-- | app/assets/stylesheets/framework/common.scss | 1 | ||||
-rw-r--r-- | app/models/board.rb | 2 | ||||
-rw-r--r-- | app/models/ci/build.rb | 4 | ||||
-rw-r--r-- | app/models/ci/pipeline.rb | 18 | ||||
-rw-r--r-- | app/models/ci/processable.rb | 18 | ||||
-rw-r--r-- | app/models/ci/stage.rb | 10 | ||||
-rw-r--r-- | app/models/commit_status.rb | 64 | ||||
-rw-r--r-- | app/models/project_services/chat_message/base_message.rb | 8 | ||||
-rw-r--r-- | app/services/boards/list_service.rb | 17 | ||||
-rw-r--r-- | app/services/ci/pipeline_processing/atomic_processing_service.rb | 118 | ||||
-rw-r--r-- | app/services/ci/pipeline_processing/atomic_processing_service/status_collection.rb | 135 | ||||
-rw-r--r-- | app/services/ci/pipeline_processing/legacy_processing_service.rb | 2 | ||||
-rw-r--r-- | app/services/ci/process_pipeline_service.rb | 12 | ||||
-rw-r--r-- | app/services/ci/retry_build_service.rb | 13 | ||||
-rw-r--r-- | app/workers/pipeline_update_worker.rb | 5 | ||||
-rw-r--r-- | app/workers/stage_update_worker.rb | 6 |
16 files changed, 391 insertions, 42 deletions
diff --git a/app/assets/stylesheets/framework/common.scss b/app/assets/stylesheets/framework/common.scss index 085bf0f0a37..7521a6491af 100644 --- a/app/assets/stylesheets/framework/common.scss +++ b/app/assets/stylesheets/framework/common.scss @@ -460,6 +460,7 @@ img.emoji { .w-8em { width: 8em; } .w-3rem { width: 3rem; } .w-15p { width: 15%; } +.w-30p { width: 30%; } .w-70p { width: 70%; } .h-12em { height: 12em; } diff --git a/app/models/board.rb b/app/models/board.rb index f3f938224a4..38bbb550044 100644 --- a/app/models/board.rb +++ b/app/models/board.rb @@ -11,6 +11,8 @@ class Board < ApplicationRecord validates :group, presence: true, unless: :project scope :with_associations, -> { preload(:destroyable_lists) } + scope :order_by_name_asc, -> { order(arel_table[:name].lower.asc) } + scope :first_board, -> { where(id: self.order_by_name_asc.limit(1).select(:id)) } def project_needed? !group diff --git a/app/models/ci/build.rb b/app/models/ci/build.rb index bb3762c26f6..369a793f3d5 100644 --- a/app/models/ci/build.rb +++ b/app/models/ci/build.rb @@ -447,10 +447,6 @@ module Ci options_retry_when.include?('always') end - def latest? - !retried? - end - def any_unmet_prerequisites? prerequisites.present? end diff --git a/app/models/ci/pipeline.rb b/app/models/ci/pipeline.rb index 3943d991c87..7e3ba98d86c 100644 --- a/app/models/ci/pipeline.rb +++ b/app/models/ci/pipeline.rb @@ -515,7 +515,9 @@ module Ci # rubocop: enable CodeReuse/ServiceClass def mark_as_processable_after_stage(stage_idx) - builds.skipped.after_stage(stage_idx).find_each(&:process) + builds.skipped.after_stage(stage_idx).find_each do |build| + Gitlab::OptimisticLocking.retry_lock(build, &:process) + end end def latest? @@ -554,6 +556,13 @@ module Ci end end + def needs_processing? + statuses + .where(processed: [false, nil]) + .latest + .exists? + end + # TODO: this logic is duplicate with Pipeline::Chain::Config::Content # we should persist this is `ci_pipelines.config_path` def config_path @@ -583,9 +592,8 @@ module Ci project.notes.for_commit_id(sha) end - def update_status + def set_status(new_status) retry_optimistic_lock(self) do - new_status = latest_builds_status.to_s case new_status when 'created' then nil when 'waiting_for_resource' then request_resource @@ -605,6 +613,10 @@ module Ci end end + def update_legacy_status + set_status(latest_builds_status.to_s) + end + def protected_ref? strong_memoize(:protected_ref) { project.protected_for?(git_ref) } end diff --git a/app/models/ci/processable.rb b/app/models/ci/processable.rb index 9c56aa67e20..6c4b271cd2c 100644 --- a/app/models/ci/processable.rb +++ b/app/models/ci/processable.rb @@ -8,8 +8,26 @@ module Ci scope :preload_needs, -> { preload(:needs) } + def self.select_with_aggregated_needs(project) + return all unless Feature.enabled?(:ci_dag_support, project, default_enabled: true) + + aggregated_needs_names = Ci::BuildNeed + .scoped_build + .select("ARRAY_AGG(name)") + .to_sql + + all.select( + '*', + "(#{aggregated_needs_names}) as aggregated_needs_names" + ) + end + validates :type, presence: true + def aggregated_needs_names + read_attribute(:aggregated_needs_names) + end + def schedulable? raise NotImplementedError end diff --git a/app/models/ci/stage.rb b/app/models/ci/stage.rb index 96041e02337..75f73429c2a 100644 --- a/app/models/ci/stage.rb +++ b/app/models/ci/stage.rb @@ -13,9 +13,12 @@ module Ci belongs_to :pipeline has_many :statuses, class_name: 'CommitStatus', foreign_key: :stage_id + has_many :processables, class_name: 'Ci::Processable', foreign_key: :stage_id has_many :builds, foreign_key: :stage_id has_many :bridges, foreign_key: :stage_id + scope :ordered, -> { order(position: :asc) } + with_options unless: :importing? do validates :project, presence: true validates :pipeline, presence: true @@ -80,9 +83,8 @@ module Ci end end - def update_status + def set_status(new_status) retry_optimistic_lock(self) do - new_status = latest_stage_status.to_s case new_status when 'created' then nil when 'waiting_for_resource' then request_resource @@ -102,6 +104,10 @@ module Ci end end + def update_legacy_status + set_status(latest_stage_status.to_s) + end + def groups @groups ||= Ci::Group.fabricate(self) end diff --git a/app/models/commit_status.rb b/app/models/commit_status.rb index 773481da5f9..f9101609f89 100644 --- a/app/models/commit_status.rb +++ b/app/models/commit_status.rb @@ -40,6 +40,7 @@ class CommitStatus < ApplicationRecord scope :latest, -> { where(retried: [false, nil]) } scope :retried, -> { where(retried: true) } scope :ordered, -> { order(:name) } + scope :ordered_by_stage, -> { order(stage_idx: :asc) } scope :latest_ordered, -> { latest.ordered.includes(project: :namespace) } scope :retried_ordered, -> { retried.ordered.includes(project: :namespace) } scope :before_stage, -> (index) { where('stage_idx < ?', index) } @@ -57,6 +58,10 @@ class CommitStatus < ApplicationRecord preload(:project, :user) end + scope :with_project_preload, -> do + preload(project: :namespace) + end + scope :with_needs, -> (names = nil) do needs = Ci::BuildNeed.scoped_build.select(1) needs = needs.where(name: names) if names @@ -69,6 +74,15 @@ class CommitStatus < ApplicationRecord where('NOT EXISTS (?)', needs) end + scope :match_id_and_lock_version, -> (slice) do + # it expects that items are an array of attributes to match + # each hash needs to have `id` and `lock_version` + slice.inject(self) do |relation, item| + match = CommitStatus.where(item.slice(:id, :lock_version)) + relation.or(match) + end + end + # We use `CommitStatusEnums.failure_reasons` here so that EE can more easily # extend this `Hash` with new values. enum_with_nil failure_reason: ::CommitStatusEnums.failure_reasons @@ -86,6 +100,16 @@ class CommitStatus < ApplicationRecord # rubocop: enable CodeReuse/ServiceClass end + before_save if: :status_changed?, unless: :importing? do + if Feature.disabled?(:ci_atomic_processing, project) + self.processed = nil + elsif latest? + self.processed = false # force refresh of all dependent ones + elsif retried? + self.processed = true # retried are considered to be already processed + end + end + state_machine :status do event :process do transition [:skipped, :manual] => :created @@ -136,19 +160,13 @@ class CommitStatus < ApplicationRecord end after_transition do |commit_status, transition| - next unless commit_status.project next if transition.loopback? + next if commit_status.processed? + next unless commit_status.project commit_status.run_after_commit do - if pipeline_id - if complete? || manual? - PipelineProcessWorker.perform_async(pipeline_id, [id]) - else - PipelineUpdateWorker.perform_async(pipeline_id) - end - end - - StageUpdateWorker.perform_async(stage_id) + schedule_stage_and_pipeline_update + ExpireJobCacheWorker.perform_async(id) end end @@ -177,6 +195,11 @@ class CommitStatus < ApplicationRecord where(name: names).latest.slow_composite_status || 'success' end + def self.update_as_processed! + # Marks items as processed, and increases `lock_version` (Optimisitc Locking) + update_all('processed=TRUE, lock_version=COALESCE(lock_version,0)+1') + end + def locking_enabled? will_save_change_to_status? end @@ -193,6 +216,10 @@ class CommitStatus < ApplicationRecord calculate_duration end + def latest? + !retried? + end + def playable? false end @@ -244,4 +271,21 @@ class CommitStatus < ApplicationRecord v =~ /\d+/ ? v.to_i : v end end + + private + + def schedule_stage_and_pipeline_update + if Feature.enabled?(:ci_atomic_processing, project) + # Atomic Processing requires only single Worker + PipelineProcessWorker.perform_async(pipeline_id, [id]) + else + if complete? || manual? + PipelineProcessWorker.perform_async(pipeline_id, [id]) + else + PipelineUpdateWorker.perform_async(pipeline_id) + end + + StageUpdateWorker.perform_async(stage_id) + end + end end diff --git a/app/models/project_services/chat_message/base_message.rb b/app/models/project_services/chat_message/base_message.rb index 6542112ba32..529af1277b0 100644 --- a/app/models/project_services/chat_message/base_message.rb +++ b/app/models/project_services/chat_message/base_message.rb @@ -4,6 +4,8 @@ require 'slack-notifier' module ChatMessage class BaseMessage + RELATIVE_LINK_REGEX = /!\[[^\]]*\]\((\/uploads\/[^\)]*)\)/.freeze + attr_reader :markdown attr_reader :user_full_name attr_reader :user_name @@ -59,7 +61,11 @@ module ChatMessage end def format(string) - Slack::Notifier::LinkFormatter.format(string) + Slack::Notifier::LinkFormatter.format(format_relative_links(string)) + end + + def format_relative_links(string) + string.gsub(RELATIVE_LINK_REGEX, "#{project_url}\\1") end def attachment_color diff --git a/app/services/boards/list_service.rb b/app/services/boards/list_service.rb index 44d5a21b15f..8258d5d07d3 100644 --- a/app/services/boards/list_service.rb +++ b/app/services/boards/list_service.rb @@ -4,13 +4,24 @@ module Boards class ListService < Boards::BaseService def execute create_board! if parent.boards.empty? - boards + + if parent.multiple_issue_boards_available? + boards + else + # When multiple issue boards are not available + # a user is only allowed to view the default shown board + first_board + end end private def boards - parent.boards + parent.boards.order_by_name_asc + end + + def first_board + parent.boards.first_board end def create_board! @@ -18,5 +29,3 @@ module Boards end end end - -Boards::ListService.prepend_if_ee('EE::Boards::ListService') diff --git a/app/services/ci/pipeline_processing/atomic_processing_service.rb b/app/services/ci/pipeline_processing/atomic_processing_service.rb new file mode 100644 index 00000000000..1ed295f5950 --- /dev/null +++ b/app/services/ci/pipeline_processing/atomic_processing_service.rb @@ -0,0 +1,118 @@ +# frozen_string_literal: true + +module Ci + module PipelineProcessing + class AtomicProcessingService + include Gitlab::Utils::StrongMemoize + include ExclusiveLeaseGuard + + attr_reader :pipeline + + DEFAULT_LEASE_TIMEOUT = 1.minute + BATCH_SIZE = 20 + + def initialize(pipeline) + @pipeline = pipeline + @collection = AtomicProcessingService::StatusCollection.new(pipeline) + end + + def execute + return unless pipeline.needs_processing? + + success = try_obtain_lease { process! } + + # re-schedule if we need further processing + if success && pipeline.needs_processing? + PipelineProcessWorker.perform_async(pipeline.id) + end + + success + end + + private + + def process! + update_stages! + update_pipeline! + update_statuses_processed! + + true + end + + def update_stages! + pipeline.stages.ordered.each(&method(:update_stage!)) + end + + def update_stage!(stage) + # Update processables for a given stage in bulk/slices + ids = @collection.created_processable_ids_for_stage_position(stage.position) + ids.in_groups_of(BATCH_SIZE, false, &method(:update_processables!)) + + status = @collection.status_for_stage_position(stage.position) + stage.set_status(status) + end + + def update_processables!(ids) + created_processables = pipeline.processables.for_ids(ids) + .with_project_preload + .created + .latest + .ordered_by_stage + .select_with_aggregated_needs(project) + + created_processables.each(&method(:update_processable!)) + end + + def update_pipeline! + pipeline.set_status(@collection.status_of_all) + end + + def update_statuses_processed! + processing = @collection.processing_processables + processing.each_slice(BATCH_SIZE) do |slice| + pipeline.statuses.match_id_and_lock_version(slice) + .update_as_processed! + end + end + + def update_processable!(processable) + status = processable_status(processable) + return unless HasStatus::COMPLETED_STATUSES.include?(status) + + # transition status if possible + Gitlab::OptimisticLocking.retry_lock(processable) do |subject| + Ci::ProcessBuildService.new(project, subject.user) + .execute(subject, status) + + # update internal representation of status + # to make the status change of processable + # to be taken into account during further processing + @collection.set_processable_status( + processable.id, processable.status, processable.lock_version) + end + end + + def processable_status(processable) + if needs_names = processable.aggregated_needs_names + # Processable uses DAG, get status of all dependent needs + @collection.status_for_names(needs_names) + else + # Processable uses Stages, get status of prior stage + @collection.status_for_prior_stage_position(processable.stage_idx.to_i) + end + end + + def project + pipeline.project + end + + def lease_key + "#{super}::pipeline_id:#{pipeline.id}" + end + + def lease_timeout + DEFAULT_LEASE_TIMEOUT + end + end + end +end diff --git a/app/services/ci/pipeline_processing/atomic_processing_service/status_collection.rb b/app/services/ci/pipeline_processing/atomic_processing_service/status_collection.rb new file mode 100644 index 00000000000..42e38a5c80f --- /dev/null +++ b/app/services/ci/pipeline_processing/atomic_processing_service/status_collection.rb @@ -0,0 +1,135 @@ +# frozen_string_literal: true + +module Ci + module PipelineProcessing + class AtomicProcessingService + class StatusCollection + include Gitlab::Utils::StrongMemoize + + attr_reader :pipeline + + # We use these columns to perform an efficient + # calculation of a status + STATUSES_COLUMNS = [ + :id, :name, :status, :allow_failure, + :stage_idx, :processed, :lock_version + ].freeze + + def initialize(pipeline) + @pipeline = pipeline + @stage_statuses = {} + @prior_stage_statuses = {} + end + + # This method updates internal status for given ID + def set_processable_status(id, status, lock_version) + processable = all_statuses_by_id[id] + return unless processable + + processable[:status] = status + processable[:lock_version] = lock_version + end + + # This methods gets composite status of all processables + def status_of_all + status_for_array(all_statuses) + end + + # This methods gets composite status for processables with given names + def status_for_names(names) + name_statuses = all_statuses_by_name.slice(*names) + + status_for_array(name_statuses.values) + end + + # This methods gets composite status for processables before given stage + def status_for_prior_stage_position(position) + strong_memoize("status_for_prior_stage_position_#{position}") do + stage_statuses = all_statuses_grouped_by_stage_position + .select { |stage_position, _| stage_position < position } + + status_for_array(stage_statuses.values.flatten) + end + end + + # This methods gets a list of processables for a given stage + def created_processable_ids_for_stage_position(current_position) + all_statuses_grouped_by_stage_position[current_position] + .to_a + .select { |processable| processable[:status] == 'created' } + .map { |processable| processable[:id] } + end + + # This methods gets composite status for processables at a given stage + def status_for_stage_position(current_position) + strong_memoize("status_for_stage_position_#{current_position}") do + stage_statuses = all_statuses_grouped_by_stage_position[current_position].to_a + + status_for_array(stage_statuses.flatten) + end + end + + # This method returns a list of all processable, that are to be processed + def processing_processables + all_statuses.lazy.reject { |status| status[:processed] } + end + + private + + def status_for_array(statuses) + result = Gitlab::Ci::Status::Composite + .new(statuses) + .status + result || 'success' + end + + def all_statuses_grouped_by_stage_position + strong_memoize(:all_statuses_by_order) do + all_statuses.group_by { |status| status[:stage_idx].to_i } + end + end + + def all_statuses_by_id + strong_memoize(:all_statuses_by_id) do + all_statuses.map do |row| + [row[:id], row] + end.to_h + end + end + + def all_statuses_by_name + strong_memoize(:statuses_by_name) do + all_statuses.map do |row| + [row[:name], row] + end.to_h + end + end + + # rubocop: disable CodeReuse/ActiveRecord + def all_statuses + # We fetch all relevant data in one go. + # + # This is more efficient than relying + # on PostgreSQL to calculate composite status + # for us + # + # Since we need to reprocess everything + # we can fetch all of them and do processing + # ourselves. + strong_memoize(:all_statuses) do + raw_statuses = pipeline + .statuses + .latest + .ordered_by_stage + .pluck(*STATUSES_COLUMNS) + + raw_statuses.map do |row| + STATUSES_COLUMNS.zip(row).to_h + end + end + end + # rubocop: enable CodeReuse/ActiveRecord + end + end + end +end diff --git a/app/services/ci/pipeline_processing/legacy_processing_service.rb b/app/services/ci/pipeline_processing/legacy_processing_service.rb index d7535a5f743..400dc9f0abb 100644 --- a/app/services/ci/pipeline_processing/legacy_processing_service.rb +++ b/app/services/ci/pipeline_processing/legacy_processing_service.rb @@ -18,7 +18,7 @@ module Ci # only when the another job has finished success = process_builds_with_needs(trigger_build_ids) || success - @pipeline.update_status + @pipeline.update_legacy_status success end diff --git a/app/services/ci/process_pipeline_service.rb b/app/services/ci/process_pipeline_service.rb index 3a7d6ad9c3d..1ecef256233 100644 --- a/app/services/ci/process_pipeline_service.rb +++ b/app/services/ci/process_pipeline_service.rb @@ -11,9 +11,15 @@ module Ci def execute(trigger_build_ids = nil) update_retried - Ci::PipelineProcessing::LegacyProcessingService - .new(pipeline) - .execute(trigger_build_ids) + if Feature.enabled?(:ci_atomic_processing, pipeline.project) + Ci::PipelineProcessing::AtomicProcessingService + .new(pipeline) + .execute + else + Ci::PipelineProcessing::LegacyProcessingService + .new(pipeline) + .execute(trigger_build_ids) + end end private diff --git a/app/services/ci/retry_build_service.rb b/app/services/ci/retry_build_service.rb index 5abfbd26641..1f00d54b6a7 100644 --- a/app/services/ci/retry_build_service.rb +++ b/app/services/ci/retry_build_service.rb @@ -11,7 +11,7 @@ module Ci reprocess!(build).tap do |new_build| build.pipeline.mark_as_processable_after_stage(build.stage_idx) - new_build.enqueue! + Gitlab::OptimisticLocking.retry_lock(new_build, &:enqueue) MergeRequests::AddTodoWhenBuildFailsService .new(project, current_user) @@ -31,15 +31,17 @@ module Ci attributes.push([:user, current_user]) - build.retried = true - Ci::Build.transaction do # mark all other builds of that name as retried build.pipeline.builds.latest .where(name: build.name) - .update_all(retried: true) + .update_all(retried: true, processed: true) - create_build!(attributes) + create_build!(attributes).tap do + # mark existing object as retried/processed without a reload + build.retried = true + build.processed = true + end end end # rubocop: enable CodeReuse/ActiveRecord @@ -49,6 +51,7 @@ module Ci def create_build!(attributes) build = project.builds.new(Hash[attributes]) build.deployment = ::Gitlab::Ci::Pipeline::Seed::Deployment.new(build).to_resource + build.retried = false build.save! build end diff --git a/app/workers/pipeline_update_worker.rb b/app/workers/pipeline_update_worker.rb index 5b742461f7a..0321ea5a6ce 100644 --- a/app/workers/pipeline_update_worker.rb +++ b/app/workers/pipeline_update_worker.rb @@ -7,10 +7,7 @@ class PipelineUpdateWorker queue_namespace :pipeline_processing latency_sensitive_worker! - # rubocop: disable CodeReuse/ActiveRecord def perform(pipeline_id) - Ci::Pipeline.find_by(id: pipeline_id) - .try(:update_status) + Ci::Pipeline.find_by_id(pipeline_id)&.update_legacy_status end - # rubocop: enable CodeReuse/ActiveRecord end diff --git a/app/workers/stage_update_worker.rb b/app/workers/stage_update_worker.rb index de2454128f6..a96c4c6dda2 100644 --- a/app/workers/stage_update_worker.rb +++ b/app/workers/stage_update_worker.rb @@ -7,11 +7,7 @@ class StageUpdateWorker queue_namespace :pipeline_processing latency_sensitive_worker! - # rubocop: disable CodeReuse/ActiveRecord def perform(stage_id) - Ci::Stage.find_by(id: stage_id).try do |stage| - stage.update_status - end + Ci::Stage.find_by_id(stage_id)&.update_legacy_status end - # rubocop: enable CodeReuse/ActiveRecord end |