summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
diff options
context:
space:
mode:
Diffstat (limited to 'deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl')
-rw-r--r--deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl531
1 files changed, 531 insertions, 0 deletions
diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
new file mode 100644
index 0000000000..255260627a
--- /dev/null
+++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
@@ -0,0 +1,531 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+-module(prometheus_rabbitmq_core_metrics_collector).
+-export([register/0,
+ deregister_cleanup/1,
+ collect_mf/2,
+ collect_metrics/2]).
+
+-import(prometheus_model_helpers, [create_mf/4,
+ create_mf/5,
+ gauge_metric/2,
+ counter_metric/2,
+ untyped_metric/2]).
+
+-include_lib("prometheus/include/prometheus.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+-behaviour(prometheus_collector).
+
+%% Because all metrics are from RabbitMQ's perspective,
+%% cached for up to 5 seconds by default (configurable),
+%% we prepend rabbitmq_ to all metrics emitted by this collector.
+%% Some metrics are for Erlang (erlang_), Mnesia (schema_db_) or the System (io_),
+%% as observed by RabbitMQ.
+-define(METRIC_NAME_PREFIX, "rabbitmq_").
+
+%% ==The source of these metrics can be found in the rabbit_core_metrics module==
+%% The relevant files are:
+%% * rabbit_common/src/rabbit_core_metrics.erl
+%% * rabbit_common/include/rabbit_core_metrics.hrl
+%%
+%% ==How to determine if a metric should be of type GAUGE or COUNTER?==
+%%
+%% * GAUGE if you care about its value rather than rate of change
+%% - value can decrease as well as decrease
+%% * COUNTER if you care about the rate of change
+%% - value can only increase
+%%
+%% To put it differently, if the metric is used with rate(), it's a COUNTER, otherwise it's a GAUGE.
+%%
+%% More info: https://prometheus.io/docs/practices/instrumentation/#counter-vs-gauge-summary-vs-histogram
+
+% Some metrics require to be converted, mostly those that represent time.
+% It is a Prometheus best practice to use specific base units: https://prometheus.io/docs/practices/naming/#base-units
+% Extra context: https://github.com/prometheus/docs/pull/1414#issuecomment-522337895
+
+-define(MILLISECOND, 1000).
+-define(MICROSECOND, 1000000).
+
+-define(METRICS_RAW, [
+ {channel_metrics, [
+ {2, undefined, channel_consumers, gauge, "Consumers on a channel", consumer_count},
+ {2, undefined, channel_messages_unacked, gauge, "Delivered but not yet acknowledged messages", messages_unacknowledged},
+ {2, undefined, channel_messages_unconfirmed, gauge, "Published but not yet confirmed messages", messages_unconfirmed},
+ {2, undefined, channel_messages_uncommitted, gauge, "Messages received in a transaction but not yet committed", messages_uncommitted},
+ {2, undefined, channel_acks_uncommitted, gauge, "Message acknowledgements in a transaction not yet committed", acks_uncommitted},
+ {2, undefined, consumer_prefetch, gauge, "Limit of unacknowledged messages for each consumer", prefetch_count},
+ {2, undefined, channel_prefetch, gauge, "Total limit of unacknowledged messages for all consumers on a channel", global_prefetch_count}
+ ]},
+
+ {channel_exchange_metrics, [
+ {2, undefined, channel_messages_published_total, counter, "Total number of messages published into an exchange on a channel"},
+ {3, undefined, channel_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed on the channel"},
+ {4, undefined, channel_messages_unroutable_returned_total, counter, "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"},
+ {5, undefined, channel_messages_unroutable_dropped_total, counter, "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"}
+ ]},
+
+ {channel_process_metrics, [
+ {2, undefined, channel_process_reductions_total, counter, "Total number of channel process reductions"}
+ ]},
+
+ {channel_queue_metrics, [
+ {2, undefined, channel_get_ack_total, counter, "Total number of messages fetched with basic.get in manual acknowledgement mode"},
+ {3, undefined, channel_get_total, counter, "Total number of messages fetched with basic.get in automatic acknowledgement mode"},
+ {4, undefined, channel_messages_delivered_ack_total, counter, "Total number of messages delivered to consumers in manual acknowledgement mode"},
+ {5, undefined, channel_messages_delivered_total, counter, "Total number of messages delivered to consumers in automatic acknowledgement mode"},
+ {6, undefined, channel_messages_redelivered_total, counter, "Total number of messages redelivered to consumers"},
+ {7, undefined, channel_messages_acked_total, counter, "Total number of messages acknowledged by consumers"},
+ {8, undefined, channel_get_empty_total, counter, "Total number of times basic.get operations fetched no message"}
+ ]},
+
+ {connection_churn_metrics, [
+ {2, undefined, connections_opened_total, counter, "Total number of connections opened"},
+ {3, undefined, connections_closed_total, counter, "Total number of connections closed or terminated"},
+ {4, undefined, channels_opened_total, counter, "Total number of channels opened"},
+ {5, undefined, channels_closed_total, counter, "Total number of channels closed"},
+ {6, undefined, queues_declared_total, counter, "Total number of queues declared"},
+ {7, undefined, queues_created_total, counter, "Total number of queues created"},
+ {8, undefined, queues_deleted_total, counter, "Total number of queues deleted"}
+ ]},
+
+ {connection_coarse_metrics, [
+ {2, undefined, connection_incoming_bytes_total, counter, "Total number of bytes received on a connection"},
+ {3, undefined, connection_outgoing_bytes_total, counter, "Total number of bytes sent on a connection"},
+ {4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"}
+ ]},
+
+ {connection_metrics, [
+ {2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt},
+ {2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt},
+ {2, undefined, connection_pending_packets, gauge, "Number of packets waiting to be sent on a connection", send_pend},
+ {2, undefined, connection_channels, gauge, "Channels on a connection", channels}
+ ]},
+
+ {channel_queue_exchange_metrics, [
+ {2, undefined, queue_messages_published_total, counter, "Total number of messages published to queues"}
+ ]},
+
+ {node_coarse_metrics, [
+ {2, undefined, process_open_fds, gauge, "Open file descriptors", fd_used},
+ {2, undefined, process_open_tcp_sockets, gauge, "Open TCP sockets", sockets_used},
+ {2, undefined, process_resident_memory_bytes, gauge, "Memory used in bytes", mem_used},
+ {2, undefined, disk_space_available_bytes, gauge, "Disk space available in bytes", disk_free},
+ {2, undefined, erlang_processes_used, gauge, "Erlang processes used", proc_used},
+ {2, undefined, erlang_gc_runs_total, counter, "Total number of Erlang garbage collector runs", gc_num},
+ {2, undefined, erlang_gc_reclaimed_bytes_total, counter, "Total number of bytes of memory reclaimed by Erlang garbage collector", gc_bytes_reclaimed},
+ {2, undefined, erlang_scheduler_context_switches_total, counter, "Total number of Erlang scheduler context switches", context_switches}
+ ]},
+
+ {node_metrics, [
+ {2, undefined, process_max_fds, gauge, "Open file descriptors limit", fd_total},
+ {2, undefined, process_max_tcp_sockets, gauge, "Open TCP sockets limit", sockets_total},
+ {2, undefined, resident_memory_limit_bytes, gauge, "Memory high watermark in bytes", mem_limit},
+ {2, undefined, disk_space_available_limit_bytes, gauge, "Free disk space low watermark in bytes", disk_free_limit},
+ {2, undefined, erlang_processes_limit, gauge, "Erlang processes limit", proc_total},
+ {2, undefined, erlang_scheduler_run_queue, gauge, "Erlang scheduler run queue", run_queue},
+ {2, undefined, erlang_net_ticktime_seconds, gauge, "Inter-node heartbeat interval", net_ticktime},
+ {2, ?MILLISECOND, erlang_uptime_seconds, gauge, "Node uptime", uptime}
+ ]},
+
+ {node_persister_metrics, [
+ {2, undefined, io_read_ops_total, counter, "Total number of I/O read operations", io_read_count},
+ {2, undefined, io_read_bytes_total, counter, "Total number of I/O bytes read", io_read_bytes},
+ {2, undefined, io_write_ops_total, counter, "Total number of I/O write operations", io_write_count},
+ {2, undefined, io_write_bytes_total, counter, "Total number of I/O bytes written", io_write_bytes},
+ {2, undefined, io_sync_ops_total, counter, "Total number of I/O sync operations", io_sync_count},
+ {2, undefined, io_seek_ops_total, counter, "Total number of I/O seek operations", io_seek_count},
+ {2, undefined, io_open_attempt_ops_total, counter, "Total number of file open attempts", io_file_handle_open_attempt_count},
+ {2, undefined, io_reopen_ops_total, counter, "Total number of times files have been reopened", io_reopen_count},
+ {2, undefined, schema_db_ram_tx_total, counter, "Total number of Schema DB memory transactions", mnesia_ram_tx_count},
+ {2, undefined, schema_db_disk_tx_total, counter, "Total number of Schema DB disk transactions", mnesia_disk_tx_count},
+ {2, undefined, msg_store_read_total, counter, "Total number of Message Store read operations", msg_store_read_count},
+ {2, undefined, msg_store_write_total, counter, "Total number of Message Store write operations", msg_store_write_count},
+ {2, undefined, queue_index_read_ops_total, counter, "Total number of Queue Index read operations", queue_index_read_count},
+ {2, undefined, queue_index_write_ops_total, counter, "Total number of Queue Index write operations", queue_index_write_count},
+ {2, undefined, queue_index_journal_write_ops_total, counter, "Total number of Queue Index Journal write operations", queue_index_journal_write_count},
+ {2, ?MICROSECOND, io_read_time_seconds_total, counter, "Total I/O read time", io_read_time},
+ {2, ?MICROSECOND, io_write_time_seconds_total, counter, "Total I/O write time", io_write_time},
+ {2, ?MICROSECOND, io_sync_time_seconds_total, counter, "Total I/O sync time", io_sync_time},
+ {2, ?MICROSECOND, io_seek_time_seconds_total, counter, "Total I/O seek time", io_seek_time},
+ {2, ?MICROSECOND, io_open_attempt_time_seconds_total, counter, "Total file open attempts time", io_file_handle_open_attempt_time}
+ ]},
+
+ {ra_metrics, [
+ {2, undefined, raft_term_total, counter, "Current Raft term number"},
+ {3, undefined, raft_log_snapshot_index, gauge, "Raft log snapshot index"},
+ {4, undefined, raft_log_last_applied_index, gauge, "Raft log last applied index"},
+ {5, undefined, raft_log_commit_index, gauge, "Raft log commit index"},
+ {6, undefined, raft_log_last_written_index, gauge, "Raft log last written index"},
+ {7, ?MILLISECOND, raft_entry_commit_latency_seconds, gauge, "Time taken for a log entry to be committed"}
+ ]},
+
+ {queue_coarse_metrics, [
+ {2, undefined, queue_messages_ready, gauge, "Messages ready to be delivered to consumers"},
+ {3, undefined, queue_messages_unacked, gauge, "Messages delivered to consumers but not yet acknowledged"},
+ {4, undefined, queue_messages, gauge, "Sum of ready and unacknowledged messages - total queue depth"},
+ {5, undefined, queue_process_reductions_total, counter, "Total number of queue process reductions"}
+ ]},
+
+ {queue_metrics, [
+ {2, undefined, queue_consumers, gauge, "Consumers on a queue", consumers},
+ {2, undefined, queue_consumer_utilisation, gauge, "Consumer utilisation", consumer_utilisation},
+ {2, undefined, queue_process_memory_bytes, gauge, "Memory in bytes used by the Erlang queue process", memory},
+ {2, undefined, queue_messages_ram, gauge, "Ready and unacknowledged messages stored in memory", messages_ram},
+ {2, undefined, queue_messages_ram_bytes, gauge, "Size of ready and unacknowledged messages stored in memory", message_bytes_ram},
+ {2, undefined, queue_messages_ready_ram, gauge, "Ready messages stored in memory", messages_ready_ram},
+ {2, undefined, queue_messages_unacked_ram, gauge, "Unacknowledged messages stored in memory", messages_unacknowledged_ram},
+ {2, undefined, queue_messages_persistent, gauge, "Persistent messages", messages_persistent},
+ {2, undefined, queue_messages_persistent_bytes, gauge, "Size in bytes of persistent messages", message_bytes_persistent},
+ {2, undefined, queue_messages_bytes, gauge, "Size in bytes of ready and unacknowledged messages", message_bytes},
+ {2, undefined, queue_messages_ready_bytes, gauge, "Size in bytes of ready messages", message_bytes_ready},
+ {2, undefined, queue_messages_unacked_bytes, gauge, "Size in bytes of all unacknowledged messages", message_bytes_unacknowledged},
+ {2, undefined, queue_messages_paged_out, gauge, "Messages paged out to disk", messages_paged_out},
+ {2, undefined, queue_messages_paged_out_bytes, gauge, "Size in bytes of messages paged out to disk", message_bytes_paged_out},
+ {2, undefined, queue_disk_reads_total, counter, "Total number of times queue read messages from disk", disk_reads},
+ {2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes}
+ ]},
+
+ {auth_attempt_metrics, [
+ {2, undefined, auth_attempts_total, counter, "Total number of authorization attempts on a node"},
+ {3, undefined, auth_attempts_succeeded_total, counter, "Total number of successful authorization attempts on a node"},
+ {4, undefined, auth_attempts_failed_total, counter, "Total number of failed authorization attempts on a node"}
+ ]},
+
+ {auth_attempt_detailed_metrics, [
+ {2, undefined, auth_attempts_total, counter, "Total number of authorization attempts on a node"},
+ {3, undefined, auth_attempts_succeeded_total, counter, "Total number of successful authorization attempts on a node"},
+ {4, undefined, auth_attempts_failed_total, counter, "Total number of failed authorization attempts on a node"}
+ ]}
+
+]).
+
+-define(TOTALS, [
+ %% ordering differs from metrics above, refer to list comprehension
+ {connection_created, connections, gauge, "Connections currently open"},
+ {channel_created, channels, gauge, "Channels currently open"},
+ {consumer_created, consumers, gauge, "Consumers currently connected"},
+ {queue_metrics, queues, gauge, "Queues available"}
+]).
+
+%%====================================================================
+%% Collector API
+%%====================================================================
+
+register() ->
+ ok = prometheus_registry:register_collector(?MODULE).
+
+deregister_cleanup(_) -> ok.
+
+collect_mf(_Registry, Callback) ->
+ {ok, PerObjectMetrics} = application:get_env(rabbitmq_prometheus, return_per_object_metrics),
+ [begin
+ Data = get_data(Table, PerObjectMetrics),
+ mf(Callback, Contents, Data)
+ end || {Table, Contents} <- ?METRICS_RAW, needs_processing(PerObjectMetrics, Table)],
+ [begin
+ Size = ets:info(Table, size),
+ mf_totals(Callback, Name, Type, Help, Size)
+ end || {Table, Name, Type, Help} <- ?TOTALS],
+ add_metric_family(build_info(), Callback),
+ add_metric_family(identity_info(), Callback),
+ ok.
+
+needs_processing(false, auth_attempt_detailed_metrics) ->
+ %% When per object metrics are disabled the detailed authentication attempt metrics
+ %% create duplicates. Totals are carried on `auth_attempt_metrics`
+ false;
+needs_processing(_, _) ->
+ true.
+
+build_info() ->
+ ProductInfo = rabbit:product_info(),
+ #{product_base_version := BaseVersion} = ProductInfo,
+ {ok, PrometheusPluginVersion} = application:get_key(rabbitmq_prometheus, vsn),
+ {ok, PrometheusClientVersion} = application:get_key(prometheus, vsn),
+ Properties0 = [
+ {rabbitmq_version, BaseVersion},
+ {prometheus_plugin_version, PrometheusPluginVersion},
+ {prometheus_client_version, PrometheusClientVersion},
+ {erlang_version, rabbit_misc:otp_release()}
+ ],
+ Properties1 = case ProductInfo of
+ #{product_version := ProductVersion} ->
+ [{product_version, ProductVersion} | Properties0];
+ _ ->
+ Properties0
+ end,
+ Properties = case ProductInfo of
+ #{product_name := ProductName} ->
+ [{product_name, ProductName} | Properties1];
+ _ ->
+ Properties1
+ end,
+ {
+ build_info,
+ untyped,
+ "RabbitMQ & Erlang/OTP version info",
+ [{
+ Properties,
+ 1
+ }]
+ }.
+
+identity_info() ->
+ {
+ identity_info,
+ untyped,
+ "RabbitMQ node & cluster identity info",
+ [{
+ [
+ {rabbitmq_node, node()},
+ {rabbitmq_cluster, rabbit_nodes:cluster_name()}
+ ],
+ 1
+ }]
+ }.
+
+add_metric_family({Name, Type, Help, Metrics}, Callback) ->
+ Callback(create_mf(?METRIC_NAME(Name), Help, Type, Metrics)).
+
+mf(Callback, Contents, Data) ->
+ [begin
+ Fun = case Conversion of
+ undefined ->
+ fun(D) -> element(Index, D) end;
+ BaseUnitConversionFactor ->
+ fun(D) -> element(Index, D) / BaseUnitConversionFactor end
+ end,
+ Callback(
+ create_mf(
+ ?METRIC_NAME(Name),
+ Help,
+ catch_boolean(Type),
+ ?MODULE,
+ {Type, Fun, Data}
+ )
+ )
+ end || {Index, Conversion, Name, Type, Help} <- Contents],
+ [begin
+ Fun = case Conversion of
+ undefined ->
+ fun(D) -> proplists:get_value(Key, element(Index, D)) end;
+ BaseUnitConversionFactor ->
+ fun(D) -> proplists:get_value(Key, element(Index, D)) / BaseUnitConversionFactor end
+ end,
+ Callback(
+ create_mf(
+ ?METRIC_NAME(Name),
+ Help,
+ catch_boolean(Type),
+ ?MODULE,
+ {Type, Fun, Data}
+ )
+ )
+ end || {Index, Conversion, Name, Type, Help, Key} <- Contents].
+
+mf_totals(Callback, Name, Type, Help, Size) ->
+ Callback(
+ create_mf(
+ ?METRIC_NAME(Name),
+ Help,
+ catch_boolean(Type),
+ Size
+ )
+ ).
+
+collect_metrics(_, {Type, Fun, Items}) ->
+ [metric(Type, labels(Item), Fun(Item)) || Item <- Items].
+
+labels(Item) ->
+ label(element(1, Item)).
+
+label(#resource{virtual_host = VHost, kind = exchange, name = Name}) ->
+ [{vhost, VHost}, {exchange, Name}];
+label(#resource{virtual_host = VHost, kind = queue, name = Name}) ->
+ [{vhost, VHost}, {queue, Name}];
+label({P, {#resource{virtual_host = QVHost, kind = queue, name = QName},
+ #resource{virtual_host = EVHost, kind = exchange, name = EName}}}) when is_pid(P) ->
+ %% channel_queue_exchange_metrics {channel_id, {queue_id, exchange_id}}
+ [{channel, P}, {queue_vhost, QVHost}, {queue, QName},
+ {exchange_vhost, EVHost}, {exchange, EName}];
+label({RemoteAddress, Username, Protocol}) when is_binary(RemoteAddress), is_binary(Username),
+ is_atom(Protocol) ->
+ lists:filter(fun({_, V}) ->
+ V =/= <<>>
+ end, [{remote_address, RemoteAddress}, {username, Username},
+ {protocol, atom_to_binary(Protocol, utf8)}]);
+label({I1, I2}) ->
+ label(I1) ++ label(I2);
+label(P) when is_pid(P) ->
+ [{channel, P}];
+label(A) when is_atom(A) ->
+ case is_protocol(A) of
+ true -> [{protocol, atom_to_binary(A, utf8)}];
+ false -> []
+ end.
+
+is_protocol(P) ->
+ lists:member(P, [amqp091, amqp10, mqtt, http]).
+
+metric(counter, Labels, Value) ->
+ emit_counter_metric_if_defined(Labels, Value);
+metric(gauge, Labels, Value) ->
+ emit_gauge_metric_if_defined(Labels, Value);
+metric(untyped, Labels, Value) ->
+ untyped_metric(Labels, Value);
+metric(boolean, Labels, Value0) ->
+ Value = case Value0 of
+ true -> 1;
+ false -> 0;
+ undefined -> undefined
+ end,
+ untyped_metric(Labels, Value).
+
+%%====================================================================
+%% Private Parts
+%%====================================================================
+catch_boolean(boolean) ->
+ untyped;
+catch_boolean(T) ->
+ T.
+
+emit_counter_metric_if_defined(Labels, Value) ->
+ case Value of
+ undefined -> undefined;
+ '' ->
+ counter_metric(Labels, undefined);
+ Value ->
+ counter_metric(Labels, Value)
+ end.
+
+emit_gauge_metric_if_defined(Labels, Value) ->
+ case Value of
+ undefined -> undefined;
+ '' ->
+ gauge_metric(Labels, undefined);
+ Value ->
+ gauge_metric(Labels, Value)
+ end.
+
+get_data(connection_metrics = Table, false) ->
+ {Table, A1, A2, A3, A4} = ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4}) ->
+ {T,
+ sum(proplists:get_value(recv_cnt, Props), A1),
+ sum(proplists:get_value(send_cnt, Props), A2),
+ sum(proplists:get_value(send_pend, Props), A3),
+ sum(proplists:get_value(channels, Props), A4)}
+ end, empty(Table), Table),
+ [{Table, [{recv_cnt, A1}, {send_cnt, A2}, {send_pend, A3}, {channels, A4}]}];
+get_data(channel_metrics = Table, false) ->
+ {Table, A1, A2, A3, A4, A5, A6, A7} =
+ ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4, A5, A6, A7}) ->
+ {T,
+ sum(proplists:get_value(consumer_count, Props), A1),
+ sum(proplists:get_value(messages_unacknowledged, Props), A2),
+ sum(proplists:get_value(messages_unconfirmed, Props), A3),
+ sum(proplists:get_value(messages_uncommitted, Props), A4),
+ sum(proplists:get_value(acks_uncommitted, Props), A5),
+ sum(proplists:get_value(prefetch_count, Props), A6),
+ sum(proplists:get_value(global_prefetch_count, Props), A7)}
+ end, empty(Table), Table),
+ [{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
+ {messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
+ {global_prefetch_count, A7}]}];
+get_data(queue_metrics = Table, false) ->
+ {Table, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16} =
+ ets:foldl(fun({_, Props, _}, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10,
+ A11, A12, A13, A14, A15, A16}) ->
+ {T,
+ sum(proplists:get_value(consumers, Props), A1),
+ sum(proplists:get_value(consumer_utilisation, Props), A2),
+ sum(proplists:get_value(memory, Props), A3),
+ sum(proplists:get_value(messages_ram, Props), A4),
+ sum(proplists:get_value(message_bytes_ram, Props), A5),
+ sum(proplists:get_value(messages_ready_ram, Props), A6),
+ sum(proplists:get_value(messages_unacknowledged_ram, Props), A7),
+ sum(proplists:get_value(messages_persistent, Props), A8),
+ sum(proplists:get_value(message_bytes_persistent, Props), A9),
+ sum(proplists:get_value(message_bytes, Props), A10),
+ sum(proplists:get_value(message_bytes_ready, Props), A11),
+ sum(proplists:get_value(message_bytes_unacknowledged, Props), A12),
+ sum(proplists:get_value(messages_paged_out, Props), A13),
+ sum(proplists:get_value(message_bytes_paged_out, Props), A14),
+ sum(proplists:get_value(disk_reads, Props), A15),
+ sum(proplists:get_value(disk_writes, Props), A16)
+ }
+ end, empty(Table), Table),
+ [{Table, [{consumers, A1}, {consumer_utilisation, A2}, {memory, A3}, {messages_ram, A4},
+ {message_bytes_ram, A5}, {messages_ready_ram, A6},
+ {messages_unacknowledged_ram, A7}, {messages_persistent, A8},
+ {messages_bytes_persistent, A9}, {message_bytes, A10},
+ {message_bytes_ready, A11}, {message_bytes_unacknowledged, A12},
+ {messages_paged_out, A13}, {message_bytes_paged_out, A14},
+ {disk_reads, A15}, {disk_writes, A16}]}];
+get_data(Table, false) when Table == channel_exchange_metrics;
+ Table == queue_coarse_metrics;
+ Table == channel_queue_metrics;
+ Table == connection_coarse_metrics;
+ Table == channel_queue_exchange_metrics;
+ Table == ra_metrics;
+ Table == channel_process_metrics ->
+ Result = ets:foldl(fun({_, V1}, {T, A1}) ->
+ {T, V1 + A1};
+ ({_, V1, _}, {T, A1}) ->
+ {T, V1 + A1};
+ ({_, V1, V2, V3}, {T, A1, A2, A3}) ->
+ {T, V1 + A1, V2 + A2, V3 + A3};
+ ({_, V1, V2, V3, _}, {T, A1, A2, A3}) ->
+ {T, V1 + A1, V2 + A2, V3 + A3};
+ ({_, V1, V2, V3, V4, _}, {T, A1, A2, A3, A4}) ->
+ {T, V1 + A1, V2 + A2, V3 + A3, V4 + A4};
+ ({_, V1, V2, V3, V4}, {T, A1, A2, A3, A4}) ->
+ {T, V1 + A1, V2 + A2, V3 + A3, V4 + A4};
+ ({_, V1, V2, V3, V4, V5, V6}, {T, A1, A2, A3, A4, A5, A6}) ->
+ %% ra_metrics: raft_entry_commit_latency_seconds needs to be an average
+ {T, V1 + A1, V2 + A2, V3 + A3, V4 + A4, V5 + A5, accumulate_count_and_sum(V6, A6)};
+ ({_, V1, V2, V3, V4, V5, V6, V7, _}, {T, A1, A2, A3, A4, A5, A6, A7}) ->
+ {T, V1 + A1, V2 + A2, V3 + A3, V4 + A4, V5 + A5, V6 + A6, V7 + A7}
+ end, empty(Table), Table),
+ case Table of
+ %% raft_entry_commit_latency_seconds needs to be an average
+ ra_metrics ->
+ {Count, Sum} = element(7, Result),
+ [setelement(7, Result, division(Sum, Count))];
+ _ ->
+ [Result]
+ end;
+get_data(Table, _) ->
+ ets:tab2list(Table).
+
+division(0, 0) ->
+ 0;
+division(A, B) ->
+ A / B.
+
+accumulate_count_and_sum(Value, {Count, Sum}) ->
+ {Count + 1, Sum + Value}.
+
+empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics ->
+ {T, 0};
+empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics ->
+ {T, 0, 0, 0};
+empty(T) when T == channel_exchange_metrics; T == queue_coarse_metrics; T == connection_metrics ->
+ {T, 0, 0, 0, 0};
+empty(T) when T == ra_metrics ->
+ {T, 0, 0, 0, 0, 0, {0, 0}};
+empty(T) when T == channel_queue_metrics; T == channel_metrics ->
+ {T, 0, 0, 0, 0, 0, 0, 0};
+empty(queue_metrics = T) ->
+ {T, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}.
+
+sum(undefined, B) ->
+ B;
+sum('', B) ->
+ B;
+sum(A, B) ->
+ A + B.