summaryrefslogtreecommitdiff
path: root/lib/gitlab/elasticsearch/logs/lines.rb
diff options
context:
space:
mode:
Diffstat (limited to 'lib/gitlab/elasticsearch/logs/lines.rb')
-rw-r--r--lib/gitlab/elasticsearch/logs/lines.rb156
1 files changed, 156 insertions, 0 deletions
diff --git a/lib/gitlab/elasticsearch/logs/lines.rb b/lib/gitlab/elasticsearch/logs/lines.rb
new file mode 100644
index 00000000000..fb32a6c9fcd
--- /dev/null
+++ b/lib/gitlab/elasticsearch/logs/lines.rb
@@ -0,0 +1,156 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Elasticsearch
+ module Logs
+ class Lines
+ InvalidCursor = Class.new(RuntimeError)
+
+ # How many log lines to fetch in a query
+ LOGS_LIMIT = 500
+
+ def initialize(client)
+ @client = client
+ end
+
+ def pod_logs(namespace, pod_name: nil, container_name: nil, search: nil, start_time: nil, end_time: nil, cursor: nil)
+ query = { bool: { must: [] } }.tap do |q|
+ filter_pod_name(q, pod_name)
+ filter_namespace(q, namespace)
+ filter_container_name(q, container_name)
+ filter_search(q, search)
+ filter_times(q, start_time, end_time)
+ end
+
+ body = build_body(query, cursor)
+ response = @client.search body: body
+
+ format_response(response)
+ end
+
+ private
+
+ def build_body(query, cursor = nil)
+ body = {
+ query: query,
+ # reverse order so we can query N-most recent records
+ sort: [
+ { "@timestamp": { order: :desc } },
+ { "offset": { order: :desc } }
+ ],
+ # only return these fields in the response
+ _source: ["@timestamp", "message", "kubernetes.pod.name"],
+ # fixed limit for now, we should support paginated queries
+ size: ::Gitlab::Elasticsearch::Logs::Lines::LOGS_LIMIT
+ }
+
+ unless cursor.nil?
+ body[:search_after] = decode_cursor(cursor)
+ end
+
+ body
+ end
+
+ def filter_pod_name(query, pod_name)
+ # We can filter by "all pods" with a null pod_name
+ return if pod_name.nil?
+
+ query[:bool][:must] << {
+ match_phrase: {
+ "kubernetes.pod.name" => {
+ query: pod_name
+ }
+ }
+ }
+ end
+
+ def filter_namespace(query, namespace)
+ query[:bool][:must] << {
+ match_phrase: {
+ "kubernetes.namespace" => {
+ query: namespace
+ }
+ }
+ }
+ end
+
+ def filter_container_name(query, container_name)
+ # A pod can contain multiple containers.
+ # By default we return logs from every container
+ return if container_name.nil?
+
+ query[:bool][:must] << {
+ match_phrase: {
+ "kubernetes.container.name" => {
+ query: container_name
+ }
+ }
+ }
+ end
+
+ def filter_search(query, search)
+ return if search.nil?
+
+ query[:bool][:must] << {
+ simple_query_string: {
+ query: search,
+ fields: [:message],
+ default_operator: :and
+ }
+ }
+ end
+
+ def filter_times(query, start_time, end_time)
+ return unless start_time || end_time
+
+ time_range = { range: { :@timestamp => {} } }.tap do |tr|
+ tr[:range][:@timestamp][:gte] = start_time if start_time
+ tr[:range][:@timestamp][:lt] = end_time if end_time
+ end
+
+ query[:bool][:filter] = [time_range]
+ end
+
+ def format_response(response)
+ results = response.fetch("hits", {}).fetch("hits", [])
+ last_result = results.last
+ results = results.map do |hit|
+ {
+ timestamp: hit["_source"]["@timestamp"],
+ message: hit["_source"]["message"],
+ pod: hit["_source"]["kubernetes"]["pod"]["name"]
+ }
+ end
+
+ # we queried for the N-most recent records but we want them ordered oldest to newest
+ {
+ logs: results.reverse,
+ cursor: last_result.nil? ? nil : encode_cursor(last_result["sort"])
+ }
+ end
+
+ # we want to hide the implementation details of the search_after parameter from the frontend
+ # behind a single easily transmitted value
+ def encode_cursor(obj)
+ obj.join(',')
+ end
+
+ def decode_cursor(obj)
+ cursor = obj.split(',').map(&:to_i)
+
+ unless valid_cursor(cursor)
+ raise InvalidCursor, "invalid cursor format"
+ end
+
+ cursor
+ end
+
+ def valid_cursor(cursor)
+ cursor.instance_of?(Array) &&
+ cursor.length == 2 &&
+ cursor.map {|i| i.instance_of?(Integer)}.reduce(:&)
+ end
+ end
+ end
+ end
+end