summaryrefslogtreecommitdiff
path: root/lib/gitlab/elasticsearch/logs/lines.rb
blob: fb32a6c9fcdb6d894cdb4fc6b83ff336ad870f05 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
# frozen_string_literal: true

module Gitlab
  module Elasticsearch
    module Logs
      # Builds Elasticsearch queries for Kubernetes pod log lines and turns the
      # search response into a list of log entries plus a pagination cursor.
      class Lines
        # Raised by #pod_logs when the +cursor+ argument cannot be decoded.
        InvalidCursor = Class.new(RuntimeError)

        # How many log lines to fetch in a query
        LOGS_LIMIT = 500

        # @param client [#search] Elasticsearch client; only +search(body:)+ is used here
        def initialize(client)
          @client = client
        end

        # Fetches up to LOGS_LIMIT log lines for +namespace+. The query asks
        # Elasticsearch for the most recent lines first; the result is returned
        # ordered oldest to newest.
        #
        # @param namespace [String] Kubernetes namespace to match (always applied)
        # @param pod_name [String, nil] restrict to one pod; nil means all pods
        # @param container_name [String, nil] restrict to one container; nil means all
        # @param search [String, nil] simple_query_string matched against +message+ (AND semantics)
        # @param start_time [Object, nil] inclusive lower bound on @timestamp, passed to
        #   Elasticsearch unchanged
        # @param end_time [Object, nil] exclusive upper bound on @timestamp, passed unchanged
        # @param cursor [String, nil] a +:cursor+ value from a previous call; continues
        #   after the last (oldest) line of that page via search_after
        # @return [Hash] +{ logs: Array<Hash>, cursor: String or nil }+, where each log is
        #   +{ timestamp:, message:, pod: }+
        # @raise [InvalidCursor] if +cursor+ is not two comma-separated integers
        def pod_logs(namespace, pod_name: nil, container_name: nil, search: nil, start_time: nil, end_time: nil, cursor: nil)
          query = { bool: { must: [] } }.tap do |q|
            filter_pod_name(q, pod_name)
            filter_namespace(q, namespace)
            filter_container_name(q, container_name)
            filter_search(q, search)
            filter_times(q, start_time, end_time)
          end

          body = build_body(query, cursor)
          response = @client.search body: body

          format_response(response)
        end

        private

        # Wraps +query+ in a full search request body: newest-first sort, the
        # returned _source fields, the size limit and, if given, search_after.
        def build_body(query, cursor = nil)
          body = {
            query: query,
            # reverse order so we can query N-most recent records
            sort: [
              { "@timestamp": { order: :desc } },
              { "offset": { order: :desc } }
            ],
            # only return these fields in the response
            _source: ["@timestamp", "message", "kubernetes.pod.name"],
            # fixed limit for now, we should support paginated queries
            size: LOGS_LIMIT
          }

          body[:search_after] = decode_cursor(cursor) unless cursor.nil?

          body
        end

        def filter_pod_name(query, pod_name)
          # We can filter by "all pods" with a null pod_name
          return if pod_name.nil?

          query[:bool][:must] << {
            match_phrase: {
              "kubernetes.pod.name" => {
                query: pod_name
              }
            }
          }
        end

        def filter_namespace(query, namespace)
          query[:bool][:must] << {
            match_phrase: {
              "kubernetes.namespace" => {
                query: namespace
              }
            }
          }
        end

        def filter_container_name(query, container_name)
          # A pod can contain multiple containers.
          # By default we return logs from every container
          return if container_name.nil?

          query[:bool][:must] << {
            match_phrase: {
              "kubernetes.container.name" => {
                query: container_name
              }
            }
          }
        end

        def filter_search(query, search)
          return if search.nil?

          query[:bool][:must] << {
            simple_query_string: {
              query: search,
              fields: [:message],
              default_operator: :and
            }
          }
        end

        # Adds a @timestamp range filter: +gte+ start_time and/or +lt+ end_time.
        # Nothing is added when both bounds are nil.
        def filter_times(query, start_time, end_time)
          return unless start_time || end_time

          time_range = { range: { :@timestamp => {} } }.tap do |tr|
            tr[:range][:@timestamp][:gte] = start_time if start_time
            tr[:range][:@timestamp][:lt] = end_time if end_time
          end

          query[:bool][:filter] = [time_range]
        end

        # Converts a raw search response (string-keyed Hash) into
        # +{ logs:, cursor: }+. A response without hits yields no logs and a nil cursor.
        def format_response(response)
          hits = response.fetch("hits", {}).fetch("hits", [])

          # The cursor comes from the last hit returned, i.e. the oldest line,
          # so the next page continues with older records.
          sort_values = hits.last&.fetch("sort", nil)

          # we queried for the N-most recent records but we want them ordered oldest to newest
          {
            logs: hits.reverse_each.map { |hit| format_hit(hit) },
            cursor: sort_values && encode_cursor(sort_values)
          }
        end

        # Extracts the fields we return from one hit; missing fields become nil
        # instead of raising NoMethodError.
        def format_hit(hit)
          source = hit["_source"] || {}

          {
            timestamp: source["@timestamp"],
            message: source["message"],
            pod: source.dig("kubernetes", "pod", "name")
          }
        end

        # we want to hide the implementation details of the search_after parameter from the frontend
        # behind a single easily transmitted value
        def encode_cursor(obj)
          obj.join(',')
        end

        # Parses a cursor produced by #encode_cursor back into search_after values.
        #
        # Each part must be a base-10 integer. String#to_i would silently turn
        # garbage such as "abc" into 0 and let malformed cursors through.
        #
        # @param obj [String] e.g. "1580499330000,42"
        # @return [Array<Integer>]
        # @raise [InvalidCursor] unless +obj+ is exactly two comma-separated integers
        def decode_cursor(obj)
          # limit -1 keeps trailing empty fields, so "1,2," is rejected rather than truncated
          cursor = obj.split(',', -1).map { |part| Integer(part, 10, exception: false) }

          raise InvalidCursor, "invalid cursor format" unless valid_cursor?(cursor)

          cursor
        end

        # True when +cursor+ holds exactly two integers, one per sort key
        # (@timestamp, offset) declared in #build_body.
        def valid_cursor?(cursor)
          cursor.length == 2 && cursor.all?(Integer)
        end
      end
    end
  end
end