diff options
Diffstat (limited to 'app/services/ci/register_job_service.rb')
-rw-r--r-- | app/services/ci/register_job_service.rb | 193 |
1 files changed, 125 insertions, 68 deletions
diff --git a/app/services/ci/register_job_service.rb b/app/services/ci/register_job_service.rb index 59691fe4ef3..ed9e44d60f1 100644 --- a/app/services/ci/register_job_service.rb +++ b/app/services/ci/register_job_service.rb @@ -4,21 +4,85 @@ module Ci # This class responsible for assigning # proper pending build to runner on runner API request class RegisterJobService - attr_reader :runner + attr_reader :runner, :metrics - JOB_QUEUE_DURATION_SECONDS_BUCKETS = [1, 3, 10, 30, 60, 300, 900, 1800, 3600].freeze - JOBS_RUNNING_FOR_PROJECT_MAX_BUCKET = 5.freeze - METRICS_SHARD_TAG_PREFIX = 'metrics_shard::' - DEFAULT_METRICS_SHARD = 'default' + TEMPORARY_LOCK_TIMEOUT = 3.seconds Result = Struct.new(:build, :build_json, :valid?) + MAX_QUEUE_DEPTH = 50 + def initialize(runner) @runner = runner + @metrics = ::Gitlab::Ci::Queue::Metrics.new(runner) end - # rubocop: disable CodeReuse/ActiveRecord def execute(params = {}) + @metrics.increment_queue_operation(:queue_attempt) + + @metrics.observe_queue_time do + process_queue(params) + end + end + + private + + def process_queue(params) + valid = true + depth = 0 + + each_build(params) do |build| + depth += 1 + @metrics.increment_queue_operation(:queue_iteration) + + if depth > max_queue_depth + @metrics.increment_queue_operation(:queue_depth_limit) + + valid = false + + break + end + + # We read builds from replicas + # It is likely that some other concurrent connection is processing + # a given build at a given moment. To avoid an expensive compute + # we perform an exclusive lease on Redis to acquire a build temporarily + unless acquire_temporary_lock(build.id) + @metrics.increment_queue_operation(:build_temporary_locked) + + # We failed to acquire lock + # - our queue is not complete as some resources are locked temporarily + # - we need to re-process it again to ensure that all builds are handled + valid = false + + next + end + + result = process_build(build, params) + next unless result + + if result.valid? + @metrics.register_success(result.build) + @metrics.observe_queue_depth(:found, depth) + + return result # rubocop:disable Cop/AvoidReturnFromBlocks + else + # The usage of valid: is described in + # handling of ActiveRecord::StaleObjectError + valid = false + end + end + + @metrics.increment_queue_operation(:queue_conflict) unless valid + @metrics.observe_queue_depth(:conflict, depth) unless valid + @metrics.observe_queue_depth(:not_found, depth) if valid + @metrics.register_failure + + Result.new(nil, nil, valid) + end + + # rubocop: disable CodeReuse/ActiveRecord + def each_build(params, &blk) builds = if runner.instance_type? builds_for_shared_runner @@ -28,8 +92,6 @@ module Ci builds_for_project_runner end - valid = true - # pick builds that does not have other tags than runner's one builds = builds.matches_tag_ids(runner.tags.ids) @@ -43,37 +105,42 @@ module Ci builds = builds.queued_before(params[:job_age].seconds.ago) end - builds.each do |build| - result = process_build(build, params) - next unless result + if Feature.enabled?(:ci_register_job_service_one_by_one, runner) + build_ids = builds.pluck(:id) - if result.valid? - register_success(result.build) + @metrics.observe_queue_size(-> { build_ids.size }) - return result - else - # The usage of valid: is described in - # handling of ActiveRecord::StaleObjectError - valid = false + build_ids.each do |build_id| + yield Ci::Build.find(build_id) end - end + else + @metrics.observe_queue_size(-> { builds.to_a.size }) - register_failure - Result.new(nil, nil, valid) + builds.each(&blk) + end end # rubocop: enable CodeReuse/ActiveRecord - private - def process_build(build, params) - return unless runner.can_pick?(build) + unless build.pending? + @metrics.increment_queue_operation(:build_not_pending) + return + end + + if runner.can_pick?(build) + @metrics.increment_queue_operation(:build_can_pick) + else + @metrics.increment_queue_operation(:build_not_pick) + + return + end # In case when 2 runners try to assign the same build, second runner will be declined # with StateMachines::InvalidTransition or StaleObjectError when doing run! or save method. if assign_runner!(build, params) present_build!(build) end - rescue StateMachines::InvalidTransition, ActiveRecord::StaleObjectError + rescue ActiveRecord::StaleObjectError # We are looping to find another build that is not conflicting # It also indicates that this build can be picked and passed to runner. # If we don't do it, basically a bunch of runners would be competing for a build @@ -83,8 +150,16 @@ module Ci # In case we hit the concurrency-access lock, # we still have to return 409 in the end, # to make sure that this is properly handled by runner. + @metrics.increment_queue_operation(:build_conflict_lock) + + Result.new(nil, nil, false) + rescue StateMachines::InvalidTransition + @metrics.increment_queue_operation(:build_conflict_transition) + Result.new(nil, nil, false) rescue => ex + @metrics.increment_queue_operation(:build_conflict_exception) + # If an error (e.g. GRPC::DeadlineExceeded) occurred constructing # the result, consider this as a failure to be retried. scheduler_failure!(build) @@ -94,6 +169,16 @@ module Ci nil end + def max_queue_depth + @max_queue_depth ||= begin + if Feature.enabled?(:gitlab_ci_builds_queue_limit, runner, default_enabled: false) + MAX_QUEUE_DEPTH + else + ::Gitlab::Database::MAX_INT_VALUE + end + end + end + # Force variables evaluation to occur now def present_build!(build) # We need to use the presenter here because Gitaly calls in the presenter @@ -110,16 +195,30 @@ module Ci failure_reason, _ = pre_assign_runner_checks.find { |_, check| check.call(build, params) } if failure_reason + @metrics.increment_queue_operation(:runner_pre_assign_checks_failed) + build.drop!(failure_reason) else + @metrics.increment_queue_operation(:runner_pre_assign_checks_success) + build.run! end !failure_reason end + def acquire_temporary_lock(build_id) + return true unless Feature.enabled?(:ci_register_job_temporary_lock, runner) + + key = "build/register/#{build_id}" + + Gitlab::ExclusiveLease + .new(key, timeout: TEMPORARY_LOCK_TIMEOUT.to_i) + .try_obtain + end + def scheduler_failure!(build) - Gitlab::OptimisticLocking.retry_lock(build, 3) do |subject| + Gitlab::OptimisticLocking.retry_lock(build, 3, name: 'register_job_scheduler_failure') do |subject| subject.drop!(:scheduler_failure) end rescue => ex @@ -189,48 +288,6 @@ module Ci builds end - def register_failure - failed_attempt_counter.increment - attempt_counter.increment - end - - def register_success(job) - labels = { shared_runner: runner.instance_type?, - jobs_running_for_project: jobs_running_for_project(job), - shard: DEFAULT_METRICS_SHARD } - - if runner.instance_type? - shard = runner.tag_list.sort.find { |name| name.starts_with?(METRICS_SHARD_TAG_PREFIX) } - labels[:shard] = shard.gsub(METRICS_SHARD_TAG_PREFIX, '') if shard - end - - job_queue_duration_seconds.observe(labels, Time.current - job.queued_at) unless job.queued_at.nil? - attempt_counter.increment - end - - # rubocop: disable CodeReuse/ActiveRecord - def jobs_running_for_project(job) - return '+Inf' unless runner.instance_type? - - # excluding currently started job - running_jobs_count = job.project.builds.running.where(runner: Ci::Runner.instance_type) - .limit(JOBS_RUNNING_FOR_PROJECT_MAX_BUCKET + 1).count - 1 - running_jobs_count < JOBS_RUNNING_FOR_PROJECT_MAX_BUCKET ? running_jobs_count : "#{JOBS_RUNNING_FOR_PROJECT_MAX_BUCKET}+" - end - # rubocop: enable CodeReuse/ActiveRecord - - def failed_attempt_counter - @failed_attempt_counter ||= Gitlab::Metrics.counter(:job_register_attempts_failed_total, "Counts the times a runner tries to register a job") - end - - def attempt_counter - @attempt_counter ||= Gitlab::Metrics.counter(:job_register_attempts_total, "Counts the times a runner tries to register a job") - end - - def job_queue_duration_seconds - @job_queue_duration_seconds ||= Gitlab::Metrics.histogram(:job_queue_duration_seconds, 'Request handling execution time', {}, JOB_QUEUE_DURATION_SECONDS_BUCKETS) - end - def pre_assign_runner_checks { missing_dependency_failure: -> (build, _) { !build.has_valid_build_dependencies? }, |