1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
|
# frozen_string_literal: true
module Gitlab
module Elasticsearch
module Logs
class Lines
InvalidCursor = Class.new(RuntimeError)
# How many log lines to fetch in a query
LOGS_LIMIT = 500
def initialize(client)
@client = client
end
def pod_logs(namespace, pod_name: nil, container_name: nil, search: nil, start_time: nil, end_time: nil, cursor: nil)
query = { bool: { must: [] } }.tap do |q|
filter_pod_name(q, pod_name)
filter_namespace(q, namespace)
filter_container_name(q, container_name)
filter_search(q, search)
filter_times(q, start_time, end_time)
end
body = build_body(query, cursor)
response = @client.search body: body
format_response(response)
end
private
def build_body(query, cursor = nil)
body = {
query: query,
# reverse order so we can query N-most recent records
sort: [
{ "@timestamp": { order: :desc } },
{ "offset": { order: :desc } }
],
# only return these fields in the response
_source: ["@timestamp", "message", "kubernetes.pod.name"],
# fixed limit for now, we should support paginated queries
size: ::Gitlab::Elasticsearch::Logs::Lines::LOGS_LIMIT
}
unless cursor.nil?
body[:search_after] = decode_cursor(cursor)
end
body
end
def filter_pod_name(query, pod_name)
# We can filter by "all pods" with a null pod_name
return if pod_name.nil?
query[:bool][:must] << {
match_phrase: {
"kubernetes.pod.name" => {
query: pod_name
}
}
}
end
def filter_namespace(query, namespace)
query[:bool][:must] << {
match_phrase: {
"kubernetes.namespace" => {
query: namespace
}
}
}
end
def filter_container_name(query, container_name)
# A pod can contain multiple containers.
# By default we return logs from every container
return if container_name.nil?
query[:bool][:must] << {
match_phrase: {
"kubernetes.container.name" => {
query: container_name
}
}
}
end
def filter_search(query, search)
return if search.nil?
query[:bool][:must] << {
simple_query_string: {
query: search,
fields: [:message],
default_operator: :and
}
}
end
def filter_times(query, start_time, end_time)
return unless start_time || end_time
time_range = { range: { :@timestamp => {} } }.tap do |tr|
tr[:range][:@timestamp][:gte] = start_time if start_time
tr[:range][:@timestamp][:lt] = end_time if end_time
end
query[:bool][:filter] = [time_range]
end
def format_response(response)
results = response.fetch("hits", {}).fetch("hits", [])
last_result = results.last
results = results.map do |hit|
{
timestamp: hit["_source"]["@timestamp"],
message: hit["_source"]["message"],
pod: hit["_source"]["kubernetes"]["pod"]["name"]
}
end
# we queried for the N-most recent records but we want them ordered oldest to newest
{
logs: results.reverse,
cursor: last_result.nil? ? nil : encode_cursor(last_result["sort"])
}
end
# we want to hide the implementation details of the search_after parameter from the frontend
# behind a single easily transmitted value
def encode_cursor(obj)
obj.join(',')
end
def decode_cursor(obj)
cursor = obj.split(',').map(&:to_i)
unless valid_cursor(cursor)
raise InvalidCursor, "invalid cursor format"
end
cursor
end
def valid_cursor(cursor)
cursor.instance_of?(Array) &&
cursor.length == 2 &&
cursor.map {|i| i.instance_of?(Integer)}.reduce(:&)
end
end
end
end
end
|