diff options
author | GitLab Bot <gitlab-bot@gitlab.com> | 2021-03-16 18:18:33 +0000 |
---|---|---|
committer | GitLab Bot <gitlab-bot@gitlab.com> | 2021-03-16 18:18:33 +0000 |
commit | f64a639bcfa1fc2bc89ca7db268f594306edfd7c (patch) | |
tree | a2c3c2ebcc3b45e596949db485d6ed18ffaacfa1 /app/services/ci | |
parent | bfbc3e0d6583ea1a91f627528bedc3d65ba4b10f (diff) | |
download | gitlab-ce-f64a639bcfa1fc2bc89ca7db268f594306edfd7c.tar.gz |
Add latest changes from gitlab-org/gitlab@13-10-stable-eev13.10.0-rc40
Diffstat (limited to 'app/services/ci')
13 files changed, 251 insertions, 130 deletions
diff --git a/app/services/ci/build_report_result_service.rb b/app/services/ci/build_report_result_service.rb index f138aa91236..8bdb51320f9 100644 --- a/app/services/ci/build_report_result_service.rb +++ b/app/services/ci/build_report_result_service.rb @@ -33,7 +33,8 @@ module Ci failed: test_suite.failed_count, errored: test_suite.error_count, skipped: test_suite.skipped_count, - success: test_suite.success_count + success: test_suite.success_count, + suite_error: test_suite.suite_error } } end diff --git a/app/services/ci/create_downstream_pipeline_service.rb b/app/services/ci/create_downstream_pipeline_service.rb index 629d85b041f..93f0338fcba 100644 --- a/app/services/ci/create_downstream_pipeline_service.rb +++ b/app/services/ci/create_downstream_pipeline_service.rb @@ -43,7 +43,7 @@ module Ci private def update_bridge_status!(bridge, pipeline) - Gitlab::OptimisticLocking.retry_lock(bridge) do |subject| + Gitlab::OptimisticLocking.retry_lock(bridge, name: 'create_downstream_pipeline_update_bridge_status') do |subject| if pipeline.created_successfully? # If bridge uses `strategy:depend` we leave it running # and update the status when the downstream pipeline completes. diff --git a/app/services/ci/create_pipeline_service.rb b/app/services/ci/create_pipeline_service.rb index dc42411dfa1..0fd47e625fd 100644 --- a/app/services/ci/create_pipeline_service.rb +++ b/app/services/ci/create_pipeline_service.rb @@ -122,7 +122,9 @@ module Ci end def record_conversion_event - Experiments::RecordConversionEventWorker.perform_async(:ci_syntax_templates, current_user.id) + return unless project.namespace.recent? + + Experiments::RecordConversionEventWorker.perform_async(:ci_syntax_templates_b, current_user.id) end def create_namespace_onboarding_action diff --git a/app/services/ci/destroy_expired_job_artifacts_service.rb b/app/services/ci/destroy_expired_job_artifacts_service.rb index 7d8a3c17abe..d91cfb3cc82 100644 --- a/app/services/ci/destroy_expired_job_artifacts_service.rb +++ b/app/services/ci/destroy_expired_job_artifacts_service.rb @@ -4,7 +4,6 @@ module Ci class DestroyExpiredJobArtifactsService include ::Gitlab::ExclusiveLeaseHelpers include ::Gitlab::LoopHelpers - include ::Gitlab::Utils::StrongMemoize BATCH_SIZE = 100 LOOP_TIMEOUT = 5.minutes @@ -34,50 +33,20 @@ module Ci def destroy_job_artifacts_with_slow_iteration(start_at) Ci::JobArtifact.expired_before(start_at).each_batch(of: BATCH_SIZE, column: :expire_at, order: :desc) do |relation, index| - artifacts = relation.unlocked.with_destroy_preloads.to_a + # For performance reasons, join with ci_pipelines after the batch is queried. + # See: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/47496 + artifacts = relation.unlocked + + service_response = destroy_batch_async(artifacts) + @removed_artifacts_count += service_response[:destroyed_artifacts_count] - parallel_destroy_batch(artifacts) if artifacts.any? break if loop_timeout?(start_at) break if index >= LOOP_LIMIT end end - def parallel_destroy_batch(job_artifacts) - Ci::DeletedObject.transaction do - Ci::DeletedObject.bulk_import(job_artifacts) - Ci::JobArtifact.id_in(job_artifacts.map(&:id)).delete_all - destroy_related_records_for(job_artifacts) - end - - # This is executed outside of the transaction because it depends on Redis - update_project_statistics_for(job_artifacts) - increment_monitoring_statistics(job_artifacts.size) - end - - # This method is implemented in EE and it must do only database work - def destroy_related_records_for(job_artifacts); end - - def update_project_statistics_for(job_artifacts) - artifacts_by_project = job_artifacts.group_by(&:project) - artifacts_by_project.each do |project, artifacts| - delta = -artifacts.sum { |artifact| artifact.size.to_i } - ProjectStatistics.increment_statistic( - project, Ci::JobArtifact.project_statistics_name, delta) - end - end - - def increment_monitoring_statistics(size) - destroyed_artifacts_counter.increment({}, size) - @removed_artifacts_count += size - end - - def destroyed_artifacts_counter - strong_memoize(:destroyed_artifacts_counter) do - name = :destroyed_job_artifacts_count_total - comment = 'Counter of destroyed expired job artifacts' - - ::Gitlab::Metrics.counter(name, comment) - end + def destroy_batch_async(artifacts) + Ci::JobArtifactsDestroyBatchService.new(artifacts).execute end def loop_timeout?(start_at) @@ -85,5 +54,3 @@ module Ci end end end - -Ci::DestroyExpiredJobArtifactsService.prepend_if_ee('EE::Ci::DestroyExpiredJobArtifactsService') diff --git a/app/services/ci/expire_pipeline_cache_service.rb b/app/services/ci/expire_pipeline_cache_service.rb index 8343e0f8cd0..2ae60907dab 100644 --- a/app/services/ci/expire_pipeline_cache_service.rb +++ b/app/services/ci/expire_pipeline_cache_service.rb @@ -2,6 +2,11 @@ module Ci class ExpirePipelineCacheService + class UrlHelpers + include ::Gitlab::Routing + include ::GitlabRoutingHelper + end + def execute(pipeline, delete: false) store = Gitlab::EtagCaching::Store.new @@ -17,27 +22,27 @@ module Ci private def project_pipelines_path(project) - Gitlab::Routing.url_helpers.project_pipelines_path(project, format: :json) + url_helpers.project_pipelines_path(project, format: :json) end def project_pipeline_path(project, pipeline) - Gitlab::Routing.url_helpers.project_pipeline_path(project, pipeline, format: :json) + url_helpers.project_pipeline_path(project, pipeline, format: :json) end def commit_pipelines_path(project, commit) - Gitlab::Routing.url_helpers.pipelines_project_commit_path(project, commit.id, format: :json) + url_helpers.pipelines_project_commit_path(project, commit.id, format: :json) end def new_merge_request_pipelines_path(project) - Gitlab::Routing.url_helpers.project_new_merge_request_path(project, format: :json) + url_helpers.project_new_merge_request_path(project, format: :json) end def pipelines_project_merge_request_path(merge_request) - Gitlab::Routing.url_helpers.pipelines_project_merge_request_path(merge_request.target_project, merge_request, format: :json) + url_helpers.pipelines_project_merge_request_path(merge_request.target_project, merge_request, format: :json) end def merge_request_widget_path(merge_request) - Gitlab::Routing.url_helpers.cached_widget_project_json_merge_request_path(merge_request.project, merge_request, format: :json) + url_helpers.cached_widget_project_json_merge_request_path(merge_request.project, merge_request, format: :json) end def each_pipelines_merge_request_path(pipeline) @@ -47,6 +52,10 @@ module Ci end end + def graphql_pipeline_path(pipeline) + url_helpers.graphql_etag_pipeline_path(pipeline) + end + # Updates ETag caches of a pipeline. # # This logic resides in a separate method so that EE can more easily extend @@ -58,14 +67,20 @@ module Ci project = pipeline.project store.touch(project_pipelines_path(project)) - store.touch(project_pipeline_path(project, pipeline)) store.touch(commit_pipelines_path(project, pipeline.commit)) unless pipeline.commit.nil? store.touch(new_merge_request_pipelines_path(project)) each_pipelines_merge_request_path(pipeline) do |path| store.touch(path) end + + pipeline.self_with_ancestors_and_descendants.each do |relative_pipeline| + store.touch(project_pipeline_path(relative_pipeline.project, relative_pipeline)) + store.touch(graphql_pipeline_path(relative_pipeline)) + end + end + + def url_helpers + @url_helpers ||= UrlHelpers.new end end end - -Ci::ExpirePipelineCacheService.prepend_if_ee('EE::Ci::ExpirePipelineCacheService') diff --git a/app/services/ci/job_artifacts_destroy_batch_service.rb b/app/services/ci/job_artifacts_destroy_batch_service.rb new file mode 100644 index 00000000000..f8ece27fe86 --- /dev/null +++ b/app/services/ci/job_artifacts_destroy_batch_service.rb @@ -0,0 +1,72 @@ +# frozen_string_literal: true + +module Ci + class JobArtifactsDestroyBatchService + include BaseServiceUtility + include ::Gitlab::Utils::StrongMemoize + + # Danger: Private - Should only be called in Ci Services that pass a batch of job artifacts + # Not for use outsie of the ci namespace + + # Adds the passed batch of job artifacts to the `ci_deleted_objects` table + # for asyncronous destruction of the objects in Object Storage via the `Ci::DeleteObjectsService` + # and then deletes the batch of related `ci_job_artifacts` records. + # Params: + # +job_artifacts+:: A relation of job artifacts to destroy (fewer than MAX_JOB_ARTIFACT_BATCH_SIZE) + # +pick_up_at+:: When to pick up for deletion of files + # Returns: + # +Hash+:: A hash with status and destroyed_artifacts_count keys + def initialize(job_artifacts, pick_up_at: nil) + @job_artifacts = job_artifacts.with_destroy_preloads.to_a + @pick_up_at = pick_up_at + end + + # rubocop: disable CodeReuse/ActiveRecord + def execute + return success(destroyed_artifacts_count: artifacts_count) if @job_artifacts.empty? + + Ci::DeletedObject.transaction do + Ci::DeletedObject.bulk_import(@job_artifacts, @pick_up_at) + Ci::JobArtifact.id_in(@job_artifacts.map(&:id)).delete_all + destroy_related_records(@job_artifacts) + end + + # This is executed outside of the transaction because it depends on Redis + update_project_statistics + increment_monitoring_statistics(artifacts_count) + + success(destroyed_artifacts_count: artifacts_count) + end + # rubocop: enable CodeReuse/ActiveRecord + + private + + # This method is implemented in EE and it must do only database work + def destroy_related_records(artifacts); end + + def update_project_statistics + artifacts_by_project = @job_artifacts.group_by(&:project) + artifacts_by_project.each do |project, artifacts| + delta = -artifacts.sum { |artifact| artifact.size.to_i } + ProjectStatistics.increment_statistic( + project, Ci::JobArtifact.project_statistics_name, delta) + end + end + + def increment_monitoring_statistics(size) + metrics.increment_destroyed_artifacts(size) + end + + def metrics + @metrics ||= ::Gitlab::Ci::Artifacts::Metrics.new + end + + def artifacts_count + strong_memoize(:artifacts_count) do + @job_artifacts.count + end + end + end +end + +Ci::JobArtifactsDestroyBatchService.prepend_if_ee('EE::Ci::JobArtifactsDestroyBatchService') diff --git a/app/services/ci/pipeline_processing/atomic_processing_service.rb b/app/services/ci/pipeline_processing/atomic_processing_service.rb index a23d5d8941a..236d660d829 100644 --- a/app/services/ci/pipeline_processing/atomic_processing_service.rb +++ b/app/services/ci/pipeline_processing/atomic_processing_service.rb @@ -53,7 +53,7 @@ module Ci end def update_processables!(ids) - created_processables = pipeline.processables.for_ids(ids) + created_processables = pipeline.processables.id_in(ids) .with_project_preload .created .latest @@ -80,7 +80,7 @@ module Ci return unless Ci::HasStatus::COMPLETED_STATUSES.include?(status) # transition status if possible - Gitlab::OptimisticLocking.retry_lock(processable) do |subject| + Gitlab::OptimisticLocking.retry_lock(processable, name: 'atomic_processing_update_processable') do |subject| Ci::ProcessBuildService.new(project, subject.user) .execute(subject, status) diff --git a/app/services/ci/pipeline_processing/atomic_processing_service/status_collection.rb b/app/services/ci/pipeline_processing/atomic_processing_service/status_collection.rb index aeabbb99468..35818e2cf3d 100644 --- a/app/services/ci/pipeline_processing/atomic_processing_service/status_collection.rb +++ b/app/services/ci/pipeline_processing/atomic_processing_service/status_collection.rb @@ -78,7 +78,7 @@ module Ci def status_for_array(statuses, dag:) result = Gitlab::Ci::Status::Composite - .new(statuses, dag: dag) + .new(statuses, dag: dag, project: pipeline.project) .status result || 'success' end diff --git a/app/services/ci/process_pipeline_service.rb b/app/services/ci/process_pipeline_service.rb index 678b386fbbf..970652b4da3 100644 --- a/app/services/ci/process_pipeline_service.rb +++ b/app/services/ci/process_pipeline_service.rb @@ -30,6 +30,8 @@ module Ci # this updates only when there are data that needs to be updated, there are two groups with no retried flag # rubocop: disable CodeReuse/ActiveRecord def update_retried + return if Feature.enabled?(:ci_remove_update_retried_from_process_pipeline, pipeline.project, default_enabled: :yaml) + # find the latest builds for each name latest_statuses = pipeline.latest_statuses .group(:name) diff --git a/app/services/ci/register_job_service.rb b/app/services/ci/register_job_service.rb index 59691fe4ef3..ed9e44d60f1 100644 --- a/app/services/ci/register_job_service.rb +++ b/app/services/ci/register_job_service.rb @@ -4,21 +4,85 @@ module Ci # This class responsible for assigning # proper pending build to runner on runner API request class RegisterJobService - attr_reader :runner + attr_reader :runner, :metrics - JOB_QUEUE_DURATION_SECONDS_BUCKETS = [1, 3, 10, 30, 60, 300, 900, 1800, 3600].freeze - JOBS_RUNNING_FOR_PROJECT_MAX_BUCKET = 5.freeze - METRICS_SHARD_TAG_PREFIX = 'metrics_shard::' - DEFAULT_METRICS_SHARD = 'default' + TEMPORARY_LOCK_TIMEOUT = 3.seconds Result = Struct.new(:build, :build_json, :valid?) + MAX_QUEUE_DEPTH = 50 + def initialize(runner) @runner = runner + @metrics = ::Gitlab::Ci::Queue::Metrics.new(runner) end - # rubocop: disable CodeReuse/ActiveRecord def execute(params = {}) + @metrics.increment_queue_operation(:queue_attempt) + + @metrics.observe_queue_time do + process_queue(params) + end + end + + private + + def process_queue(params) + valid = true + depth = 0 + + each_build(params) do |build| + depth += 1 + @metrics.increment_queue_operation(:queue_iteration) + + if depth > max_queue_depth + @metrics.increment_queue_operation(:queue_depth_limit) + + valid = false + + break + end + + # We read builds from replicas + # It is likely that some other concurrent connection is processing + # a given build at a given moment. To avoid an expensive compute + # we perform an exclusive lease on Redis to acquire a build temporarily + unless acquire_temporary_lock(build.id) + @metrics.increment_queue_operation(:build_temporary_locked) + + # We failed to acquire lock + # - our queue is not complete as some resources are locked temporarily + # - we need to re-process it again to ensure that all builds are handled + valid = false + + next + end + + result = process_build(build, params) + next unless result + + if result.valid? + @metrics.register_success(result.build) + @metrics.observe_queue_depth(:found, depth) + + return result # rubocop:disable Cop/AvoidReturnFromBlocks + else + # The usage of valid: is described in + # handling of ActiveRecord::StaleObjectError + valid = false + end + end + + @metrics.increment_queue_operation(:queue_conflict) unless valid + @metrics.observe_queue_depth(:conflict, depth) unless valid + @metrics.observe_queue_depth(:not_found, depth) if valid + @metrics.register_failure + + Result.new(nil, nil, valid) + end + + # rubocop: disable CodeReuse/ActiveRecord + def each_build(params, &blk) builds = if runner.instance_type? builds_for_shared_runner @@ -28,8 +92,6 @@ module Ci builds_for_project_runner end - valid = true - # pick builds that does not have other tags than runner's one builds = builds.matches_tag_ids(runner.tags.ids) @@ -43,37 +105,42 @@ module Ci builds = builds.queued_before(params[:job_age].seconds.ago) end - builds.each do |build| - result = process_build(build, params) - next unless result + if Feature.enabled?(:ci_register_job_service_one_by_one, runner) + build_ids = builds.pluck(:id) - if result.valid? - register_success(result.build) + @metrics.observe_queue_size(-> { build_ids.size }) - return result - else - # The usage of valid: is described in - # handling of ActiveRecord::StaleObjectError - valid = false + build_ids.each do |build_id| + yield Ci::Build.find(build_id) end - end + else + @metrics.observe_queue_size(-> { builds.to_a.size }) - register_failure - Result.new(nil, nil, valid) + builds.each(&blk) + end end # rubocop: enable CodeReuse/ActiveRecord - private - def process_build(build, params) - return unless runner.can_pick?(build) + unless build.pending? + @metrics.increment_queue_operation(:build_not_pending) + return + end + + if runner.can_pick?(build) + @metrics.increment_queue_operation(:build_can_pick) + else + @metrics.increment_queue_operation(:build_not_pick) + + return + end # In case when 2 runners try to assign the same build, second runner will be declined # with StateMachines::InvalidTransition or StaleObjectError when doing run! or save method. if assign_runner!(build, params) present_build!(build) end - rescue StateMachines::InvalidTransition, ActiveRecord::StaleObjectError + rescue ActiveRecord::StaleObjectError # We are looping to find another build that is not conflicting # It also indicates that this build can be picked and passed to runner. # If we don't do it, basically a bunch of runners would be competing for a build @@ -83,8 +150,16 @@ module Ci # In case we hit the concurrency-access lock, # we still have to return 409 in the end, # to make sure that this is properly handled by runner. + @metrics.increment_queue_operation(:build_conflict_lock) + + Result.new(nil, nil, false) + rescue StateMachines::InvalidTransition + @metrics.increment_queue_operation(:build_conflict_transition) + Result.new(nil, nil, false) rescue => ex + @metrics.increment_queue_operation(:build_conflict_exception) + # If an error (e.g. GRPC::DeadlineExceeded) occurred constructing # the result, consider this as a failure to be retried. scheduler_failure!(build) @@ -94,6 +169,16 @@ module Ci nil end + def max_queue_depth + @max_queue_depth ||= begin + if Feature.enabled?(:gitlab_ci_builds_queue_limit, runner, default_enabled: false) + MAX_QUEUE_DEPTH + else + ::Gitlab::Database::MAX_INT_VALUE + end + end + end + # Force variables evaluation to occur now def present_build!(build) # We need to use the presenter here because Gitaly calls in the presenter @@ -110,16 +195,30 @@ module Ci failure_reason, _ = pre_assign_runner_checks.find { |_, check| check.call(build, params) } if failure_reason + @metrics.increment_queue_operation(:runner_pre_assign_checks_failed) + build.drop!(failure_reason) else + @metrics.increment_queue_operation(:runner_pre_assign_checks_success) + build.run! end !failure_reason end + def acquire_temporary_lock(build_id) + return true unless Feature.enabled?(:ci_register_job_temporary_lock, runner) + + key = "build/register/#{build_id}" + + Gitlab::ExclusiveLease + .new(key, timeout: TEMPORARY_LOCK_TIMEOUT.to_i) + .try_obtain + end + def scheduler_failure!(build) - Gitlab::OptimisticLocking.retry_lock(build, 3) do |subject| + Gitlab::OptimisticLocking.retry_lock(build, 3, name: 'register_job_scheduler_failure') do |subject| subject.drop!(:scheduler_failure) end rescue => ex @@ -189,48 +288,6 @@ module Ci builds end - def register_failure - failed_attempt_counter.increment - attempt_counter.increment - end - - def register_success(job) - labels = { shared_runner: runner.instance_type?, - jobs_running_for_project: jobs_running_for_project(job), - shard: DEFAULT_METRICS_SHARD } - - if runner.instance_type? - shard = runner.tag_list.sort.find { |name| name.starts_with?(METRICS_SHARD_TAG_PREFIX) } - labels[:shard] = shard.gsub(METRICS_SHARD_TAG_PREFIX, '') if shard - end - - job_queue_duration_seconds.observe(labels, Time.current - job.queued_at) unless job.queued_at.nil? - attempt_counter.increment - end - - # rubocop: disable CodeReuse/ActiveRecord - def jobs_running_for_project(job) - return '+Inf' unless runner.instance_type? - - # excluding currently started job - running_jobs_count = job.project.builds.running.where(runner: Ci::Runner.instance_type) - .limit(JOBS_RUNNING_FOR_PROJECT_MAX_BUCKET + 1).count - 1 - running_jobs_count < JOBS_RUNNING_FOR_PROJECT_MAX_BUCKET ? running_jobs_count : "#{JOBS_RUNNING_FOR_PROJECT_MAX_BUCKET}+" - end - # rubocop: enable CodeReuse/ActiveRecord - - def failed_attempt_counter - @failed_attempt_counter ||= Gitlab::Metrics.counter(:job_register_attempts_failed_total, "Counts the times a runner tries to register a job") - end - - def attempt_counter - @attempt_counter ||= Gitlab::Metrics.counter(:job_register_attempts_total, "Counts the times a runner tries to register a job") - end - - def job_queue_duration_seconds - @job_queue_duration_seconds ||= Gitlab::Metrics.histogram(:job_queue_duration_seconds, 'Request handling execution time', {}, JOB_QUEUE_DURATION_SECONDS_BUCKETS) - end - def pre_assign_runner_checks { missing_dependency_failure: -> (build, _) { !build.has_valid_build_dependencies? }, diff --git a/app/services/ci/retry_build_service.rb b/app/services/ci/retry_build_service.rb index e5e79f70616..b2c5249a0c7 100644 --- a/app/services/ci/retry_build_service.rb +++ b/app/services/ci/retry_build_service.rb @@ -19,7 +19,7 @@ module Ci mark_subsequent_stages_as_processable(build) build.pipeline.reset_ancestor_bridges! - Gitlab::OptimisticLocking.retry_lock(new_build, &:enqueue) + Gitlab::OptimisticLocking.retry_lock(new_build, name: 'retry_build', &:enqueue) MergeRequests::AddTodoWhenBuildFailsService .new(project, current_user) @@ -68,7 +68,7 @@ module Ci def mark_subsequent_stages_as_processable(build) build.pipeline.processables.skipped.after_stage(build.stage_idx).find_each do |skipped| - retry_optimistic_lock(skipped) { |build| build.process(current_user) } + retry_optimistic_lock(skipped, name: 'ci_retry_build_mark_subsequent_stages') { |build| build.process(current_user) } end end end diff --git a/app/services/ci/retry_pipeline_service.rb b/app/services/ci/retry_pipeline_service.rb index dea4bf73a4c..90ee7b9b3ba 100644 --- a/app/services/ci/retry_pipeline_service.rb +++ b/app/services/ci/retry_pipeline_service.rb @@ -23,7 +23,7 @@ module Ci end pipeline.builds.latest.skipped.find_each do |skipped| - retry_optimistic_lock(skipped) { |build| build.process(current_user) } + retry_optimistic_lock(skipped, name: 'ci_retry_pipeline') { |build| build.process(current_user) } end pipeline.reset_ancestor_bridges! diff --git a/app/services/ci/update_build_queue_service.rb b/app/services/ci/update_build_queue_service.rb index 241eba733ea..cf629b879b3 100644 --- a/app/services/ci/update_build_queue_service.rb +++ b/app/services/ci/update_build_queue_service.rb @@ -2,16 +2,21 @@ module Ci class UpdateBuildQueueService - def execute(build) - tick_for(build, build.project.all_runners) + def execute(build, metrics = ::Gitlab::Ci::Queue::Metrics) + tick_for(build, build.project.all_runners, metrics) end private - def tick_for(build, runners) + def tick_for(build, runners, metrics) runners = runners.with_recent_runner_queue + runners = runners.with_tags if Feature.enabled?(:ci_preload_runner_tags, default_enabled: :yaml) + + metrics.observe_active_runners(-> { runners.to_a.size }) runners.each do |runner| + metrics.increment_runner_tick(runner) + runner.pick_build!(build) end end |