1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
|
# frozen_string_literal: true
module Gitlab
module SidekiqLogging
class StructuredLogger
# Payload keys that parse_job hands to convert_to_iso8601 when a job is picked up.
START_TIMESTAMP_FIELDS = %w[created_at enqueued_at].freeze
# Payload keys that log_job_done hands to convert_to_iso8601 when a job finishes or fails.
DONE_TIMESTAMP_FIELDS = %w[started_at retried_at failed_at completed_at].freeze
# Upper bound on the cumulative JSON-serialized size of the job arguments that
# limited_job_args keeps in the log payload.
MAXIMUM_JOB_ARGUMENTS_LENGTH = 10.kilobytes
# Logs a structured "start" entry, runs the job by yielding to the block, and
# then logs a structured "done" entry through Sidekiq.logger.
#
# Any StandardError raised inside the method body (including by the block) is
# logged at warn level as a failed job and then re-raised unchanged.
#
# @param job [Hash] job payload; parse_job works on a copy of it
# @param queue [Object] not used by this method
# @return the result of the final Sidekiq.logger.info call
def call(job, queue)
  start_time = current_time
  payload = parse_job(job)

  Sidekiq.logger.info(log_job_start(start_time, payload))

  yield

  Sidekiq.logger.info(log_job_done(job, start_time, payload))
rescue StandardError => error
  Sidekiq.logger.warn(log_job_done(job, start_time, payload, error))

  raise
end
private
# Builds the prefix shared by every log message of a job: the value stored
# under 'class' followed by "JID-" and the value stored under 'jid'.
#
# @param payload [Hash] job payload with string keys
# @return [String]
def base_message(payload)
  worker_class, jid = payload.values_at('class', 'jid')

  "#{worker_class} JID-#{jid}"
end
# Copies the entries of +job+ whose keys are listed in
# ::Gitlab::InstrumentationHelper::KEYS into +output_payload+, in place.
#
# @param job [Hash] source job hash
# @param output_payload [Hash] hash that receives the entries (mutated)
# @return [Hash] output_payload itself
def add_instrumentation_keys!(job, output_payload)
  instrumentation = job.slice(*::Gitlab::InstrumentationHelper::KEYS)

  output_payload.update(instrumentation)
end
# Turns +payload+ into the "start" log entry of a job: sets 'message' and
# 'job_status' and, when an enqueue/creation timestamp is available and
# parseable, 'scheduling_latency_s'.
#
# @param started_at [Object] accepted for signature compatibility; not used here
# @param payload [Hash] job payload with string keys (mutated in place)
# @return [Hash] the same payload object
def log_job_start(started_at, payload)
  payload['message'] = "#{base_message(payload)}: start"
  payload['job_status'] = 'start'

  # Old gitlab-shell messages don't provide enqueued_at/created_at attributes
  enqueued_at = payload['enqueued_at'] || payload['created_at']

  enqueued_time =
    begin
      Time.iso8601(enqueued_at) if enqueued_at
    rescue ArgumentError
      # A client-supplied timestamp that is not ISO 8601 must not abort the
      # job before it runs; losing the latency figure is the lesser evil.
      nil
    end

  if enqueued_time
    # Clock skew between the enqueuing and the processing host can make the
    # difference negative; report zero rather than a negative latency.
    payload['scheduling_latency_s'] = [elapsed_by_absolute_time(enqueued_time), 0.0].max
  end

  payload
end
# Builds the log entry written when a job finishes, successfully or not.
#
# Works on a copy of +payload+, so the hash passed in is left untouched. The
# entry receives the instrumentation keys of +job+, the elapsed 'duration',
# a 'completed_at' timestamp, a status message and, when +job_exception+ is
# given, the error message, error class and cleaned backtrace.
#
# @param job [Hash] the job hash, source of the instrumentation keys
# @param started_at [Numeric] start reading, as passed on to elapsed
# @param payload [Hash] base payload with string keys
# @param job_exception [Exception, nil] the failure, if the job raised
# @return [Hash] the new log entry
def log_job_done(job, started_at, payload, job_exception = nil)
  entry = payload.dup
  add_instrumentation_keys!(job, entry)

  duration = elapsed(started_at)
  entry['duration'] = duration
  entry['completed_at'] = Time.now.utc

  prefix = base_message(entry)
  status = job_exception ? 'fail' : 'done'

  entry['message'] = "#{prefix}: #{status}: #{duration} sec"
  entry['job_status'] = status

  if job_exception
    entry['error_message'] = job_exception.message
    entry['error'] = job_exception.class
    entry['error_backtrace'] = backtrace_cleaner.clean(job_exception.backtrace)
  end

  # completed_at (and any other "done" timestamps present) become strings here
  convert_to_iso8601(entry, DONE_TIMESTAMP_FIELDS)

  entry
end
# Derives the base log payload from a job hash without modifying the job
# itself: adds the current process id, drops or size-limits the arguments and
# runs the start timestamps through convert_to_iso8601.
#
# Arguments are only kept when the SIDEKIQ_LOG_ARGUMENTS environment variable
# is set (to any value).
#
# @param job [Hash] job hash with string keys
# @return [Hash] a shallow copy of the job, prepared for logging
def parse_job(job)
  payload = job.dup

  # Add process id params
  payload['pid'] = ::Process.pid

  if ENV['SIDEKIQ_LOG_ARGUMENTS']
    raw_args = payload['args']
    payload['args'] = limited_job_args(raw_args) if raw_args
  else
    payload.delete('args')
  end

  convert_to_iso8601(payload, START_TIMESTAMP_FIELDS)

  payload
end
# Replaces, in place, every truthy value stored under one of +keys+ with the
# result of format_time. Missing, nil or false entries are left as they are.
#
# @param payload [Hash] hash to update (mutated)
# @param keys [Array<String>] keys whose values should be formatted
# @return [Array<String>] the +keys+ collection, as returned by #each
def convert_to_iso8601(payload, keys)
  keys.each do |key|
    value = payload[key]
    next unless value

    payload[key] = format_time(value)
  end
end
# Wall-clock seconds between +start+ and now, rounded to three decimals.
#
# @param start [Time] the earlier point in time
# @return [Float]
def elapsed_by_absolute_time(start)
  seconds = Time.now.utc - start

  seconds.to_f.round(3)
end
# Difference between the current_time reading and +start+, rounded to three
# decimals.
#
# @param start [Numeric] an earlier current_time reading
# @return [Numeric]
def elapsed(start)
  delta = current_time - start

  delta.round(3)
end
# Clock reading used to measure job duration; delegates to
# Gitlab::Metrics::System.monotonic_time.
# NOTE(review): presumably a monotonic clock value in seconds — confirm against
# Gitlab::Metrics::System.
def current_time
Gitlab::Metrics::System.monotonic_time
end
# Returns the ActiveSupport::BacktraceCleaner for this logger instance,
# creating it on first use and reusing it afterwards.
#
# @return [ActiveSupport::BacktraceCleaner]
def backtrace_cleaner
  return @backtrace_cleaner if @backtrace_cleaner

  @backtrace_cleaner = ActiveSupport::BacktraceCleaner.new
end
# Renders a timestamp as a UTC ISO 8601 string with millisecond precision.
# Strings are returned unchanged; any other value is handed to Time.at.
#
# @param timestamp [String, Numeric, Time]
# @return [String]
def format_time(timestamp)
  case timestamp
  when String
    timestamp
  else
    Time.at(timestamp).utc.iso8601(3)
  end
end
# Returns the leading job arguments whose cumulative JSON-serialized size stays
# within MAXIMUM_JOB_ARGUMENTS_LENGTH, followed by '...' when anything was cut.
#
# The size is measured in bytes (not characters), because the limit is a byte
# budget and multibyte text would otherwise overshoot it.
#
# @param args [Array, Object] the job arguments
# @return [Array, nil] the (possibly truncated) arguments, or nil when +args+
#   is not an Array
def limited_job_args(args)
  return unless args.is_a?(Array)

  total_bytes = 0
  limited_args = args.take_while do |arg|
    total_bytes += arg.to_json.bytesize

    total_bytes <= MAXIMUM_JOB_ARGUMENTS_LENGTH
  end

  # take_while stops at the first argument that breaks the budget, so the
  # running total only exceeds the limit when something was dropped.
  limited_args.push('...') if total_bytes > MAXIMUM_JOB_ARGUMENTS_LENGTH
  limited_args
end
end
end
end
|