summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2021-11-16 10:23:39 +0100
committerdcorbacho <dparracorbacho@piotal.io>2021-11-16 10:23:39 +0100
commit242cb539b32dadda74106d689b404d46f454cd82 (patch)
tree7be4807493a9fbe9599037bd95d1b1dc2b97b7e6
parente299178471143402cbdcb4fdb3b99f5f191880a3 (diff)
downloadrabbitmq-server-git-242cb539b32dadda74106d689b404d46f454cd82.tar.gz
Exclude queues from aggregated metrics in prometheus collector
Uses same exclusion pattern as the management agent
-rw-r--r--deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl106
-rw-r--r--deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl5
2 files changed, 77 insertions, 34 deletions
diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
index 8b46770ef8..328ec00276 100644
--- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
+++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
@@ -232,25 +232,25 @@ register() ->
deregister_cleanup(_) -> ok.
collect_mf('detailed', Callback) ->
- collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), enabled_mfs_from_pdict(), Callback),
+ collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), queues_filter_from_pdict(), enabled_mfs_from_pdict(), Callback),
%% identity is here to enable filtering on a cluster name (as already happens in existing dashboards)
emit_identity_info(Callback),
ok;
collect_mf('per-object', Callback) ->
- collect(true, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback),
+ collect(true, ?METRIC_NAME_PREFIX, false, queues_filter_from_pdict(), ?METRICS_RAW, Callback),
totals(Callback),
emit_identity_info(Callback),
ok;
collect_mf(_Registry, Callback) ->
PerObjectMetrics = application:get_env(rabbitmq_prometheus, return_per_object_metrics, false),
- collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback),
+ collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, queues_filter_from_pdict(), ?METRICS_RAW, Callback),
totals(Callback),
emit_identity_info(Callback),
ok.
-collect(PerObjectMetrics, Prefix, VHostsFilter, IncludedMFs, Callback) ->
+collect(PerObjectMetrics, Prefix, VHostsFilter, QueuesFilter, IncludedMFs, Callback) ->
[begin
- Data = get_data(Table, PerObjectMetrics, VHostsFilter),
+ Data = get_data(Table, PerObjectMetrics, VHostsFilter, QueuesFilter),
mf(Callback, Prefix, Contents, Data)
end || {Table, Contents} <- IncludedMFs, not mutually_exclusive_mf(PerObjectMetrics, Table, IncludedMFs)].
@@ -459,7 +459,7 @@ emit_gauge_metric_if_defined(Labels, Value) ->
gauge_metric(Labels, Value)
end.
-get_data(connection_metrics = Table, false, _) ->
+get_data(connection_metrics = Table, false, _, _) ->
{Table, A1, A2, A3, A4} = ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4}) ->
{T,
sum(proplists:get_value(recv_cnt, Props), A1),
@@ -468,7 +468,7 @@ get_data(connection_metrics = Table, false, _) ->
sum(proplists:get_value(channels, Props), A4)}
end, empty(Table), Table),
[{Table, [{recv_cnt, A1}, {send_cnt, A2}, {send_pend, A3}, {channels, A4}]}];
-get_data(channel_metrics = Table, false, _) ->
+get_data(channel_metrics = Table, false, _, _) ->
{Table, A1, A2, A3, A4, A5, A6, A7} =
ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4, A5, A6, A7}) ->
{T,
@@ -483,42 +483,42 @@ get_data(channel_metrics = Table, false, _) ->
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
{global_prefetch_count, A7}]}];
-get_data(queue_consumer_count = MF, false, VHostsFilter) ->
+get_data(queue_consumer_count = MF, false, VHostsFilter, QueuesFilter) ->
Table = queue_metrics, %% Real table name
{_, A1} = ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
+ ({#resource{kind = queue, name = Name}, Props, _}, {T, A1} = Acc)
+ when is_list(QueuesFilter) ->
+ case re:run(Name, QueuesFilter, [{capture, none}]) of
+ match ->
+ Acc;
+ nomatch ->
+ {T,
+ sum(proplists:get_value(consumers, Props), A1)
+ }
+ end;
({_, Props, _}, {T, A1}) ->
{T,
sum(proplists:get_value(consumers, Props), A1)
}
end, empty(MF), Table),
[{Table, [{consumers, A1}]}];
-get_data(queue_metrics = Table, false, VHostsFilter) ->
+get_data(queue_metrics = Table, false, VHostsFilter, QueuesFilter) ->
{Table, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16} =
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
- ({_, Props, _}, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10,
- A11, A12, A13, A14, A15, A16}) ->
- {T,
- sum(proplists:get_value(consumers, Props), A1),
- sum(proplists:get_value(consumer_utilisation, Props), A2),
- sum(proplists:get_value(memory, Props), A3),
- sum(proplists:get_value(messages_ram, Props), A4),
- sum(proplists:get_value(message_bytes_ram, Props), A5),
- sum(proplists:get_value(messages_ready_ram, Props), A6),
- sum(proplists:get_value(messages_unacknowledged_ram, Props), A7),
- sum(proplists:get_value(messages_persistent, Props), A8),
- sum(proplists:get_value(message_bytes_persistent, Props), A9),
- sum(proplists:get_value(message_bytes, Props), A10),
- sum(proplists:get_value(message_bytes_ready, Props), A11),
- sum(proplists:get_value(message_bytes_unacknowledged, Props), A12),
- sum(proplists:get_value(messages_paged_out, Props), A13),
- sum(proplists:get_value(message_bytes_paged_out, Props), A14),
- sum(proplists:get_value(disk_reads, Props), A15),
- sum(proplists:get_value(disk_writes, Props), A16)
- }
+ ({#resource{kind = queue, name = Name}, Props, _}, Acc)
+ when is_list(QueuesFilter) ->
+ case re:run(Name, QueuesFilter, [{capture, none}]) of
+ match ->
+ Acc;
+ nomatch ->
+ sum_queue_metrics(Props, Acc)
+ end;
+ ({_, Props, _}, Acc) ->
+ sum_queue_metrics(Props, Acc)
end, empty(Table), Table),
[{Table, [{consumers, A1}, {consumer_utilisation, A2}, {memory, A3}, {messages_ram, A4},
{message_bytes_ram, A5}, {messages_ready_ram, A6},
@@ -527,7 +527,7 @@ get_data(queue_metrics = Table, false, VHostsFilter) ->
{message_bytes_ready, A11}, {message_bytes_unacknowledged, A12},
{messages_paged_out, A13}, {message_bytes_paged_out, A14},
{disk_reads, A15}, {disk_writes, A16}]}];
-get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
+get_data(Table, false, VHostsFilter, QueuesFilter) when Table == channel_exchange_metrics;
Table == queue_coarse_metrics;
Table == channel_queue_metrics;
Table == connection_coarse_metrics;
@@ -538,6 +538,14 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
%% For queue_coarse_metrics
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
+ ({#resource{kind = queue, name = Name}, V1, V2, V3, V4}, {T, A1, A2, A3, A4} = Acc)
+ when is_list(QueuesFilter) ->
+ case re:run(Name, QueuesFilter, [{capture, none}]) of
+ match ->
+ Acc;
+ nomatch ->
+ {T, V1 + A1, V2 + A2, V3 + A3, V4 + A4}
+ end;
({_, V1}, {T, A1}) ->
{T, V1 + A1};
({_, V1, _}, {T, A1}) ->
@@ -564,14 +572,14 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
_ ->
[Result]
end;
-get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
+get_data(queue_coarse_metrics = Table, true, VHostsFilter, _) when is_map(VHostsFilter) ->
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
[Row|Acc];
(_, Acc) ->
Acc
end, [], Table);
-get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics orelse MF == queue_consumer_count ->
+get_data(MF, true, VHostsFilter, _) when is_map(VHostsFilter), MF == queue_metrics orelse MF == queue_consumer_count ->
Table = queue_metrics,
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
@@ -579,11 +587,33 @@ get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics
(_, Acc) ->
Acc
end, [], Table);
-get_data(queue_consumer_count, true, _) ->
+get_data(queue_consumer_count, true, _, _) ->
ets:tab2list(queue_metrics);
-get_data(Table, _, _) ->
+get_data(Table, _, _, _) ->
ets:tab2list(Table).
+
+sum_queue_metrics(Props, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11,
+ A12, A13, A14, A15, A16}) ->
+ {T,
+ sum(proplists:get_value(consumers, Props), A1),
+ sum(proplists:get_value(consumer_utilisation, Props), A2),
+ sum(proplists:get_value(memory, Props), A3),
+ sum(proplists:get_value(messages_ram, Props), A4),
+ sum(proplists:get_value(message_bytes_ram, Props), A5),
+ sum(proplists:get_value(messages_ready_ram, Props), A6),
+ sum(proplists:get_value(messages_unacknowledged_ram, Props), A7),
+ sum(proplists:get_value(messages_persistent, Props), A8),
+ sum(proplists:get_value(message_bytes_persistent, Props), A9),
+ sum(proplists:get_value(message_bytes, Props), A10),
+ sum(proplists:get_value(message_bytes_ready, Props), A11),
+ sum(proplists:get_value(message_bytes_unacknowledged, Props), A12),
+ sum(proplists:get_value(messages_paged_out, Props), A13),
+ sum(proplists:get_value(message_bytes_paged_out, Props), A14),
+ sum(proplists:get_value(disk_reads, Props), A15),
+ sum(proplists:get_value(disk_writes, Props), A16)
+ }.
+
division(0, 0) ->
0;
division(A, B) ->
@@ -631,3 +661,11 @@ vhosts_filter_from_pdict() ->
Enabled = maps:from_list([ {VHost, true} || VHost <- L ]),
maps:merge(All, Enabled)
end.
+
+queues_filter_from_pdict() ->
+ case get(prometheus_queue_filter) of
+ undefined ->
+ false;
+ Pattern ->
+ Pattern
+ end.
diff --git a/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl b/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl
index 954e2f878d..9b1d6540e9 100644
--- a/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl
+++ b/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl
@@ -163,6 +163,11 @@ put_filtering_options_into_process_dictionary(Request) ->
put(prometheus_mf_filter, Fs);
_ -> ok
end,
+ case rabbit_mgmt_agent_config:get_env(filter_aggregated_queue_metrics_pattern) of
+ undefined -> ok;
+ Pattern ->
+ put(prometheus_queue_filter, Pattern)
+ end,
ok.
parse_vhosts(N) when is_binary(N) ->