diff options
author | Michael Klishin <klishinm@vmware.com> | 2021-11-23 15:01:40 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-11-23 15:01:40 +0300 |
commit | 0f0da778d0c47e75a5aa6cc5adb6a2f1a2ffcc4e (patch) | |
tree | e6302636b9bb2e81d702204f5f47b35648368799 | |
parent | 2483711505622d6e2393f4f967daac775d0e27cb (diff) | |
parent | bd2858c2086fb81c169d29b129b0e0b27e7a8504 (diff) | |
download | rabbitmq-server-git-0f0da778d0c47e75a5aa6cc5adb6a2f1a2ffcc4e.tar.gz |
Merge pull request #3708 from rabbitmq/filter-out-queues-from-global-stats
Use a pattern to filter out some queues from global stats
6 files changed, 105 insertions, 44 deletions
diff --git a/deps/rabbit/src/rabbit_channel.erl b/deps/rabbit/src/rabbit_channel.erl index 2d5437421d..4d85070001 100644 --- a/deps/rabbit/src/rabbit_channel.erl +++ b/deps/rabbit/src/rabbit_channel.erl @@ -2166,7 +2166,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ex Qs = rabbit_amqqueue:lookup(AllNames), case rabbit_queue_type:deliver(Qs, Delivery, QueueStates0) of {ok, QueueStates, Actions} -> - rabbit_global_counters:messages_routed(amqp091, length(Qs)), + rabbit_global_counters:messages_routed(amqp091, 1), %% NB: the order here is important since basic.returns must be %% sent before confirms. ok = process_routing_mandatory(Mandatory, Qs, Message, State0), diff --git a/deps/rabbitmq_management_agent/priv/schema/rabbitmq_management_agent.schema b/deps/rabbitmq_management_agent/priv/schema/rabbitmq_management_agent.schema index fa8a76725a..3d66d9aaee 100644 --- a/deps/rabbitmq_management_agent/priv/schema/rabbitmq_management_agent.schema +++ b/deps/rabbitmq_management_agent/priv/schema/rabbitmq_management_agent.schema @@ -2,3 +2,4 @@ %% Also the management application will refuse to start if metrics collection is disabled {mapping, "management_agent.disable_metrics_collector", "rabbitmq_management_agent.disable_metrics_collector", [{datatype, {enum, [true, false]}}]}. +{mapping, "management_agent.filter_aggregated_queue_metrics_pattern", "rabbitmq_management_agent.filter_aggregated_queue_metrics_pattern", [{datatype, string}]}.
\ No newline at end of file diff --git a/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl index 3df72d9233..9ab07fd808 100644 --- a/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl +++ b/deps/rabbitmq_management_agent/src/rabbit_mgmt_metrics_collector.erl @@ -26,7 +26,8 @@ -import(rabbit_mgmt_data, [lookup_element/3]). -record(state, {table, interval, policies, rates_mode, lookup_queue, - lookup_exchange, old_aggr_stats}). + lookup_exchange, old_aggr_stats, + filter_aggregated_queue_metrics_pattern}). %% Data is stored in ETS tables: %% * One ETS table per metric (queue_stats, channel_stats_deliver_stats...) @@ -59,7 +60,7 @@ reset_lookups(Table) -> gen_server:call(name(Table), reset_lookups, infinity). init([Table]) -> - {RatesMode, Policies} = load_config(), + {RatesMode, Policies, FilterPattern} = load_config(), Policy = retention_policy(Table), Interval = take_smaller(proplists:get_value(Policy, Policies, [])) * 1000, erlang:send_after(Interval, self(), collect_metrics), @@ -70,7 +71,8 @@ init([Table]) -> rates_mode = RatesMode, old_aggr_stats = #{}, lookup_queue = fun queue_exists/1, - lookup_exchange = fun exchange_exists/1}}. + lookup_exchange = fun exchange_exists/1, + filter_aggregated_queue_metrics_pattern = FilterPattern}}. handle_call(reset_lookups, _From, State) -> {reply, ok, State#state{lookup_queue = fun queue_exists/1, @@ -463,19 +465,19 @@ aggregate_entry({Name, Ready, Unack, Msgs, Red}, NextStats, Ops0, #state{table = queue_coarse_metrics, old_aggr_stats = Old, policies = {BPolicies, _, GPolicies}, - lookup_queue = QueueFun} = State) -> + lookup_queue = QueueFun, + filter_aggregated_queue_metrics_pattern = Pattern} = State) -> Stats = ?vhost_msg_stats(Ready, Unack, Msgs), Diff = get_difference(Name, Stats, State), - Ops1 = insert_entry_ops(vhost_msg_stats, vhost(Name), true, Diff, Ops0, - GPolicies), + Ops1 = maybe_insert_entry_ops(Name, Pattern, vhost_msg_stats, vhost(Name), + true, Diff, Ops0, GPolicies), Ops2 = case QueueFun(Name) of true -> QPS =?queue_process_stats(Red), O1 = insert_entry_ops(queue_process_stats, Name, false, QPS, Ops1, BPolicies), QMS = ?queue_msg_stats(Ready, Unack, Msgs), - insert_entry_ops(queue_msg_stats, Name, false, QMS, - O1, BPolicies); + insert_entry_ops(queue_msg_stats, Name, false, QMS, O1, BPolicies); _ -> Ops1 end, @@ -583,6 +585,17 @@ insert_entry_op(Table, Key, Entry, Ops) -> end, {insert_entry, Entry}, TableOps0), maps:put(Table, TableOps, Ops). +maybe_insert_entry_ops(Name, Pattern, Table, Id, Incr, Entry, Ops, Policies) -> + case needs_filtering_out(Name, Pattern) of + true -> Ops; + false -> insert_entry_ops(Table, Id, Incr, Entry, Ops, Policies) + end. + +needs_filtering_out(_, undefined) -> + false; +needs_filtering_out(#resource{name = Name}, Pattern) -> + match == re:run(Name, Pattern, [{capture, none}]). + insert_entry_ops(Table, Id, Incr, Entry, Ops, Policies) -> lists:foldl(fun({Size, Interval}, Acc) -> Key = {Id, Size, Interval, Incr}, @@ -688,7 +701,8 @@ index_table(node_node_coarse_stats, node) -> node_node_coarse_stats_node_index. load_config() -> RatesMode = rabbit_mgmt_agent_config:get_env(rates_mode), Policies = rabbit_mgmt_agent_config:get_env(sample_retention_policies, []), - {RatesMode, Policies}. + FilterPattern = rabbit_mgmt_agent_config:get_env(filter_aggregated_queue_metrics_pattern), + {RatesMode, Policies, FilterPattern}. ceil(X) when X < 0 -> trunc(X); diff --git a/deps/rabbitmq_prometheus/priv/schema/rabbitmq_prometheus.schema b/deps/rabbitmq_prometheus/priv/schema/rabbitmq_prometheus.schema index a406f604fd..0ba11713f6 100644 --- a/deps/rabbitmq_prometheus/priv/schema/rabbitmq_prometheus.schema +++ b/deps/rabbitmq_prometheus/priv/schema/rabbitmq_prometheus.schema @@ -126,3 +126,5 @@ end}. [{datatype, integer}, {validators, ["non_negative_integer"]}]}. {mapping, "prometheus.ssl.max_keepalive", "rabbitmq_prometheus.ssl_config.cowboy_opts.max_keepalive", [{datatype, integer}, {validators, ["non_negative_integer"]}]}. + +{mapping, "prometheus.filter_aggregated_queue_metrics_pattern", "rabbitmq_prometheus.filter_aggregated_queue_metrics_pattern", [{datatype, string}]}.
\ No newline at end of file diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index 8b46770ef8..328ec00276 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -232,25 +232,25 @@ register() -> deregister_cleanup(_) -> ok. collect_mf('detailed', Callback) -> - collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), enabled_mfs_from_pdict(), Callback), + collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), queues_filter_from_pdict(), enabled_mfs_from_pdict(), Callback), %% identity is here to enable filtering on a cluster name (as already happens in existing dashboards) emit_identity_info(Callback), ok; collect_mf('per-object', Callback) -> - collect(true, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback), + collect(true, ?METRIC_NAME_PREFIX, false, queues_filter_from_pdict(), ?METRICS_RAW, Callback), totals(Callback), emit_identity_info(Callback), ok; collect_mf(_Registry, Callback) -> PerObjectMetrics = application:get_env(rabbitmq_prometheus, return_per_object_metrics, false), - collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback), + collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, queues_filter_from_pdict(), ?METRICS_RAW, Callback), totals(Callback), emit_identity_info(Callback), ok. -collect(PerObjectMetrics, Prefix, VHostsFilter, IncludedMFs, Callback) -> +collect(PerObjectMetrics, Prefix, VHostsFilter, QueuesFilter, IncludedMFs, Callback) -> [begin - Data = get_data(Table, PerObjectMetrics, VHostsFilter), + Data = get_data(Table, PerObjectMetrics, VHostsFilter, QueuesFilter), mf(Callback, Prefix, Contents, Data) end || {Table, Contents} <- IncludedMFs, not mutually_exclusive_mf(PerObjectMetrics, Table, IncludedMFs)]. @@ -459,7 +459,7 @@ emit_gauge_metric_if_defined(Labels, Value) -> gauge_metric(Labels, Value) end. -get_data(connection_metrics = Table, false, _) -> +get_data(connection_metrics = Table, false, _, _) -> {Table, A1, A2, A3, A4} = ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4}) -> {T, sum(proplists:get_value(recv_cnt, Props), A1), @@ -468,7 +468,7 @@ get_data(connection_metrics = Table, false, _) -> sum(proplists:get_value(channels, Props), A4)} end, empty(Table), Table), [{Table, [{recv_cnt, A1}, {send_cnt, A2}, {send_pend, A3}, {channels, A4}]}]; -get_data(channel_metrics = Table, false, _) -> +get_data(channel_metrics = Table, false, _, _) -> {Table, A1, A2, A3, A4, A5, A6, A7} = ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4, A5, A6, A7}) -> {T, @@ -483,42 +483,42 @@ get_data(channel_metrics = Table, false, _) -> [{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3}, {messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6}, {global_prefetch_count, A7}]}]; -get_data(queue_consumer_count = MF, false, VHostsFilter) -> +get_data(queue_consumer_count = MF, false, VHostsFilter, QueuesFilter) -> Table = queue_metrics, %% Real table name {_, A1} = ets:foldl(fun ({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> Acc; + ({#resource{kind = queue, name = Name}, Props, _}, {T, A1} = Acc) + when is_list(QueuesFilter) -> + case re:run(Name, QueuesFilter, [{capture, none}]) of + match -> + Acc; + nomatch -> + {T, + sum(proplists:get_value(consumers, Props), A1) + } + end; ({_, Props, _}, {T, A1}) -> {T, sum(proplists:get_value(consumers, Props), A1) } end, empty(MF), Table), [{Table, [{consumers, A1}]}]; -get_data(queue_metrics = Table, false, VHostsFilter) -> +get_data(queue_metrics = Table, false, VHostsFilter, QueuesFilter) -> {Table, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16} = ets:foldl(fun ({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> Acc; - ({_, Props, _}, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, - A11, A12, A13, A14, A15, A16}) -> - {T, - sum(proplists:get_value(consumers, Props), A1), - sum(proplists:get_value(consumer_utilisation, Props), A2), - sum(proplists:get_value(memory, Props), A3), - sum(proplists:get_value(messages_ram, Props), A4), - sum(proplists:get_value(message_bytes_ram, Props), A5), - sum(proplists:get_value(messages_ready_ram, Props), A6), - sum(proplists:get_value(messages_unacknowledged_ram, Props), A7), - sum(proplists:get_value(messages_persistent, Props), A8), - sum(proplists:get_value(message_bytes_persistent, Props), A9), - sum(proplists:get_value(message_bytes, Props), A10), - sum(proplists:get_value(message_bytes_ready, Props), A11), - sum(proplists:get_value(message_bytes_unacknowledged, Props), A12), - sum(proplists:get_value(messages_paged_out, Props), A13), - sum(proplists:get_value(message_bytes_paged_out, Props), A14), - sum(proplists:get_value(disk_reads, Props), A15), - sum(proplists:get_value(disk_writes, Props), A16) - } + ({#resource{kind = queue, name = Name}, Props, _}, Acc) + when is_list(QueuesFilter) -> + case re:run(Name, QueuesFilter, [{capture, none}]) of + match -> + Acc; + nomatch -> + sum_queue_metrics(Props, Acc) + end; + ({_, Props, _}, Acc) -> + sum_queue_metrics(Props, Acc) end, empty(Table), Table), [{Table, [{consumers, A1}, {consumer_utilisation, A2}, {memory, A3}, {messages_ram, A4}, {message_bytes_ram, A5}, {messages_ready_ram, A6}, @@ -527,7 +527,7 @@ get_data(queue_metrics = Table, false, VHostsFilter) -> {message_bytes_ready, A11}, {message_bytes_unacknowledged, A12}, {messages_paged_out, A13}, {message_bytes_paged_out, A14}, {disk_reads, A15}, {disk_writes, A16}]}]; -get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; +get_data(Table, false, VHostsFilter, QueuesFilter) when Table == channel_exchange_metrics; Table == queue_coarse_metrics; Table == channel_queue_metrics; Table == connection_coarse_metrics; @@ -538,6 +538,14 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; %% For queue_coarse_metrics ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> Acc; + ({#resource{kind = queue, name = Name}, V1, V2, V3, V4}, {T, A1, A2, A3, A4} = Acc) + when is_list(QueuesFilter) -> + case re:run(Name, QueuesFilter, [{capture, none}]) of + match -> + Acc; + nomatch -> + {T, V1 + A1, V2 + A2, V3 + A3, V4 + A4} + end; ({_, V1}, {T, A1}) -> {T, V1 + A1}; ({_, V1, _}, {T, A1}) -> @@ -564,14 +572,14 @@ get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; _ -> [Result] end; -get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) -> +get_data(queue_coarse_metrics = Table, true, VHostsFilter, _) when is_map(VHostsFilter) -> ets:foldl(fun ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) -> [Row|Acc]; (_, Acc) -> Acc end, [], Table); -get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics orelse MF == queue_consumer_count -> +get_data(MF, true, VHostsFilter, _) when is_map(VHostsFilter), MF == queue_metrics orelse MF == queue_consumer_count -> Table = queue_metrics, ets:foldl(fun ({#resource{kind = queue, virtual_host = VHost}, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) -> @@ -579,11 +587,33 @@ get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics (_, Acc) -> Acc end, [], Table); -get_data(queue_consumer_count, true, _) -> +get_data(queue_consumer_count, true, _, _) -> ets:tab2list(queue_metrics); -get_data(Table, _, _) -> +get_data(Table, _, _, _) -> ets:tab2list(Table). + +sum_queue_metrics(Props, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, + A12, A13, A14, A15, A16}) -> + {T, + sum(proplists:get_value(consumers, Props), A1), + sum(proplists:get_value(consumer_utilisation, Props), A2), + sum(proplists:get_value(memory, Props), A3), + sum(proplists:get_value(messages_ram, Props), A4), + sum(proplists:get_value(message_bytes_ram, Props), A5), + sum(proplists:get_value(messages_ready_ram, Props), A6), + sum(proplists:get_value(messages_unacknowledged_ram, Props), A7), + sum(proplists:get_value(messages_persistent, Props), A8), + sum(proplists:get_value(message_bytes_persistent, Props), A9), + sum(proplists:get_value(message_bytes, Props), A10), + sum(proplists:get_value(message_bytes_ready, Props), A11), + sum(proplists:get_value(message_bytes_unacknowledged, Props), A12), + sum(proplists:get_value(messages_paged_out, Props), A13), + sum(proplists:get_value(message_bytes_paged_out, Props), A14), + sum(proplists:get_value(disk_reads, Props), A15), + sum(proplists:get_value(disk_writes, Props), A16) + }. + division(0, 0) -> 0; division(A, B) -> @@ -631,3 +661,11 @@ vhosts_filter_from_pdict() -> Enabled = maps:from_list([ {VHost, true} || VHost <- L ]), maps:merge(All, Enabled) end. + +queues_filter_from_pdict() -> + case get(prometheus_queue_filter) of + undefined -> + false; + Pattern -> + Pattern + end. diff --git a/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl b/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl index 954e2f878d..f7f4f11720 100644 --- a/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl +++ b/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl @@ -163,6 +163,12 @@ put_filtering_options_into_process_dictionary(Request) -> put(prometheus_mf_filter, Fs); _ -> ok end, + case application:get_env(rabbitmq_prometheus, filter_aggregated_queue_metrics_pattern, undefined) of + undefined -> ok; + Pattern -> + {ok, CompiledPattern} = re:compile(Pattern), + put(prometheus_queue_filter, CompiledPattern) + end, ok. parse_vhosts(N) when is_binary(N) -> |