author    Michael Klishin <klishinm@vmware.com>  2021-11-23 15:01:40 +0300
committer GitHub <noreply@github.com>  2021-11-23 15:01:40 +0300
commit    0f0da778d0c47e75a5aa6cc5adb6a2f1a2ffcc4e (patch)
tree      e6302636b9bb2e81d702204f5f47b35648368799
parent    2483711505622d6e2393f4f967daac775d0e27cb (diff)
parent    bd2858c2086fb81c169d29b129b0e0b27e7a8504 (diff)
Merge pull request #3708 from rabbitmq/filter-out-queues-from-global-stats
Use a pattern to filter out some queues from global stats
-rw-r--r--  deps/rabbit/src/rabbit_channel.erl                                                     |   2
-rw-r--r--  deps/rabbitmq_management_agent/priv/schema/rabbitmq_management_agent.schema           |   1
-rw-r--r--  deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl                  |  32
-rw-r--r--  deps/rabbitmq_prometheus/priv/schema/rabbitmq_prometheus.schema                       |   2
-rw-r--r--  deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl | 106
-rw-r--r--  deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl                            |   6
6 files changed, 105 insertions(+), 44 deletions(-)
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl
index 2d5437421d..4d85070001 100644
--- a/deps/rabbit/src/rabbit_channel.erl
+++ b/deps/rabbit/src/rabbit_channel.erl
@@ -2166,7 +2166,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex
Qs = rabbit_amqqueue:lookup(AllNames),
case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of
{ok, QueueStates, Actions} ->
- rabbit_global_counters:messages_routed(amqp091, length(Qs)),
+ rabbit_global_counters:messages_routed(amqp091, 1),
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
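
With the change above, a single publish that is routed to N queues increments the amqp091 messages_routed global counter once, rather than once per destination queue.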
diff --git a/deps/rabbitmq_management_agent/priv/schema/rabbitmq_management_agent.schema b/deps/rabbitmq_management_agent/priv/schema/rabbitmq_management_agent.schema
index fa8a76725a..3d66d9aaee 100644
--- a/deps/rabbitmq_management_agent/priv/schema/rabbitmq_management_agent.schema
+++ b/deps/rabbitmq_management_agent/priv/schema/rabbitmq_management_agent.schema
@@ -2,3 +2,4 @@
%% Also the management application will refuse to start if metrics collection is disabled
{mapping, "management_agent.disable_metrics_collector", "rabbitmq_management_agent.disable_metrics_collector",
[{datatype, {enum, [true, false]}}]}.
+{mapping, "management_agent.filter_aggregated_queue_metrics_pattern", "rabbitmq_management_agent.filter_aggregated_queue_metrics_pattern", [{datatype, string}]}. \ No newline at end of file
diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl
index 3df72d9233..9ab07fd808 100644
--- a/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl
+++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl
@@ -26,7 +26,8 @@
-import(rabbit_mgmt_data, [lookup_element/3]).
-record(state, {table, interval, policies, rates_mode, lookup_queue,
- lookup_exchange, old_aggr_stats}).
+ lookup_exchange, old_aggr_stats,
+ filter_aggregated_queue_metrics_pattern}).
%% Data is stored in ETS tables:
%% * One ETS table per metric (queue_stats, channel_stats_deliver_stats...)
@@ -59,7 +60,7 @@ reset_lookups(Table) ->
gen_server:call(name(Table), reset_lookups, infinity).
init([Table]) ->
- {RatesMode, Policies} = load_config(),
+ {RatesMode, Policies, FilterPattern} = load_config(),
Policy = retention_policy(Table),
Interval = take_smaller(proplists:get_value(Policy, Policies, [])) * 1000,
erlang:send_after(Interval, self(), collect_metrics),
@@ -70,7 +71,8 @@ init([Table]) ->
rates_mode = RatesMode,
old_aggr_stats = #{},
lookup_queue = fun queue_exists/1,
- lookup_exchange = fun exchange_exists/1}}.
+ lookup_exchange = fun exchange_exists/1,
+ filter_aggregated_queue_metrics_pattern = FilterPattern}}.
handle_call(reset_lookups, _From, State) ->
{reply, ok, State#state{lookup_queue = fun queue_exists/1,
@@ -463,19 +465,19 @@ aggregate_entry({Name, Ready, Unack, Msgs, Red}, NextStats, Ops0,
#state{table = queue_coarse_metrics,
old_aggr_stats = Old,
policies = {BPolicies, _, GPolicies},
- lookup_queue = QueueFun} = State) ->
+ lookup_queue = QueueFun,
+ filter_aggregated_queue_metrics_pattern = Pattern} = State) ->
Stats = ?vhost_msg_stats(Ready, Unack, Msgs),
Diff = get_difference(Name, Stats, State),
- Ops1 = insert_entry_ops(vhost_msg_stats, vhost(Name), true, Diff, Ops0,
- GPolicies),
+ Ops1 = maybe_insert_entry_ops(Name, Pattern, vhost_msg_stats, vhost(Name),
+ true, Diff, Ops0, GPolicies),
Ops2 = case QueueFun(Name) of
true ->
QPS =?queue_process_stats(Red),
O1 = insert_entry_ops(queue_process_stats, Name, false, QPS,
Ops1, BPolicies),
QMS = ?queue_msg_stats(Ready, Unack, Msgs),
- insert_entry_ops(queue_msg_stats, Name, false, QMS,
- O1, BPolicies);
+ insert_entry_ops(queue_msg_stats, Name, false, QMS, O1, BPolicies);
_ ->
Ops1
end,
@@ -583,6 +585,17 @@ insert_entry_op(Table, Key, Entry, Ops) ->
end, {insert_entry, Entry}, TableOps0),
maps:put(Table, TableOps, Ops).
+maybe_insert_entry_ops(Name, Pattern, Table, Id, Incr, Entry, Ops, Policies) ->
+ case needs_filtering_out(Name, Pattern) of
+ true -> Ops;
+ false -> insert_entry_ops(Table, Id, Incr, Entry, Ops, Policies)
+ end.
+
+needs_filtering_out(_, undefined) ->
+ false;
+needs_filtering_out(#resource{name = Name}, Pattern) ->
+ match == re:run(Name, Pattern, [{capture, none}]).
+
insert_entry_ops(Table, Id, Incr, Entry, Ops, Policies) ->
lists:foldl(fun({Size, Interval}, Acc) ->
Key = {Id, Size, Interval, Incr},
@@ -688,7 +701,8 @@ index_table(node_node_coarse_stats, node) -> node_node_coarse_stats_node_index.
load_config() ->
RatesMode = rabbit_mgmt_agent_config:get_env(rates_mode),
Policies = rabbit_mgmt_agent_config:get_env(sample_retention_policies, []),
- {RatesMode, Policies}.
+ FilterPattern = rabbit_mgmt_agent_config:get_env(filter_aggregated_queue_metrics_pattern),
+ {RatesMode, Policies, FilterPattern}.
ceil(X) when X < 0 ->
trunc(X);
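
needs_filtering_out/2 above builds on re:run/3 with the {capture, none} option, which returns only the atom match or nomatch. A hypothetical shell session illustrating that behaviour (queue names and pattern are made up):

    1> Pattern = "^tmp_".
    "^tmp_"
    2> re:run("tmp_orders", Pattern, [{capture, none}]).
    match
    3> re:run("orders", Pattern, [{capture, none}]).
    nomatch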
diff --git a/deps/rabbitmq_prometheus/priv/schema/rabbitmq_prometheus.schema b/deps/rabbitmq_prometheus/priv/schema/rabbitmq_prometheus.schema
index a406f604fd..0ba11713f6 100644
--- a/deps/rabbitmq_prometheus/priv/schema/rabbitmq_prometheus.schema
+++ b/deps/rabbitmq_prometheus/priv/schema/rabbitmq_prometheus.schema
@@ -126,3 +126,5 @@ end}.
[{datatype, integer}, {validators, ["non_negative_integer"]}]}.
{mapping, "prometheus.ssl.max_keepalive", "rabbitmq_prometheus.ssl_config.cowboy_opts.max_keepalive",
[{datatype, integer}, {validators, ["non_negative_integer"]}]}.
+
+{mapping, "prometheus.filter_aggregated_queue_metrics_pattern", "rabbitmq_prometheus.filter_aggregated_queue_metrics_pattern", [{datatype, string}]}. \ No newline at end of file
diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
index 8b46770ef8..328ec00276 100644
--- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
+++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
@@ -232,25 +232,25 @@ register() ->
deregister_cleanup(_) -> ok.
collect_mf('detailed', Callback) ->
- collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), enabled_mfs_from_pdict(), Callback),
+ collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), queues_filter_from_pdict(), enabled_mfs_from_pdict(), Callback),
%% identity is here to enable filtering on a cluster name (as already happens in existing dashboards)
emit_identity_info(Callback),
ok;
collect_mf('per-object', Callback) ->
- collect(true, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback),
+ collect(true, ?METRIC_NAME_PREFIX, false, queues_filter_from_pdict(), ?METRICS_RAW, Callback),
totals(Callback),
emit_identity_info(Callback),
ok;
collect_mf(_Registry, Callback) ->
PerObjectMetrics = application:get_env(rabbitmq_prometheus, return_per_object_metrics, false),
- collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback),
+ collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, queues_filter_from_pdict(), ?METRICS_RAW, Callback),
totals(Callback),
emit_identity_info(Callback),
ok.
-collect(PerObjectMetrics, Prefix, VHostsFilter, IncludedMFs, Callback) ->
+collect(PerObjectMetrics, Prefix, VHostsFilter, QueuesFilter, IncludedMFs, Callback) ->
[begin
- Data = get_data(Table, PerObjectMetrics, VHostsFilter),
+ Data = get_data(Table, PerObjectMetrics, VHostsFilter, QueuesFilter),
mf(Callback, Prefix, Contents, Data)
end || {Table, Contents} <- IncludedMFs, not mutually_exclusive_mf(PerObjectMetrics, Table, IncludedMFs)].
@@ -459,7 +459,7 @@ emit_gauge_metric_if_defined(Labels, Value) ->
gauge_metric(Labels, Value)
end.
-get_data(connection_metrics = Table, false, _) ->
+get_data(connection_metrics = Table, false, _, _) ->
{Table, A1, A2, A3, A4} = ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4}) ->
{T,
sum(proplists:get_value(recv_cnt, Props), A1),
@@ -468,7 +468,7 @@ get_data(connection_metrics = Table, false, _) ->
sum(proplists:get_value(channels, Props), A4)}
end, empty(Table), Table),
[{Table, [{recv_cnt, A1}, {send_cnt, A2}, {send_pend, A3}, {channels, A4}]}];
-get_data(channel_metrics = Table, false, _) ->
+get_data(channel_metrics = Table, false, _, _) ->
{Table, A1, A2, A3, A4, A5, A6, A7} =
ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4, A5, A6, A7}) ->
{T,
@@ -483,42 +483,42 @@ get_data(channel_metrics = Table, false, _) ->
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
{global_prefetch_count, A7}]}];
-get_data(queue_consumer_count = MF, false, VHostsFilter) ->
+get_data(queue_consumer_count = MF, false, VHostsFilter, QueuesFilter) ->
Table = queue_metrics, %% Real table name
{_, A1} = ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
+ ({#resource{kind = queue, name = Name}, Props, _}, {T, A1} = Acc)
+ when is_list(QueuesFilter) ->
+ case re:run(Name, QueuesFilter, [{capture, none}]) of
+ match ->
+ Acc;
+ nomatch ->
+ {T,
+ sum(proplists:get_value(consumers, Props), A1)
+ }
+ end;
({_, Props, _}, {T, A1}) ->
{T,
sum(proplists:get_value(consumers, Props), A1)
}
end, empty(MF), Table),
[{Table, [{consumers, A1}]}];
-get_data(queue_metrics = Table, false, VHostsFilter) ->
+get_data(queue_metrics = Table, false, VHostsFilter, QueuesFilter) ->
{Table, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16} =
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
- ({_, Props, _}, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10,
- A11, A12, A13, A14, A15, A16}) ->
- {T,
- sum(proplists:get_value(consumers, Props), A1),
- sum(proplists:get_value(consumer_utilisation, Props), A2),
- sum(proplists:get_value(memory, Props), A3),
- sum(proplists:get_value(messages_ram, Props), A4),
- sum(proplists:get_value(message_bytes_ram, Props), A5),
- sum(proplists:get_value(messages_ready_ram, Props), A6),
- sum(proplists:get_value(messages_unacknowledged_ram, Props), A7),
- sum(proplists:get_value(messages_persistent, Props), A8),
- sum(proplists:get_value(message_bytes_persistent, Props), A9),
- sum(proplists:get_value(message_bytes, Props), A10),
- sum(proplists:get_value(message_bytes_ready, Props), A11),
- sum(proplists:get_value(message_bytes_unacknowledged, Props), A12),
- sum(proplists:get_value(messages_paged_out, Props), A13),
- sum(proplists:get_value(message_bytes_paged_out, Props), A14),
- sum(proplists:get_value(disk_reads, Props), A15),
- sum(proplists:get_value(disk_writes, Props), A16)
- }
+ ({#resource{kind = queue, name = Name}, Props, _}, Acc)
+ when is_list(QueuesFilter) ->
+ case re:run(Name, QueuesFilter, [{capture, none}]) of
+ match ->
+ Acc;
+ nomatch ->
+ sum_queue_metrics(Props, Acc)
+ end;
+ ({_, Props, _}, Acc) ->
+ sum_queue_metrics(Props, Acc)
end, empty(Table), Table),
[{Table, [{consumers, A1}, {consumer_utilisation, A2}, {memory, A3}, {messages_ram, A4},
{message_bytes_ram, A5}, {messages_ready_ram, A6},
@@ -527,7 +527,7 @@ get_data(queue_metrics = Table, false, VHostsFilter) ->
{message_bytes_ready, A11}, {message_bytes_unacknowledged, A12},
{messages_paged_out, A13}, {message_bytes_paged_out, A14},
{disk_reads, A15}, {disk_writes, A16}]}];
-get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
+get_data(Table, false, VHostsFilter, QueuesFilter) when Table == channel_exchange_metrics;
Table == queue_coarse_metrics;
Table == channel_queue_metrics;
Table == connection_coarse_metrics;
@@ -538,6 +538,14 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
%% For queue_coarse_metrics
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
Acc;
+ ({#resource{kind = queue, name = Name}, V1, V2, V3, V4}, {T, A1, A2, A3, A4} = Acc)
+ when is_list(QueuesFilter) ->
+ case re:run(Name, QueuesFilter, [{capture, none}]) of
+ match ->
+ Acc;
+ nomatch ->
+ {T, V1 + A1, V2 + A2, V3 + A3, V4 + A4}
+ end;
({_, V1}, {T, A1}) ->
{T, V1 + A1};
({_, V1, _}, {T, A1}) ->
@@ -564,14 +572,14 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
_ ->
[Result]
end;
-get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
+get_data(queue_coarse_metrics = Table, true, VHostsFilter, _) when is_map(VHostsFilter) ->
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
[Row|Acc];
(_, Acc) ->
Acc
end, [], Table);
-get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics orelse MF == queue_consumer_count ->
+get_data(MF, true, VHostsFilter, _) when is_map(VHostsFilter), MF == queue_metrics orelse MF == queue_consumer_count ->
Table = queue_metrics,
ets:foldl(fun
({#resource{kind = queue, virtual_host = VHost}, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
@@ -579,11 +587,33 @@ get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics
(_, Acc) ->
Acc
end, [], Table);
-get_data(queue_consumer_count, true, _) ->
+get_data(queue_consumer_count, true, _, _) ->
ets:tab2list(queue_metrics);
-get_data(Table, _, _) ->
+get_data(Table, _, _, _) ->
ets:tab2list(Table).
+
+sum_queue_metrics(Props, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11,
+ A12, A13, A14, A15, A16}) ->
+ {T,
+ sum(proplists:get_value(consumers, Props), A1),
+ sum(proplists:get_value(consumer_utilisation, Props), A2),
+ sum(proplists:get_value(memory, Props), A3),
+ sum(proplists:get_value(messages_ram, Props), A4),
+ sum(proplists:get_value(message_bytes_ram, Props), A5),
+ sum(proplists:get_value(messages_ready_ram, Props), A6),
+ sum(proplists:get_value(messages_unacknowledged_ram, Props), A7),
+ sum(proplists:get_value(messages_persistent, Props), A8),
+ sum(proplists:get_value(message_bytes_persistent, Props), A9),
+ sum(proplists:get_value(message_bytes, Props), A10),
+ sum(proplists:get_value(message_bytes_ready, Props), A11),
+ sum(proplists:get_value(message_bytes_unacknowledged, Props), A12),
+ sum(proplists:get_value(messages_paged_out, Props), A13),
+ sum(proplists:get_value(message_bytes_paged_out, Props), A14),
+ sum(proplists:get_value(disk_reads, Props), A15),
+ sum(proplists:get_value(disk_writes, Props), A16)
+ }.
+
division(0, 0) ->
0;
division(A, B) ->
@@ -631,3 +661,11 @@ vhosts_filter_from_pdict() ->
Enabled = maps:from_list([ {VHost, true} || VHost <- L ]),
maps:merge(All, Enabled)
end.
+
+queues_filter_from_pdict() ->
+ case get(prometheus_queue_filter) of
+ undefined ->
+ false;
+ Pattern ->
+ Pattern
+ end.
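
Each filtering clause added to get_data/4 above repeats the same match-and-skip step. A hypothetical helper distilling that logic (not part of the patch; the name and spec are invented for illustration):

    %% Returns true when the queue name matches the configured filter
    %% pattern and the row should therefore be skipped from aggregation.
    -spec skip_queue(iodata(), false | iodata() | re:mp()) -> boolean().
    skip_queue(_Name, false) ->
        false;
    skip_queue(Name, QueuesFilter) ->
        match =:= re:run(Name, QueuesFilter, [{capture, none}]).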
diff --git a/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl b/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl
index 954e2f878d..f7f4f11720 100644
--- a/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl
+++ b/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl
@@ -163,6 +163,12 @@ put_filtering_options_into_process_dictionary(Request) ->
put(prometheus_mf_filter, Fs);
_ -> ok
end,
+ case application:get_env(rabbitmq_prometheus, filter_aggregated_queue_metrics_pattern, undefined) of
+ undefined -> ok;
+ Pattern ->
+ {ok, CompiledPattern} = re:compile(Pattern),
+ put(prometheus_queue_filter, CompiledPattern)
+ end,
ok.
parse_vhosts(N) when is_binary(N) ->
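
For reference, re:compile/1 returns {ok, MP} with a compiled (opaque) pattern, and the handler stores it in the process dictionary so queues_filter_from_pdict/0 can read it back in the collector. A minimal sketch of the same round trip, with a made-up pattern and queue name:

    %% Sketch only: compile once, stash in the process dictionary,
    %% read back and match against a queue name.
    {ok, MP} = re:compile("^tmp_"),
    put(prometheus_queue_filter, MP),
    match = re:run("tmp_orders", get(prometheus_queue_filter), [{capture, none}]).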