diff options
author | dcorbacho <dparracorbacho@piotal.io> | 2021-11-16 10:23:39 +0100 |
---|---|---|
committer | dcorbacho <dparracorbacho@piotal.io> | 2021-11-16 10:23:39 +0100 |
commit | 242cb539b32dadda74106d689b404d46f454cd82 (patch) | |
tree | 7be4807493a9fbe9599037bd95d1b1dc2b97b7e6 | |
parent | e299178471143402cbdcb4fdb3b99f5f191880a3 (diff) | |
download | rabbitmq-server-git-242cb539b32dadda74106d689b404d46f454cd82.tar.gz |
Exclude queues from aggregated metrics in prometheus collector
Uses same exclusion pattern as the management agent
-rw-r--r-- | deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl | 106 | ||||
-rw-r--r-- | deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl | 5 |
2 files changed, 77 insertions, 34 deletions
diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index 8b46770ef8..328ec00276 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -232,25 +232,25 @@ register() -> deregister_cleanup(_) -> ok. collect_mf('detailed', Callback) -> - collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), enabled_mfs_from_pdict(), Callback), + collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), queues_filter_from_pdict(), enabled_mfs_from_pdict(), Callback), %% identity is here to enable filtering on a cluster name (as already happens in existing dashboards) emit_identity_info(Callback), ok; collect_mf('per-object', Callback) -> - collect(true, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback), + collect(true, ?METRIC_NAME_PREFIX, false, queues_filter_from_pdict(), ?METRICS_RAW, Callback), totals(Callback), emit_identity_info(Callback), ok; collect_mf(_Registry, Callback) -> PerObjectMetrics = application:get_env(rabbitmq_prometheus, return_per_object_metrics, false), - collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback), + collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, queues_filter_from_pdict(), ?METRICS_RAW, Callback), totals(Callback), emit_identity_info(Callback), ok. -collect(PerObjectMetrics, Prefix, VHostsFilter, IncludedMFs, Callback) -> +collect(PerObjectMetrics, Prefix, VHostsFilter, QueuesFilter, IncludedMFs, Callback) -> [begin - Data = get_data(Table, PerObjectMetrics, VHostsFilter), + Data = get_data(Table, PerObjectMetrics, VHostsFilter, QueuesFilter), mf(Callback, Prefix, Contents, Data) end || {Table, Contents} <- IncludedMFs, not mutually_exclusive_mf(PerObjectMetrics, Table, IncludedMFs)]. @@ -459,7 +459,7 @@ emit_gauge_metric_if_defined(Labels, Value) -> gauge_metric(Labels, Value) end. -get_data(connection_metrics = Table, false, _) -> +get_data(connection_metrics = Table, false, _, _) -> {Table, A1, A2, A3, A4} = ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4}) -> {T, sum(proplists:get_value(recv_cnt, Props), A1), @@ -468,7 +468,7 @@ get_data(connection_metrics = Table, false, _) -> sum(proplists:get_value(channels, Props), A4)} end, empty(Table), Table), [{Table, [{recv_cnt, A1}, {send_cnt, A2}, {send_pend, A3}, {channels, A4}]}]; -get_data(channel_metrics = Table, false, _) -> +get_data(channel_metrics = Table, false, _, _) -> {Table, A1, A2, A3, A4, A5, A6, A7} = ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4, A5, A6, A7}) -> {T, @@ -483,42 +483,42 @@ get_data(channel_metrics = Table, false, _) -> [{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3}, {messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6}, {global_prefetch_count, A7}]}]; -get_data(queue_consumer_count = MF, false, VHostsFilter) -> +get_data(queue_consumer_count = MF, false, VHostsFilter, QueuesFilter) -> Table = queue_metrics, %% Real table name {_, A1} = ets:foldl(fun ({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> Acc; + ({#resource{kind = queue, name = Name}, Props, _}, {T, A1} = Acc) + when is_list(QueuesFilter) -> + case re:run(Name, QueuesFilter, [{capture, none}]) of + match -> + Acc; + nomatch -> + {T, + sum(proplists:get_value(consumers, Props), A1) + } + end; ({_, Props, _}, {T, A1}) -> {T, sum(proplists:get_value(consumers, Props), A1) } end, empty(MF), Table), [{Table, [{consumers, A1}]}]; -get_data(queue_metrics = Table, false, VHostsFilter) -> +get_data(queue_metrics = Table, false, VHostsFilter, QueuesFilter) -> {Table, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16} = ets:foldl(fun ({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> Acc; - ({_, Props, _}, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, - A11, A12, A13, A14, A15, A16}) -> - {T, - sum(proplists:get_value(consumers, Props), A1), - sum(proplists:get_value(consumer_utilisation, Props), A2), - sum(proplists:get_value(memory, Props), A3), - sum(proplists:get_value(messages_ram, Props), A4), - sum(proplists:get_value(message_bytes_ram, Props), A5), - sum(proplists:get_value(messages_ready_ram, Props), A6), - sum(proplists:get_value(messages_unacknowledged_ram, Props), A7), - sum(proplists:get_value(messages_persistent, Props), A8), - sum(proplists:get_value(message_bytes_persistent, Props), A9), - sum(proplists:get_value(message_bytes, Props), A10), - sum(proplists:get_value(message_bytes_ready, Props), A11), - sum(proplists:get_value(message_bytes_unacknowledged, Props), A12), - sum(proplists:get_value(messages_paged_out, Props), A13), - sum(proplists:get_value(message_bytes_paged_out, Props), A14), - sum(proplists:get_value(disk_reads, Props), A15), - sum(proplists:get_value(disk_writes, Props), A16) - } + ({#resource{kind = queue, name = Name}, Props, _}, Acc) + when is_list(QueuesFilter) -> + case re:run(Name, QueuesFilter, [{capture, none}]) of + match -> + Acc; + nomatch -> + sum_queue_metrics(Props, Acc) + end; + ({_, Props, _}, Acc) -> + sum_queue_metrics(Props, Acc) end, empty(Table), Table), [{Table, [{consumers, A1}, {consumer_utilisation, A2}, {memory, A3}, {messages_ram, A4}, {message_bytes_ram, A5}, {messages_ready_ram, A6}, @@ -527,7 +527,7 @@ get_data(queue_metrics = Table, false, VHostsFilter) -> {message_bytes_ready, A11}, {message_bytes_unacknowledged, A12}, {messages_paged_out, A13}, {message_bytes_paged_out, A14}, {disk_reads, A15}, {disk_writes, A16}]}]; -get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; +get_data(Table, false, VHostsFilter, QueuesFilter) when Table == channel_exchange_metrics; Table == queue_coarse_metrics; Table == channel_queue_metrics; Table == connection_coarse_metrics; @@ -538,6 +538,14 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; %% For queue_coarse_metrics ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> Acc; + ({#resource{kind = queue, name = Name}, V1, V2, V3, V4}, {T, A1, A2, A3, A4} = Acc) + when is_list(QueuesFilter) -> + case re:run(Name, QueuesFilter, [{capture, none}]) of + match -> + Acc; + nomatch -> + {T, V1 + A1, V2 + A2, V3 + A3, V4 + A4} + end; ({_, V1}, {T, A1}) -> {T, V1 + A1}; ({_, V1, _}, {T, A1}) -> @@ -564,14 +572,14 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; _ -> [Result] end; -get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) -> +get_data(queue_coarse_metrics = Table, true, VHostsFilter, _) when is_map(VHostsFilter) -> ets:foldl(fun ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) -> [Row|Acc]; (_, Acc) -> Acc end, [], Table); -get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics orelse MF == queue_consumer_count -> +get_data(MF, true, VHostsFilter, _) when is_map(VHostsFilter), MF == queue_metrics orelse MF == queue_consumer_count -> Table = queue_metrics, ets:foldl(fun ({#resource{kind = queue, virtual_host = VHost}, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) -> @@ -579,11 +587,33 @@ get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics (_, Acc) -> Acc end, [], Table); -get_data(queue_consumer_count, true, _) -> +get_data(queue_consumer_count, true, _, _) -> ets:tab2list(queue_metrics); -get_data(Table, _, _) -> +get_data(Table, _, _, _) -> ets:tab2list(Table). + +sum_queue_metrics(Props, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, + A12, A13, A14, A15, A16}) -> + {T, + sum(proplists:get_value(consumers, Props), A1), + sum(proplists:get_value(consumer_utilisation, Props), A2), + sum(proplists:get_value(memory, Props), A3), + sum(proplists:get_value(messages_ram, Props), A4), + sum(proplists:get_value(message_bytes_ram, Props), A5), + sum(proplists:get_value(messages_ready_ram, Props), A6), + sum(proplists:get_value(messages_unacknowledged_ram, Props), A7), + sum(proplists:get_value(messages_persistent, Props), A8), + sum(proplists:get_value(message_bytes_persistent, Props), A9), + sum(proplists:get_value(message_bytes, Props), A10), + sum(proplists:get_value(message_bytes_ready, Props), A11), + sum(proplists:get_value(message_bytes_unacknowledged, Props), A12), + sum(proplists:get_value(messages_paged_out, Props), A13), + sum(proplists:get_value(message_bytes_paged_out, Props), A14), + sum(proplists:get_value(disk_reads, Props), A15), + sum(proplists:get_value(disk_writes, Props), A16) + }. + division(0, 0) -> 0; division(A, B) -> @@ -631,3 +661,11 @@ vhosts_filter_from_pdict() -> Enabled = maps:from_list([ {VHost, true} || VHost <- L ]), maps:merge(All, Enabled) end. + +queues_filter_from_pdict() -> + case get(prometheus_queue_filter) of + undefined -> + false; + Pattern -> + Pattern + end. diff --git a/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl b/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl index 954e2f878d..9b1d6540e9 100644 --- a/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl +++ b/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl @@ -163,6 +163,11 @@ put_filtering_options_into_process_dictionary(Request) -> put(prometheus_mf_filter, Fs); _ -> ok end, + case rabbit_mgmt_agent_config:get_env(filter_aggregated_queue_metrics_pattern) of + undefined -> ok; + Pattern -> + put(prometheus_queue_filter, Pattern) + end, ok. parse_vhosts(N) when is_binary(N) -> |