blob: 807c27a71ffc7201f350e50d1e2b70a5a4f32157 (
plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
|
# frozen_string_literal: true
module Gitlab
  # Wraps a single Sidekiq queue, looked up by name, and allows jobs in it to
  # be deleted in bulk based on the `meta.*` fields stored on each job.
  class SidekiqQueue
    include Gitlab::Utils::StrongMemoize

    # Raised when none of the supplied search metadata is usable.
    NoMetadataError = Class.new(StandardError)
    # Raised when no existing Sidekiq queue has the requested name.
    InvalidQueueError = Class.new(StandardError)

    attr_reader :queue_name

    def initialize(queue_name)
      @queue_name = queue_name
    end

    # Deletes every job in the queue whose `meta.<key>` fields all equal the
    # corresponding values in +search_metadata+.
    #
    # Only keys listed in Labkit::Context::KNOWN_KEYS are considered, and
    # entries with a nil value are ignored.
    #
    # search_metadata - Hash of metadata to match jobs against.
    # timeout         - Upper bound on the time spent iterating, compared
    #                   against differences of
    #                   Gitlab::Metrics::System.monotonic_time (presumably
    #                   seconds - confirm against callers).
    #
    # Raises NoMetadataError when no usable metadata remains after filtering.
    # Raises InvalidQueueError when the queue cannot be found.
    #
    # Returns a Hash with :completed (false when the timeout cut the
    # iteration short), :deleted_jobs and :queue_size.
    def drop_jobs!(search_metadata, timeout:)
      started_at = Gitlab::Metrics::System.monotonic_time
      criteria = search_criteria_from(search_metadata)

      raise NoMetadataError if criteria.empty?
      raise InvalidQueueError unless queue

      finished = true
      removed = 0

      queue.each do |job|
        # The deadline is checked before every job, including non-matching
        # ones, so a long queue cannot overrun the timeout.
        if timeout_exceeded?(started_at, timeout)
          finished = false
          break
        end

        if job_matches?(job, criteria)
          job.delete
          removed += 1
        end
      end

      { completed: finished, deleted_jobs: removed, queue_size: queue.size }
    end

    private

    # Turns the caller-supplied metadata into the `meta.`-prefixed,
    # string-keyed hash that is compared against each job's fields.
    def search_criteria_from(search_metadata)
      known = search_metadata.stringify_keys.slice(*Labkit::Context::KNOWN_KEYS)
      prefixed = known.transform_keys { |key| "meta.#{key}" }

      prefixed.compact
    end

    # Memoized lookup of the queue by name; nil when there is no such queue.
    def queue
      strong_memoize(:queue) do
        # Search the list of existing queues rather than calling
        # Sidekiq::Queue.new, which always returns a queue, even if it
        # doesn't exist.
        Sidekiq::Queue.all.find { |candidate| candidate.name == queue_name }
      end
    end

    # True when every field in +criteria+ has the same value on +job+.
    def job_matches?(job, criteria)
      criteria.none? { |field, expected| job[field] != expected }
    end

    # True once more than +timeout+ has elapsed since +start_time+.
    def timeout_exceeded?(start_time, timeout)
      elapsed = Gitlab::Metrics::System.monotonic_time - start_time

      elapsed > timeout
    end
  end
end
|