summary | refs | log | tree | commit | diff
path: root/lib/gitlab/elasticsearch
diff options
context:
space:
mode:
author: GitLab Bot <gitlab-bot@gitlab.com>, 2020-04-20 18:38:24 +0000
committer: GitLab Bot <gitlab-bot@gitlab.com>, 2020-04-20 18:38:24 +0000
commit: 983a0bba5d2a042c4a3bbb22432ec192c7501d82 (patch)
tree: b153cd387c14ba23bd5a07514c7c01fddf6a78a0 /lib/gitlab/elasticsearch
parent: a2bddee2cdb38673df0e004d5b32d9f77797de64 (diff)
download: gitlab-ce-983a0bba5d2a042c4a3bbb22432ec192c7501d82.tar.gz
Add latest changes from gitlab-org/gitlab@12-10-stable-ee
Diffstat (limited to 'lib/gitlab/elasticsearch')
-rw-r--r-- lib/gitlab/elasticsearch/logs.rb | 154
-rw-r--r-- lib/gitlab/elasticsearch/logs/lines.rb | 156
-rw-r--r-- lib/gitlab/elasticsearch/logs/pods.rb | 70
3 files changed, 226 insertions, 154 deletions
diff --git a/lib/gitlab/elasticsearch/logs.rb b/lib/gitlab/elasticsearch/logs.rb
deleted file mode 100644
index 3b6d1d0286a..00000000000
--- a/lib/gitlab/elasticsearch/logs.rb
+++ /dev/null
@@ -1,154 +0,0 @@
-# frozen_string_literal: true
-
-module Gitlab
- module Elasticsearch
- class Logs
- InvalidCursor = Class.new(RuntimeError)
-
- # How many log lines to fetch in a query
- LOGS_LIMIT = 500
-
- def initialize(client)
- @client = client
- end
-
- def pod_logs(namespace, pod_name: nil, container_name: nil, search: nil, start_time: nil, end_time: nil, cursor: nil)
- query = { bool: { must: [] } }.tap do |q|
- filter_pod_name(q, pod_name)
- filter_namespace(q, namespace)
- filter_container_name(q, container_name)
- filter_search(q, search)
- filter_times(q, start_time, end_time)
- end
-
- body = build_body(query, cursor)
- response = @client.search body: body
-
- format_response(response)
- end
-
- private
-
- def build_body(query, cursor = nil)
- body = {
- query: query,
- # reverse order so we can query N-most recent records
- sort: [
- { "@timestamp": { order: :desc } },
- { "offset": { order: :desc } }
- ],
- # only return these fields in the response
- _source: ["@timestamp", "message", "kubernetes.pod.name"],
- # fixed limit for now, we should support paginated queries
- size: ::Gitlab::Elasticsearch::Logs::LOGS_LIMIT
- }
-
- unless cursor.nil?
- body[:search_after] = decode_cursor(cursor)
- end
-
- body
- end
-
- def filter_pod_name(query, pod_name)
- # We can filter by "all pods" with a null pod_name
- return if pod_name.nil?
-
- query[:bool][:must] << {
- match_phrase: {
- "kubernetes.pod.name" => {
- query: pod_name
- }
- }
- }
- end
-
- def filter_namespace(query, namespace)
- query[:bool][:must] << {
- match_phrase: {
- "kubernetes.namespace" => {
- query: namespace
- }
- }
- }
- end
-
- def filter_container_name(query, container_name)
- # A pod can contain multiple containers.
- # By default we return logs from every container
- return if container_name.nil?
-
- query[:bool][:must] << {
- match_phrase: {
- "kubernetes.container.name" => {
- query: container_name
- }
- }
- }
- end
-
- def filter_search(query, search)
- return if search.nil?
-
- query[:bool][:must] << {
- simple_query_string: {
- query: search,
- fields: [:message],
- default_operator: :and
- }
- }
- end
-
- def filter_times(query, start_time, end_time)
- return unless start_time || end_time
-
- time_range = { range: { :@timestamp => {} } }.tap do |tr|
- tr[:range][:@timestamp][:gte] = start_time if start_time
- tr[:range][:@timestamp][:lt] = end_time if end_time
- end
-
- query[:bool][:filter] = [time_range]
- end
-
- def format_response(response)
- results = response.fetch("hits", {}).fetch("hits", [])
- last_result = results.last
- results = results.map do |hit|
- {
- timestamp: hit["_source"]["@timestamp"],
- message: hit["_source"]["message"],
- pod: hit["_source"]["kubernetes"]["pod"]["name"]
- }
- end
-
- # we queried for the N-most recent records but we want them ordered oldest to newest
- {
- logs: results.reverse,
- cursor: last_result.nil? ? nil : encode_cursor(last_result["sort"])
- }
- end
-
- # we want to hide the implementation details of the search_after parameter from the frontend
- # behind a single easily transmitted value
- def encode_cursor(obj)
- obj.join(',')
- end
-
- def decode_cursor(obj)
- cursor = obj.split(',').map(&:to_i)
-
- unless valid_cursor(cursor)
- raise InvalidCursor, "invalid cursor format"
- end
-
- cursor
- end
-
- def valid_cursor(cursor)
- cursor.instance_of?(Array) &&
- cursor.length == 2 &&
- cursor.map {|i| i.instance_of?(Integer)}.reduce(:&)
- end
- end
- end
-end
diff --git a/lib/gitlab/elasticsearch/logs/lines.rb b/lib/gitlab/elasticsearch/logs/lines.rb
new file mode 100644
index 00000000000..fb32a6c9fcd
--- /dev/null
+++ b/lib/gitlab/elasticsearch/logs/lines.rb
@@ -0,0 +1,156 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Elasticsearch
+ module Logs
+ class Lines
+ InvalidCursor = Class.new(RuntimeError)
+
+ # How many log lines to fetch in a query
+ LOGS_LIMIT = 500
+
+ def initialize(client)
+ @client = client
+ end
+
+ def pod_logs(namespace, pod_name: nil, container_name: nil, search: nil, start_time: nil, end_time: nil, cursor: nil)
+ query = { bool: { must: [] } }.tap do |q|
+ filter_pod_name(q, pod_name)
+ filter_namespace(q, namespace)
+ filter_container_name(q, container_name)
+ filter_search(q, search)
+ filter_times(q, start_time, end_time)
+ end
+
+ body = build_body(query, cursor)
+ response = @client.search body: body
+
+ format_response(response)
+ end
+
+ private
+
+ def build_body(query, cursor = nil)
+ body = {
+ query: query,
+ # reverse order so we can query N-most recent records
+ sort: [
+ { "@timestamp": { order: :desc } },
+ { "offset": { order: :desc } }
+ ],
+ # only return these fields in the response
+ _source: ["@timestamp", "message", "kubernetes.pod.name"],
+ # fixed limit for now, we should support paginated queries
+ size: ::Gitlab::Elasticsearch::Logs::Lines::LOGS_LIMIT
+ }
+
+ unless cursor.nil?
+ body[:search_after] = decode_cursor(cursor)
+ end
+
+ body
+ end
+
+ def filter_pod_name(query, pod_name)
+ # We can filter by "all pods" with a null pod_name
+ return if pod_name.nil?
+
+ query[:bool][:must] << {
+ match_phrase: {
+ "kubernetes.pod.name" => {
+ query: pod_name
+ }
+ }
+ }
+ end
+
+ def filter_namespace(query, namespace)
+ query[:bool][:must] << {
+ match_phrase: {
+ "kubernetes.namespace" => {
+ query: namespace
+ }
+ }
+ }
+ end
+
+ def filter_container_name(query, container_name)
+ # A pod can contain multiple containers.
+ # By default we return logs from every container
+ return if container_name.nil?
+
+ query[:bool][:must] << {
+ match_phrase: {
+ "kubernetes.container.name" => {
+ query: container_name
+ }
+ }
+ }
+ end
+
+ def filter_search(query, search)
+ return if search.nil?
+
+ query[:bool][:must] << {
+ simple_query_string: {
+ query: search,
+ fields: [:message],
+ default_operator: :and
+ }
+ }
+ end
+
+ def filter_times(query, start_time, end_time)
+ return unless start_time || end_time
+
+ time_range = { range: { :@timestamp => {} } }.tap do |tr|
+ tr[:range][:@timestamp][:gte] = start_time if start_time
+ tr[:range][:@timestamp][:lt] = end_time if end_time
+ end
+
+ query[:bool][:filter] = [time_range]
+ end
+
+ def format_response(response)
+ results = response.fetch("hits", {}).fetch("hits", [])
+ last_result = results.last
+ results = results.map do |hit|
+ {
+ timestamp: hit["_source"]["@timestamp"],
+ message: hit["_source"]["message"],
+ pod: hit["_source"]["kubernetes"]["pod"]["name"]
+ }
+ end
+
+ # we queried for the N-most recent records but we want them ordered oldest to newest
+ {
+ logs: results.reverse,
+ cursor: last_result.nil? ? nil : encode_cursor(last_result["sort"])
+ }
+ end
+
+ # we want to hide the implementation details of the search_after parameter from the frontend
+ # behind a single easily transmitted value
+ def encode_cursor(obj)
+ obj.join(',')
+ end
+
+ def decode_cursor(obj)
+ cursor = obj.split(',').map(&:to_i)
+
+ unless valid_cursor(cursor)
+ raise InvalidCursor, "invalid cursor format"
+ end
+
+ cursor
+ end
+
+ def valid_cursor(cursor)
+ cursor.instance_of?(Array) &&
+ cursor.length == 2 &&
+ cursor.map {|i| i.instance_of?(Integer)}.reduce(:&)
+ end
+ end
+ end
+ end
+end
diff --git a/lib/gitlab/elasticsearch/logs/pods.rb b/lib/gitlab/elasticsearch/logs/pods.rb
new file mode 100644
index 00000000000..66499ae956a
--- /dev/null
+++ b/lib/gitlab/elasticsearch/logs/pods.rb
@@ -0,0 +1,70 @@
+# frozen_string_literal: true
+
+module Gitlab
+ module Elasticsearch
+ module Logs
+ class Pods
+ # How many items to fetch in a query
+ PODS_LIMIT = 500
+ CONTAINERS_LIMIT = 500
+
+ def initialize(client)
+ @client = client
+ end
+
+ def pods(namespace)
+ body = build_body(namespace)
+ response = @client.search body: body
+
+ format_response(response)
+ end
+
+ private
+
+ def build_body(namespace)
+ {
+ aggs: {
+ pods: {
+ aggs: {
+ containers: {
+ terms: {
+ field: 'kubernetes.container.name',
+ size: ::Gitlab::Elasticsearch::Logs::Pods::CONTAINERS_LIMIT
+ }
+ }
+ },
+ terms: {
+ field: 'kubernetes.pod.name',
+ size: ::Gitlab::Elasticsearch::Logs::Pods::PODS_LIMIT
+ }
+ }
+ },
+ query: {
+ bool: {
+ must: {
+ match_phrase: {
+ "kubernetes.namespace": namespace
+ }
+ }
+ }
+ },
+ # don't populate hits, only the aggregation is needed
+ size: 0
+ }
+ end
+
+ def format_response(response)
+ results = response.dig("aggregations", "pods", "buckets") || []
+ results.map do |bucket|
+ {
+ name: bucket["key"],
+ container_names: (bucket.dig("containers", "buckets") || []).map do |cbucket|
+ cbucket["key"]
+ end
+ }
+ end
+ end
+ end
+ end
+ end
+end