1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
|
# frozen_string_literal: true
module Ci
  # Assigns a proper pending build to a runner when the runner asks for a job
  # through the API.
  #
  # `#execute` selects the candidate builds for the runner's type (instance,
  # group or project), filters them by tags and age, then tries to assign them
  # one by one until a build has been assigned and serialized successfully.
  class RegisterJobService
    attr_reader :runner

    JOB_QUEUE_DURATION_SECONDS_BUCKETS = [1, 3, 10, 30, 60, 300, 900, 1800, 3600].freeze
    # Integers are always frozen, so no explicit `.freeze` is needed here.
    JOBS_RUNNING_FOR_PROJECT_MAX_BUCKET = 5
    METRICS_SHARD_TAG_PREFIX = 'metrics_shard::'.freeze
    DEFAULT_METRICS_SHARD = 'default'.freeze

    # build      - the assigned build, or nil when nothing was assigned
    # build_json - the serialized job response for the runner, or nil
    # valid?     - false when a concurrent-assignment conflict was hit
    Result = Struct.new(:build, :build_json, :valid?)

    # @param runner the runner that is requesting a job
    def initialize(runner)
      @runner = runner
    end

    # Finds and assigns the first suitable build for the runner.
    #
    # @param params [Hash] request parameters; this class reads `:job_age`,
    #   `:session` and `:info` (`:features`) from it
    # @return [Result] a result holding the assigned build and its JSON, or an
    #   empty result whose `valid?` is false if any candidate build hit an
    #   assignment conflict
    #
    # rubocop: disable CodeReuse/ActiveRecord
    def execute(params = {})
      builds =
        if runner.instance_type?
          builds_for_shared_runner
        elsif runner.group_type?
          builds_for_group_runner
        else
          builds_for_project_runner
        end

      valid = true

      # pick builds that do not have other tags than the runner's ones
      builds = builds.matches_tag_ids(runner.tags.ids)

      # pick builds that have at least one tag
      builds = builds.with_any_tags unless runner.run_untagged?

      # pick builds that are older than the specified age
      builds = builds.queued_before(params[:job_age].seconds.ago) if params.key?(:job_age)

      builds.each do |build|
        result = process_build(build, params)
        next unless result

        if result.valid?
          register_success(result.build)
          return result
        else
          # The usage of valid: is described in
          # handling of ActiveRecord::StaleObjectError
          valid = false
        end
      end

      register_failure
      Result.new(nil, nil, valid)
    end
    # rubocop: enable CodeReuse/ActiveRecord

    private

    # Tries to assign a single build to the runner.
    #
    # Returns a valid Result when the build was assigned and presented, an
    # invalid Result when another runner won the race for this build, and nil
    # when the build has to be skipped (not pickable, dropped by a pre-assign
    # check, or an unexpected error occurred).
    def process_build(build, params)
      return unless runner.can_pick?(build)

      # In case when 2 runners try to assign the same build, second runner will be declined
      # with StateMachines::InvalidTransition or StaleObjectError when doing run! or save method.
      if assign_runner!(build, params)
        present_build!(build)
      end
    rescue StateMachines::InvalidTransition, ActiveRecord::StaleObjectError
      # We are looping to find another build that is not conflicting
      # It also indicates that this build can be picked and passed to runner.
      # If we don't do it, basically a bunch of runners would be competing for a build
      # and thus we will generate a lot of 409. This will increase
      # the number of generated requests, also will reduce significantly
      # how many builds can be picked by runner in a unit of time.
      # In case we hit the concurrency-access lock,
      # we still have to return 409 in the end,
      # to make sure that this is properly handled by runner.
      Result.new(nil, nil, false)
    rescue => ex
      # If an error (e.g. GRPC::DeadlineExceeded) occurred constructing
      # the result, consider this as a failure to be retried.
      scheduler_failure!(build)
      track_exception_for_build(ex, build)

      # skip, and move to next one
      nil
    end

    # Force variables evaluation to occur now
    def present_build!(build)
      # We need to use the presenter here because Gitaly calls in the presenter
      # may fail, and we need to ensure the response has been generated.
      presented_build = ::Ci::BuildRunnerPresenter.new(build) # rubocop:disable CodeReuse/Presenter
      build_json = ::API::Entities::JobRequest::Response.new(presented_build).to_json
      Result.new(build, build_json, true)
    end

    # Attaches the runner to the build and either runs it or drops it with the
    # reason of the first failing pre-assign check.
    #
    # Returns true when the build was started, false when it was dropped.
    def assign_runner!(build, params)
      build.runner_id = runner.id
      build.runner_session_attributes = params[:session] if params[:session].present?

      # `find` yields [reason, check] pairs and returns nil when no check fails
      failure_reason, _ = pre_assign_runner_checks.find { |_, check| check.call(build, params) }

      if failure_reason
        build.drop!(failure_reason)
      else
        build.run!
      end

      !failure_reason
    end

    # Drops the build with `:scheduler_failure`, retrying on lock conflicts;
    # if that fails as well the build is doomed and the error is tracked.
    def scheduler_failure!(build)
      Gitlab::OptimisticLocking.retry_lock(build, 3) do |subject|
        subject.drop!(:scheduler_failure)
      end
    rescue => ex
      build.doom!

      # This requires extra exception, otherwise we would lose information
      # why we cannot perform `scheduler_failure`
      track_exception_for_build(ex, build)
    end

    # Reports the exception together with identifying attributes of the build.
    def track_exception_for_build(ex, build)
      Gitlab::ErrorTracking.track_exception(ex,
        build_id: build.id,
        build_name: build.name,
        build_stage: build.stage,
        pipeline_id: build.pipeline_id,
        project_id: build.project_id
      )
    end

    # Candidate builds for an instance-type (shared) runner.
    #
    # rubocop: disable CodeReuse/ActiveRecord
    def builds_for_shared_runner
      # don't run projects which have not enabled shared runners and builds
      eligible_builds = new_builds
        .joins(:project)
        .where(projects: { shared_runners_enabled: true, pending_delete: false })
        .joins('LEFT JOIN project_features ON ci_builds.project_id = project_features.project_id')
        .where('project_features.builds_access_level IS NULL or project_features.builds_access_level > 0')

      # Implement fair scheduling
      # this returns builds that are ordered by number of running builds
      # we prefer projects that don't use shared runners at all
      eligible_builds
        .joins("LEFT JOIN (#{running_builds_for_shared_runners.to_sql}) AS project_builds ON ci_builds.project_id=project_builds.project_id")
        .order(Arel.sql('COALESCE(project_builds.running_builds, 0) ASC'), 'ci_builds.id ASC')
    end
    # rubocop: enable CodeReuse/ActiveRecord

    # Candidate builds for a project runner, oldest (lowest id) first.
    #
    # rubocop: disable CodeReuse/ActiveRecord
    def builds_for_project_runner
      new_builds.where(project: runner.projects.without_deleted.with_builds_enabled).order('id ASC')
    end
    # rubocop: enable CodeReuse/ActiveRecord

    # Candidate builds for a group runner: builds of projects in the runner's
    # groups and their descendants, oldest (lowest id) first.
    #
    # rubocop: disable CodeReuse/ActiveRecord
    def builds_for_group_runner
      # Workaround for weird Rails bug, that makes `runner.groups.to_sql` to return `runner_id = NULL`
      groups = ::Group.joins(:runner_namespaces).merge(runner.runner_namespaces)

      hierarchy_groups = Gitlab::ObjectHierarchy.new(groups).base_and_descendants
      projects = Project.where(namespace_id: hierarchy_groups)
        .with_group_runners_enabled
        .with_builds_enabled
        .without_deleted

      new_builds.where(project: projects).order('id ASC')
    end
    # rubocop: enable CodeReuse/ActiveRecord

    # Per-project count of builds currently running on instance-type runners.
    #
    # rubocop: disable CodeReuse/ActiveRecord
    def running_builds_for_shared_runners
      Ci::Build.running.where(runner: Ci::Runner.instance_type)
        .group(:project_id).select(:project_id, 'count(*) AS running_builds')
    end
    # rubocop: enable CodeReuse/ActiveRecord

    # Pending, unstarted builds, limited to protected refs when the runner is
    # restricted to them.
    def new_builds
      builds = Ci::Build.pending.unstarted
      builds = builds.ref_protected if runner.ref_protected?
      builds
    end

    # Records a request that ended without a build being assigned.
    def register_failure
      failed_attempt_counter.increment
      attempt_counter.increment
    end

    # Records a successful assignment and how long the job waited in the queue.
    def register_success(job)
      labels = { shared_runner: runner.instance_type?,
                 jobs_running_for_project: jobs_running_for_project(job),
                 shard: DEFAULT_METRICS_SHARD }

      if runner.instance_type?
        shard = runner.tag_list.sort.find { |name| name.start_with?(METRICS_SHARD_TAG_PREFIX) }

        # Strip only the leading prefix; a shard name that itself contains the
        # marker text must be kept intact.
        labels[:shard] = shard[METRICS_SHARD_TAG_PREFIX.length..-1] if shard
      end

      job_queue_duration_seconds.observe(labels, Time.current - job.queued_at) unless job.queued_at.nil?
      attempt_counter.increment
    end

    # Label describing how many other jobs of the job's project are running on
    # instance-type runners: an Integer below the max bucket, "<max>+" at or
    # above it, or '+Inf' for runners that are not instance-type.
    #
    # rubocop: disable CodeReuse/ActiveRecord
    def jobs_running_for_project(job)
      return '+Inf' unless runner.instance_type?

      # One more than the max bucket is enough to tell whether we are above it.
      running_jobs_count = job.project.builds.running.where(runner: Ci::Runner.instance_type)
        .limit(JOBS_RUNNING_FOR_PROJECT_MAX_BUCKET + 1).count

      # excluding currently started job; never report a negative amount if the
      # count came back as zero
      other_running_jobs = [running_jobs_count - 1, 0].max

      if other_running_jobs < JOBS_RUNNING_FOR_PROJECT_MAX_BUCKET
        other_running_jobs
      else
        "#{JOBS_RUNNING_FOR_PROJECT_MAX_BUCKET}+"
      end
    end
    # rubocop: enable CodeReuse/ActiveRecord

    def failed_attempt_counter
      @failed_attempt_counter ||= Gitlab::Metrics.counter(:job_register_attempts_failed_total, "Counts the times a runner fails to register a job")
    end

    def attempt_counter
      @attempt_counter ||= Gitlab::Metrics.counter(:job_register_attempts_total, "Counts the times a runner tries to register a job")
    end

    # NOTE(review): the help text below does not describe a queue duration and
    # looks copy-pasted from another metric - confirm before changing it.
    def job_queue_duration_seconds
      @job_queue_duration_seconds ||= Gitlab::Metrics.histogram(:job_queue_duration_seconds, 'Request handling execution time', {}, JOB_QUEUE_DURATION_SECONDS_BUCKETS)
    end

    # Maps a drop reason to a check; a check returning a truthy value means the
    # build must be dropped with that reason instead of being run.
    def pre_assign_runner_checks
      {
        missing_dependency_failure: ->(build, _) { !build.has_valid_build_dependencies? },
        runner_unsupported: ->(build, params) { !build.supported_runner?(params.dig(:info, :features)) },
        archived_failure: ->(build, _) { build.archived? }
      }
    end
  end
end
# NOTE(review): `prepend_if_ee` is defined outside this file; presumably it
# prepends the named EE module only when that extension is present - confirm.
Ci::RegisterJobService.prepend_if_ee('EE::Ci::RegisterJobService')
|