diff options
author | Alexey Lebedeff <binarin@binarin.ru> | 2021-09-20 17:53:47 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-09-20 17:53:47 +0200 |
commit | e4bda83b8ec81e2efe44ffa3d7111cd9f401b5af (patch) | |
tree | 6efe31b7ac4ef4ef3b7543201c008e5f8a62b596 | |
parent | 9ea1a823cc543989a710a35b8c4b10c9f7c968fd (diff) | |
parent | 4bb2262140663b186c8d7beb57d10e3d86ef52ea (diff) | |
download | rabbitmq-server-git-e4bda83b8ec81e2efe44ffa3d7111cd9f401b5af.tar.gz |
Merge pull request #3421 from rabbitmq/alebedeff/opp-92
Make prometheus plugin output customizable
6 files changed, 685 insertions, 91 deletions
diff --git a/deps/rabbitmq_prometheus/README.md b/deps/rabbitmq_prometheus/README.md index c65c2675bf..61a5a2b903 100644 --- a/deps/rabbitmq_prometheus/README.md +++ b/deps/rabbitmq_prometheus/README.md @@ -85,6 +85,20 @@ To go back to aggregated metrics on-the-fly, run the following command: rabbitmqctl eval 'application:set_env(rabbitmq_prometheus, return_per_object_metrics, false).' ``` +## Selective querying of per-object metrics + +As mentioned in the previous section, returning a lot of per-object metrics is quite a computationally expensive process. One of the reasons is that `/metrics/per-object` returns every possible metric for every possible object - even if having them makes no sense in the day-to-day monitoring activity. + +That's why there is an additional endpoint that always returns per-object metrics and allows one to explicitly query only the things that are relevant - `/metrics/detailed`. By default it doesn't return anything at all, but it's possible to specify required metric groups and virtual host filters in the GET-parameters. Scraping `/metrics/detailed?vhost=vhost-1&vhost=vhost-2&family=queue_coarse_metrics&family=queue_consumer_count` will only return the requested metrics (and not, for example, channel metrics that include an Erlang PID in labels). + +This endpoint supports the following parameters: + +* Zero or more `family` - only the requested metric families will be returned. The full list is documented in [metrics-detailed](metrics-detailed.md). +* Zero or more `vhost` - if it's given, queue related metrics (`queue_coarse_metrics`, `queue_consumer_count` and `queue_metrics`) will be returned only for the given vhost(s). + +The returned metrics use a different prefix `rabbitmq_detailed_` (instead of plain `rabbitmq_` used by other endpoints), so that this endpoint can be used simultaneously with `/metrics`, and existing dashboards won't be affected. + +Here are the performance gains you can expect from using this endpoint. 
On a test system with 10k queues/10k consumers/10k producers, `/metrics/per-object` took a bit over 2 minutes. Querying `/metrics/detailed?family=queue_coarse_metrics&family=queue_consumer_count` provides just enough metrics to see how many messages sit in every queue and how many consumers each of these queues has. And it takes only 2 seconds, a significant improvement over the indiscriminate `/metrics/per-object`. ## Contributing diff --git a/deps/rabbitmq_prometheus/metrics-detailed.md b/deps/rabbitmq_prometheus/metrics-detailed.md new file mode 100644 index 0000000000..262fca04bb --- /dev/null +++ b/deps/rabbitmq_prometheus/metrics-detailed.md @@ -0,0 +1,238 @@ +## Configurable RabbitMQ metric groups + +These are the metrics that can be explicitly requested via the `/metrics/detailed` endpoint. + +### Generic metrics + +These are some generic metrics, which do not refer to any specific +queue/connection/etc. + +#### Connection/channel/queue churn + +Group `connection_churn_metrics`: + +| Metric | Description | +|--------------------------------------------|--------------------------------------------------| +| rabbitmq_detailed_connections_opened_total | Total number of connections opened | +| rabbitmq_detailed_connections_closed_total | Total number of connections closed or terminated | +| rabbitmq_detailed_channels_opened_total | Total number of channels opened | +| rabbitmq_detailed_channels_closed_total | Total number of channels closed | +| rabbitmq_detailed_queues_declared_total | Total number of queues declared | +| rabbitmq_detailed_queues_created_total | Total number of queues created | +| rabbitmq_detailed_queues_deleted_total | Total number of queues deleted | + + +#### Erlang VM/Disk IO via RabbitMQ + +Group `node_coarse_metrics`: + +| Metric | Description | +|-----------------------------------------------------------|-----------------------------------------------------------------------| +| rabbitmq_detailed_process_open_fds | Open file descriptors | +| 
rabbitmq_detailed_process_open_tcp_sockets | Open TCP sockets | +| rabbitmq_detailed_process_resident_memory_bytes | Memory used in bytes | +| rabbitmq_detailed_disk_space_available_bytes | Disk space available in bytes | +| rabbitmq_detailed_erlang_processes_used | Erlang processes used | +| rabbitmq_detailed_erlang_gc_runs_total | Total number of Erlang garbage collector runs | +| rabbitmq_detailed_erlang_gc_reclaimed_bytes_total | Total number of bytes of memory reclaimed by Erlang garbage collector | +| rabbitmq_detailed_erlang_scheduler_context_switches_total | Total number of Erlang scheduler context switches | + +Group `node_metrics`: + +| Metric | Description | +|----------------------------------------------------|----------------------------------------| +| rabbitmq_detailed_process_max_fds | Open file descriptors limit | +| rabbitmq_detailed_process_max_tcp_sockets | Open TCP sockets limit | +| rabbitmq_detailed_resident_memory_limit_bytes | Memory high watermark in bytes | +| rabbitmq_detailed_disk_space_available_limit_bytes | Free disk space low watermark in bytes | +| rabbitmq_detailed_erlang_processes_limit | Erlang processes limit | +| rabbitmq_detailed_erlang_scheduler_run_queue | Erlang scheduler run queue | +| rabbitmq_detailed_erlang_net_ticktime_seconds | Inter-node heartbeat interval | +| rabbitmq_detailed_erlang_uptime_seconds | Node uptime | + + +Group `node_persister_metrics`: + +| Metric | Description | +|-------------------------------------------------------|------------------------------------------------------| +| rabbitmq_detailed_io_read_ops_total | Total number of I/O read operations | +| rabbitmq_detailed_io_read_bytes_total | Total number of I/O bytes read | +| rabbitmq_detailed_io_write_ops_total | Total number of I/O write operations | +| rabbitmq_detailed_io_write_bytes_total | Total number of I/O bytes written | +| rabbitmq_detailed_io_sync_ops_total | Total number of I/O sync operations | +| 
rabbitmq_detailed_io_seek_ops_total | Total number of I/O seek operations | +| rabbitmq_detailed_io_open_attempt_ops_total | Total number of file open attempts | +| rabbitmq_detailed_io_reopen_ops_total | Total number of times files have been reopened | +| rabbitmq_detailed_schema_db_ram_tx_total | Total number of Schema DB memory transactions | +| rabbitmq_detailed_schema_db_disk_tx_total | Total number of Schema DB disk transactions | +| rabbitmq_detailed_msg_store_read_total | Total number of Message Store read operations | +| rabbitmq_detailed_msg_store_write_total | Total number of Message Store write operations | +| rabbitmq_detailed_queue_index_read_ops_total | Total number of Queue Index read operations | +| rabbitmq_detailed_queue_index_write_ops_total | Total number of Queue Index write operations | +| rabbitmq_detailed_queue_index_journal_write_ops_total | Total number of Queue Index Journal write operations | +| rabbitmq_detailed_io_read_time_seconds_total | Total I/O read time | +| rabbitmq_detailed_io_write_time_seconds_total | Total I/O write time | +| rabbitmq_detailed_io_sync_time_seconds_total | Total I/O sync time | +| rabbitmq_detailed_io_seek_time_seconds_total | Total I/O seek time | +| rabbitmq_detailed_io_open_attempt_time_seconds_total | Total file open attempts time | + + +#### Raft metrics + +Group `ra_metrics`: + +| Metric | Description | +|-----------------------------------------------------|--------------------------------------------| +| rabbitmq_detailed_raft_term_total | Current Raft term number | +| rabbitmq_detailed_raft_log_snapshot_index | Raft log snapshot index | +| rabbitmq_detailed_raft_log_last_applied_index | Raft log last applied index | +| rabbitmq_detailed_raft_log_commit_index | Raft log commit index | +| rabbitmq_detailed_raft_log_last_written_index | Raft log last written index | +| rabbitmq_detailed_raft_entry_commit_latency_seconds | Time taken for a log entry to be committed | + +#### Auth metrics + +Group 
`auth_attempt_metrics`: + +| Metric | Description | +|-------------------------------------------------|----------------------------------------------------| +| rabbitmq_detailed_auth_attempts_total | Total number of authorization attempts | +| rabbitmq_detailed_auth_attempts_succeeded_total | Total number of successful authentication attempts | +| rabbitmq_detailed_auth_attempts_failed_total | Total number of failed authentication attempts | + + +Group `auth_attempt_detailed_metrics` (when aggregated, it produces the same numbers as `auth_attempt_metrics` - so it's mutually exclusive with it in the aggregation mode): + +| Metric | Description | +|----------------------------------------------------------|--------------------------------------------------------------------| +| rabbitmq_detailed_auth_attempts_detailed_total | Total number of authorization attempts with source info | +| rabbitmq_detailed_auth_attempts_detailed_succeeded_total | Total number of successful authorization attempts with source info | +| rabbitmq_detailed_auth_attempts_detailed_failed_total | Total number of failed authorization attempts with source info | + + +### Queue metrics + +Each of the metrics in this group refers to a single queue in its label. The amount of data and the performance depend entirely on the number of queues. + +They are listed from the least expensive to collect to the most expensive. 
+ +#### Queue coarse metrics + +Group `queue_coarse_metrics`: + +| Metric | Description | +|--------------------------------------------------|--------------------------------------------------------------| +| rabbitmq_detailed_queue_messages_ready | Messages ready to be delivered to consumers | +| rabbitmq_detailed_queue_messages_unacked | Messages delivered to consumers but not yet acknowledged | +| rabbitmq_detailed_queue_messages | Sum of ready and unacknowledged messages - total queue depth | +| rabbitmq_detailed_queue_process_reductions_total | Total number of queue process reductions | + +#### Per-queue consumer count + +Group `queue_consumer_count`. This is a strict subset of `queue_metrics` which contains only a single metric (if both `queue_consumer_count` and `queue_metrics` are requested, the former will be automatically skipped): + +| Metric | Description | +|-----------------------------------|----------------------| +| rabbitmq_detailed_queue_consumers | Consumers on a queue | + +This is one of the more telling metrics, and having it as a separate group makes it possible to skip some expensive operations for extracting/exposing the other metrics from the same data source. 
+ +#### Detailed queue metrics + +Group `queue_metrics` contains all the metrics for every queue, and can be relatively expensive to produce: + +| Metric | Description | +|---------------------------------------------------|------------------------------------------------------------| +| rabbitmq_detailed_queue_consumers | Consumers on a queue | +| rabbitmq_detailed_queue_consumer_capacity | Consumer capacity | +| rabbitmq_detailed_queue_consumer_utilisation | Same as consumer capacity | +| rabbitmq_detailed_queue_process_memory_bytes | Memory in bytes used by the Erlang queue process | +| rabbitmq_detailed_queue_messages_ram | Ready and unacknowledged messages stored in memory | +| rabbitmq_detailed_queue_messages_ram_bytes | Size of ready and unacknowledged messages stored in memory | +| rabbitmq_detailed_queue_messages_ready_ram | Ready messages stored in memory | +| rabbitmq_detailed_queue_messages_unacked_ram | Unacknowledged messages stored in memory | +| rabbitmq_detailed_queue_messages_persistent | Persistent messages | +| rabbitmq_detailed_queue_messages_persistent_bytes | Size in bytes of persistent messages | +| rabbitmq_detailed_queue_messages_bytes | Size in bytes of ready and unacknowledged messages | +| rabbitmq_detailed_queue_messages_ready_bytes | Size in bytes of ready messages | +| rabbitmq_detailed_queue_messages_unacked_bytes | Size in bytes of all unacknowledged messages | +| rabbitmq_detailed_queue_messages_paged_out | Messages paged out to disk | +| rabbitmq_detailed_queue_messages_paged_out_bytes | Size in bytes of messages paged out to disk | +| rabbitmq_detailed_queue_disk_reads_total | Total number of times queue read messages from disk | +| rabbitmq_detailed_queue_disk_writes_total | Total number of times queue wrote messages to disk | + +Tests show that performance difference between it and `queue_consumer_count` is approximately 8 times. E.g. 
on a test broker with 10k queues/producers/consumers, scrape time was ~8 seconds and ~1 second respectively. So while it's expensive, it's not prohibitively so - especially compared to other metrics from per-connection/channel groups. + +### Connection/channel metrics + +All of those include an Erlang PID in their label, which is rarely useful when ingested into Prometheus. They are also the most expensive to produce: most of the resources used by `/metrics/per-object` are spent on these. + +#### Connection metrics + +Group `connection_coarse_metrics`: + +| Metric | Description | +|-------------------------------------------------------|------------------------------------------------| +| rabbitmq_detailed_connection_incoming_bytes_total | Total number of bytes received on a connection | +| rabbitmq_detailed_connection_outgoing_bytes_total | Total number of bytes sent on a connection | +| rabbitmq_detailed_connection_process_reductions_total | Total number of connection process reductions | + +Group `connection_metrics`: + +| Metric | Description | +|-----------------------------------------------------|------------------------------------------------------| +| rabbitmq_detailed_connection_incoming_packets_total | Total number of packets received on a connection | +| rabbitmq_detailed_connection_outgoing_packets_total | Total number of packets sent on a connection | +| rabbitmq_detailed_connection_pending_packets | Number of packets waiting to be sent on a connection | +| rabbitmq_detailed_connection_channels | Channels on a connection | + +#### General channel metrics + +Group `channel_metrics`: + +| Metric | Description | +|------------------------------------------------|-----------------------------------------------------------------------| +| rabbitmq_detailed_channel_consumers | Consumers on a channel | +| rabbitmq_detailed_channel_messages_unacked | Delivered but not yet acknowledged messages | +| rabbitmq_detailed_channel_messages_unconfirmed | Published but not yet confirmed 
messages | +| rabbitmq_detailed_channel_messages_uncommitted | Messages received in a transaction but not yet committed | +| rabbitmq_detailed_channel_acks_uncommitted | Message acknowledgements in a transaction not yet committed | +| rabbitmq_detailed_consumer_prefetch | Limit of unacknowledged messages for each consumer | +| rabbitmq_detailed_channel_prefetch | Total limit of unacknowledged messages for all consumers on a channel | + + +Group `channel_process_metrics`: + +| Metric | Description | +|----------------------------------------------------|--------------------------------------------| +| rabbitmq_detailed_channel_process_reductions_total | Total number of channel process reductions | + + +#### Channel metrics with queue/exchange breakdowns + +Group `channel_exchange_metrics`: + +| Metric | Description | +|--------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------| +| rabbitmq_detailed_channel_messages_published_total | Total number of messages published into an exchange on a channel | +| rabbitmq_detailed_channel_messages_confirmed_total | Total number of messages published into an exchange and confirmed on the channel | +| rabbitmq_detailed_channel_messages_unroutable_returned_total | Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable | +| rabbitmq_detailed_channel_messages_unroutable_dropped_total | Total number of messages published as non-mandatory into an exchange and dropped as unroutable | + +Group `channel_queue_metrics`: + +| Metric | Description | +|--------------------------------------------------------|-----------------------------------------------------------------------------------| +| rabbitmq_detailed_channel_get_ack_total | Total number of messages fetched with basic.get in manual acknowledgement mode | +| rabbitmq_detailed_channel_get_total | Total number of 
messages fetched with basic.get in automatic acknowledgement mode | +| rabbitmq_detailed_channel_messages_delivered_ack_total | Total number of messages delivered to consumers in manual acknowledgement mode | +| rabbitmq_detailed_channel_messages_delivered_total | Total number of messages delivered to consumers in automatic acknowledgement mode | +| rabbitmq_detailed_channel_messages_redelivered_total | Total number of messages redelivered to consumers | +| rabbitmq_detailed_channel_messages_acked_total | Total number of messages acknowledged by consumers | +| rabbitmq_detailed_channel_get_empty_total | Total number of times basic.get operations fetched no message | + +Group `channel_queue_exchange_metrics`: + +| Metric | Description | +|--------------------------------------------------|----------------------------------------------| +| rabbitmq_detailed_queue_messages_published_total | Total number of messages published to queues | diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl index f06ab643e3..bd1cd9067f 100644 --- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl +++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl @@ -20,14 +20,16 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -behaviour(prometheus_collector). - -%% Because all metrics are from RabbitMQ's perspective, -%% cached for up to 5 seconds by default (configurable), -%% we prepend rabbitmq_ to all metrics emitted by this collector. -%% Some metrics are for Erlang (erlang_), Mnesia (schema_db_) or the System (io_), +%% We prepend either rabbitmq_ or rabbitmq_detailed_ to all metrics emitted by this collector. +%% As there are also some metrics for Erlang (erlang_), Mnesia (schema_db_) or the System (io_), %% as observed by RabbitMQ. 
+ +%% Used by `/metrics` and `/metrics/per-object`. -define(METRIC_NAME_PREFIX, "rabbitmq_"). +%% Used by `/metrics/detailed` endpoint +-define(DETAILED_METRIC_NAME_PREFIX, "rabbitmq_detailed_"). + %% ==The source of these metrics can be found in the rabbit_core_metrics module== %% The relevant files are: %% * rabbit_common/src/rabbit_core_metrics.erl @@ -52,37 +54,8 @@ -define(MICROSECOND, 1000000). -define(METRICS_RAW, [ - {channel_metrics, [ - {2, undefined, channel_consumers, gauge, "Consumers on a channel", consumer_count}, - {2, undefined, channel_messages_unacked, gauge, "Delivered but not yet acknowledged messages", messages_unacknowledged}, - {2, undefined, channel_messages_unconfirmed, gauge, "Published but not yet confirmed messages", messages_unconfirmed}, - {2, undefined, channel_messages_uncommitted, gauge, "Messages received in a transaction but not yet committed", messages_uncommitted}, - {2, undefined, channel_acks_uncommitted, gauge, "Message acknowledgements in a transaction not yet committed", acks_uncommitted}, - {2, undefined, consumer_prefetch, gauge, "Limit of unacknowledged messages for each consumer", prefetch_count}, - {2, undefined, channel_prefetch, gauge, "Total limit of unacknowledged messages for all consumers on a channel", global_prefetch_count} - ]}, - - {channel_exchange_metrics, [ - {2, undefined, channel_messages_published_total, counter, "Total number of messages published into an exchange on a channel"}, - {3, undefined, channel_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed on the channel"}, - {4, undefined, channel_messages_unroutable_returned_total, counter, "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"}, - {5, undefined, channel_messages_unroutable_dropped_total, counter, "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"} - ]}, - - {channel_process_metrics, 
[ - {2, undefined, channel_process_reductions_total, counter, "Total number of channel process reductions"} - ]}, - - {channel_queue_metrics, [ - {2, undefined, channel_get_ack_total, counter, "Total number of messages fetched with basic.get in manual acknowledgement mode"}, - {3, undefined, channel_get_total, counter, "Total number of messages fetched with basic.get in automatic acknowledgement mode"}, - {4, undefined, channel_messages_delivered_ack_total, counter, "Total number of messages delivered to consumers in manual acknowledgement mode"}, - {5, undefined, channel_messages_delivered_total, counter, "Total number of messages delivered to consumers in automatic acknowledgement mode"}, - {6, undefined, channel_messages_redelivered_total, counter, "Total number of messages redelivered to consumers"}, - {7, undefined, channel_messages_acked_total, counter, "Total number of messages acknowledged by consumers"}, - {8, undefined, channel_get_empty_total, counter, "Total number of times basic.get operations fetched no message"} - ]}, +%%% Those are global, i.e. 
they contain no reference to queue/vhost/channel {connection_churn_metrics, [ {2, undefined, connections_opened_total, counter, "Total number of connections opened"}, {3, undefined, connections_closed_total, counter, "Total number of connections closed or terminated"}, @@ -92,24 +65,6 @@ {7, undefined, queues_created_total, counter, "Total number of queues created"}, {8, undefined, queues_deleted_total, counter, "Total number of queues deleted"} ]}, - - {connection_coarse_metrics, [ - {2, undefined, connection_incoming_bytes_total, counter, "Total number of bytes received on a connection"}, - {3, undefined, connection_outgoing_bytes_total, counter, "Total number of bytes sent on a connection"}, - {4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"} - ]}, - - {connection_metrics, [ - {2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt}, - {2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt}, - {2, undefined, connection_pending_packets, gauge, "Number of packets waiting to be sent on a connection", send_pend}, - {2, undefined, connection_channels, gauge, "Channels on a connection", channels} - ]}, - - {channel_queue_exchange_metrics, [ - {2, undefined, queue_messages_published_total, counter, "Total number of messages published to queues"} - ]}, - {node_coarse_metrics, [ {2, undefined, process_open_fds, gauge, "Open file descriptors", fd_used}, {2, undefined, process_open_tcp_sockets, gauge, "Open TCP sockets", sockets_used}, @@ -120,7 +75,6 @@ {2, undefined, erlang_gc_reclaimed_bytes_total, counter, "Total number of bytes of memory reclaimed by Erlang garbage collector", gc_bytes_reclaimed}, {2, undefined, erlang_scheduler_context_switches_total, counter, "Total number of Erlang scheduler context switches", context_switches} ]}, - {node_metrics, [ {2, undefined, process_max_fds, 
gauge, "Open file descriptors limit", fd_total}, {2, undefined, process_max_tcp_sockets, gauge, "Open TCP sockets limit", sockets_total}, @@ -164,6 +118,19 @@ {7, ?MILLISECOND, raft_entry_commit_latency_seconds, gauge, "Time taken for a log entry to be committed"} ]}, + {auth_attempt_metrics, [ + {2, undefined, auth_attempts_total, counter, "Total number of authorization attempts"}, + {3, undefined, auth_attempts_succeeded_total, counter, "Total number of successful authentication attempts"}, + {4, undefined, auth_attempts_failed_total, counter, "Total number of failed authentication attempts"} + ]}, + + {auth_attempt_detailed_metrics, [ + {2, undefined, auth_attempts_detailed_total, counter, "Total number of authorization attempts with source info"}, + {3, undefined, auth_attempts_detailed_succeeded_total, counter, "Total number of successful authorization attempts with source info"}, + {4, undefined, auth_attempts_detailed_failed_total, counter, "Total number of failed authorization attempts with source info"} + ]}, + +%%% Those metrics have reference only to a queue name. This is the only group where filtering (e.g. by vhost) makes sense. 
{queue_coarse_metrics, [ {2, undefined, queue_messages_ready, gauge, "Messages ready to be delivered to consumers"}, {3, undefined, queue_messages_unacked, gauge, "Messages delivered to consumers but not yet acknowledged"}, @@ -171,6 +138,10 @@ {5, undefined, queue_process_reductions_total, counter, "Total number of queue process reductions"} ]}, + {queue_consumer_count, [ + {2, undefined, queue_consumers, gauge, "Consumers on a queue", consumers} + ]}, + {queue_metrics, [ {2, undefined, queue_consumers, gauge, "Consumers on a queue", consumers}, {2, undefined, queue_consumer_capacity, gauge, "Consumer capacity", consumer_capacity}, @@ -191,18 +162,56 @@ {2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes} ]}, - {auth_attempt_metrics, [ - {2, undefined, auth_attempts_total, counter, "Total number of authorization attempts"}, - {3, undefined, auth_attempts_succeeded_total, counter, "Total number of successful authentication attempts"}, - {4, undefined, auth_attempts_failed_total, counter, "Total number of failed authentication attempts"} +%%% Metrics that contain reference to a channel. Some of them also have +%%% a queue name, but in this case filtering on it doesn't make any +%%% sense, as the queue is not an object of interest here. 
+ {channel_metrics, [ + {2, undefined, channel_consumers, gauge, "Consumers on a channel", consumer_count}, + {2, undefined, channel_messages_unacked, gauge, "Delivered but not yet acknowledged messages", messages_unacknowledged}, + {2, undefined, channel_messages_unconfirmed, gauge, "Published but not yet confirmed messages", messages_unconfirmed}, + {2, undefined, channel_messages_uncommitted, gauge, "Messages received in a transaction but not yet committed", messages_uncommitted}, + {2, undefined, channel_acks_uncommitted, gauge, "Message acknowledgements in a transaction not yet committed", acks_uncommitted}, + {2, undefined, consumer_prefetch, gauge, "Limit of unacknowledged messages for each consumer", prefetch_count}, + {2, undefined, channel_prefetch, gauge, "Total limit of unacknowledged messages for all consumers on a channel", global_prefetch_count} ]}, - {auth_attempt_detailed_metrics, [ - {2, undefined, auth_attempts_detailed_total, counter, "Total number of authorization attempts with source info"}, - {3, undefined, auth_attempts_detailed_succeeded_total, counter, "Total number of successful authorization attempts with source info"}, - {4, undefined, auth_attempts_detailed_failed_total, counter, "Total number of failed authorization attempts with source info"} - ]} + {channel_exchange_metrics, [ + {2, undefined, channel_messages_published_total, counter, "Total number of messages published into an exchange on a channel"}, + {3, undefined, channel_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed on the channel"}, + {4, undefined, channel_messages_unroutable_returned_total, counter, "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"}, + {5, undefined, channel_messages_unroutable_dropped_total, counter, "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"} + ]}, + + {channel_process_metrics, [ + {2, 
undefined, channel_process_reductions_total, counter, "Total number of channel process reductions"} + ]}, + {channel_queue_metrics, [ + {2, undefined, channel_get_ack_total, counter, "Total number of messages fetched with basic.get in manual acknowledgement mode"}, + {3, undefined, channel_get_total, counter, "Total number of messages fetched with basic.get in automatic acknowledgement mode"}, + {4, undefined, channel_messages_delivered_ack_total, counter, "Total number of messages delivered to consumers in manual acknowledgement mode"}, + {5, undefined, channel_messages_delivered_total, counter, "Total number of messages delivered to consumers in automatic acknowledgement mode"}, + {6, undefined, channel_messages_redelivered_total, counter, "Total number of messages redelivered to consumers"}, + {7, undefined, channel_messages_acked_total, counter, "Total number of messages acknowledged by consumers"}, + {8, undefined, channel_get_empty_total, counter, "Total number of times basic.get operations fetched no message"} + ]}, + + {connection_coarse_metrics, [ + {2, undefined, connection_incoming_bytes_total, counter, "Total number of bytes received on a connection"}, + {3, undefined, connection_outgoing_bytes_total, counter, "Total number of bytes sent on a connection"}, + {4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"} + ]}, + + {connection_metrics, [ + {2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt}, + {2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt}, + {2, undefined, connection_pending_packets, gauge, "Number of packets waiting to be sent on a connection", send_pend}, + {2, undefined, connection_channels, gauge, "Channels on a connection", channels} + ]}, + + {channel_queue_exchange_metrics, [ + {2, undefined, queue_messages_published_total, counter, "Total number 
of messages published to queues"} + ]} ]). -define(TOTALS, [ @@ -222,29 +231,45 @@ register() -> deregister_cleanup(_) -> ok. +collect_mf('detailed', Callback) -> + collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), enabled_mfs_from_pdict(), Callback), + ok; collect_mf('per-object', Callback) -> - collect(true, Callback); + collect(true, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback), + totals(Callback), + ok; collect_mf(_Registry, Callback) -> PerObjectMetrics = application:get_env(rabbitmq_prometheus, return_per_object_metrics, false), - collect(PerObjectMetrics, Callback). + collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback), + totals(Callback), + ok. -collect(PerObjectMetrics, Callback) -> +collect(PerObjectMetrics, Prefix, VHostsFilter, IncludedMFs, Callback) -> [begin - Data = get_data(Table, PerObjectMetrics), - mf(Callback, Contents, Data) - end || {Table, Contents} <- ?METRICS_RAW, include_when_per_object_metrics(PerObjectMetrics, Table)], + Data = get_data(Table, PerObjectMetrics, VHostsFilter), + mf(Callback, Prefix, Contents, Data) + end || {Table, Contents} <- IncludedMFs, not mutually_exclusive_mf(PerObjectMetrics, Table, IncludedMFs)]. + +totals(Callback) -> [begin Size = ets:info(Table, size), mf_totals(Callback, Name, Type, Help, Size) end || {Table, Name, Type, Help} <- ?TOTALS], add_metric_family(build_info(), Callback), - add_metric_family(identity_info(), Callback), - ok. - -include_when_per_object_metrics(false, auth_attempt_detailed_metrics) -> - false; -include_when_per_object_metrics(_, _) -> - true. + add_metric_family(identity_info(), Callback). + +%% Aggregated `auth_attempt_detailed_metrics` and +%% `auth_attempt_metrics` are the same numbers. The former is just +%% more computationally intensive. +mutually_exclusive_mf(false, auth_attempt_detailed_metrics, _) -> + true; +%% `queue_consumer_count` is a strict subset of queue metrics. 
They +%% read from the same table, but `queue_consumer_count` skips a lot of +%% `proplists:get_value/2` calls. +mutually_exclusive_mf(_, queue_consumer_count, MFs) -> + lists:keymember(queue_metrics, 1, MFs); +mutually_exclusive_mf(_, _, _) -> + false. build_info() -> ProductInfo = rabbit:product_info(), @@ -296,7 +321,7 @@ identity_info() -> add_metric_family({Name, Type, Help, Metrics}, Callback) -> Callback(create_mf(?METRIC_NAME(Name), Help, Type, Metrics)). -mf(Callback, Contents, Data) -> +mf(Callback, Prefix, Contents, Data) -> [begin Fun = case Conversion of undefined -> @@ -306,7 +331,7 @@ mf(Callback, Contents, Data) -> end, Callback( create_mf( - ?METRIC_NAME(Name), + [Prefix, prometheus_model_helpers:metric_name(Name)], Help, catch_boolean(Type), ?MODULE, @@ -323,7 +348,7 @@ mf(Callback, Contents, Data) -> end, Callback( create_mf( - ?METRIC_NAME(Name), + [Prefix, prometheus_model_helpers:metric_name(Name)], Help, catch_boolean(Type), ?MODULE, @@ -416,7 +441,7 @@ emit_gauge_metric_if_defined(Labels, Value) -> gauge_metric(Labels, Value) end. 
-get_data(connection_metrics = Table, false) -> +get_data(connection_metrics = Table, false, _) -> {Table, A1, A2, A3, A4} = ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4}) -> {T, sum(proplists:get_value(recv_cnt, Props), A1), @@ -425,7 +450,7 @@ get_data(connection_metrics = Table, false) -> sum(proplists:get_value(channels, Props), A4)} end, empty(Table), Table), [{Table, [{recv_cnt, A1}, {send_cnt, A2}, {send_pend, A3}, {channels, A4}]}]; -get_data(channel_metrics = Table, false) -> +get_data(channel_metrics = Table, false, _) -> {Table, A1, A2, A3, A4, A5, A6, A7} = ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4, A5, A6, A7}) -> {T, @@ -440,10 +465,24 @@ get_data(channel_metrics = Table, false) -> [{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3}, {messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6}, {global_prefetch_count, A7}]}]; -get_data(queue_metrics = Table, false) -> +get_data(queue_consumer_count = MF, false, VHostsFilter) -> + Table = queue_metrics, %% Real table name + {_, A1} = ets:foldl(fun + ({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> + Acc; + ({_, Props, _}, {T, A1}) -> + {T, + sum(proplists:get_value(consumers, Props), A1) + } + end, empty(MF), Table), + [{Table, [{consumers, A1}]}]; +get_data(queue_metrics = Table, false, VHostsFilter) -> {Table, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16} = - ets:foldl(fun({_, Props, _}, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, - A11, A12, A13, A14, A15, A16}) -> + ets:foldl(fun + ({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> + Acc; + ({_, Props, _}, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, + A11, A12, A13, A14, A15, A16}) -> {T, sum(proplists:get_value(consumers, Props), A1), sum(proplists:get_value(consumer_utilisation, Props), A2), @@ -470,14 +509,18 @@ 
get_data(queue_metrics = Table, false) -> {message_bytes_ready, A11}, {message_bytes_unacknowledged, A12}, {messages_paged_out, A13}, {message_bytes_paged_out, A14}, {disk_reads, A15}, {disk_writes, A16}]}]; -get_data(Table, false) when Table == channel_exchange_metrics; +get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics; Table == queue_coarse_metrics; Table == channel_queue_metrics; Table == connection_coarse_metrics; Table == channel_queue_exchange_metrics; Table == ra_metrics; Table == channel_process_metrics -> - Result = ets:foldl(fun({_, V1}, {T, A1}) -> + Result = ets:foldl(fun + %% For queue_coarse_metrics + ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false -> + Acc; + ({_, V1}, {T, A1}) -> {T, V1 + A1}; ({_, V1, _}, {T, A1}) -> {T, V1 + A1}; @@ -503,7 +546,24 @@ get_data(Table, false) when Table == channel_exchange_metrics; _ -> [Result] end; -get_data(Table, _) -> +get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) -> + ets:foldl(fun + ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) -> + [Row|Acc]; + (_, Acc) -> + Acc + end, [], Table); +get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics orelse MF == queue_consumer_count -> + Table = queue_metrics, + ets:foldl(fun + ({#resource{kind = queue, virtual_host = VHost}, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) -> + [Row|Acc]; + (_, Acc) -> + Acc + end, [], Table); +get_data(queue_consumer_count, true, _) -> + ets:tab2list(queue_metrics); +get_data(Table, _, _) -> ets:tab2list(Table). division(0, 0) -> @@ -514,7 +574,7 @@ division(A, B) -> accumulate_count_and_sum(Value, {Count, Sum}) -> {Count + 1, Sum + Value}. 
-empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics -> +empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count -> {T, 0}; empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics -> {T, 0, 0, 0}; @@ -533,3 +593,23 @@ sum('', B) -> B; sum(A, B) -> A + B. + +enabled_mfs_from_pdict() -> + case get(prometheus_mf_filter) of + undefined -> + []; + MFNames -> + MFNameSet = sets:from_list(MFNames), + [ MF || MF = {Table, _} <- ?METRICS_RAW, sets:is_element(Table, MFNameSet) ] + end. + +vhosts_filter_from_pdict() -> + case get(prometheus_vhost_filter) of + undefined -> + false; + L -> + %% Having both excluded and included hosts in this map makes some guards easier (or even possible). + All = maps:from_list([ {VHost, false} || VHost <- rabbit_vhost:list()]), + Enabled = maps:from_list([ {VHost, true} || VHost <- L ]), + maps:merge(All, Enabled) + end. diff --git a/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl b/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl index bf397d8ca1..45f5bb738d 100644 --- a/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl +++ b/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl @@ -27,6 +27,9 @@ build_dispatcher() -> prometheus_rabbitmq_core_metrics_collector, prometheus_rabbitmq_global_metrics_collector ]), + prometheus_registry:register_collectors('detailed', [ + prometheus_rabbitmq_core_metrics_collector + ]), rabbit_prometheus_handler:setup(), cowboy_router:compile([{'_', dispatcher()}]). 
diff --git a/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl b/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl index c2f8ddb080..954e2f878d 100644 --- a/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl +++ b/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl @@ -32,7 +32,8 @@ is_authorized(ReqData, Context) -> setup() -> setup_metrics(telemetry_registry()), - setup_metrics('per-object'). + setup_metrics('per-object'), + setup_metrics('detailed'). setup_metrics(Registry) -> ScrapeDuration = [{name, ?SCRAPE_DURATION}, @@ -67,6 +68,7 @@ gen_response(<<"GET">>, Request) -> false -> cowboy_req:reply(404, #{}, <<"Unknown Registry">>, Request); Registry -> + put_filtering_options_into_process_dictionary(Request), gen_metrics_response(Registry, Request) end; gen_response(_, Request) -> @@ -146,4 +148,43 @@ encode_format(ContentType, Encoding, Scrape, Registry) -> encode_format_("gzip", Scrape) -> zlib:gzip(Scrape); encode_format_("identity", Scrape) -> - Scrape. + Scrape. + +%% It's not easy to pass this information in a pure way (it'll require changing prometheus.erl) +put_filtering_options_into_process_dictionary(Request) -> + #{vhost := VHosts, family := Families} = cowboy_req:match_qs([{vhost, [], undefined}, {family, [], undefined}], Request), + case parse_vhosts(VHosts) of + Vs when is_list(Vs) -> + put(prometheus_vhost_filter, Vs); + _ -> ok + end, + case parse_metric_families(Families) of + Fs when is_list(Fs) -> + put(prometheus_mf_filter, Fs); + _ -> ok + end, + ok. + +parse_vhosts(N) when is_binary(N) -> + parse_vhosts([N]); +parse_vhosts(L) when is_list(L) -> + [ VHostName || VHostName <- L, rabbit_vhost:exists(VHostName)]; +parse_vhosts(_) -> + false. + +parse_metric_families(N) when is_binary(N) -> + parse_metric_families([N]); +parse_metric_families([]) -> + []; +parse_metric_families([B|Bs]) -> + %% binary_to_existing_atom() should be enough, as it's used for filtering things out. 
+ %% Getting a full list of supported metrics would be harder. + %% NB But on the other hand, it's nice to have validation. Implement it? + case catch erlang:binary_to_existing_atom(B) of + A when is_atom(A) -> + [A|parse_metric_families(Bs)]; + _ -> + parse_metric_families(Bs) + end; +parse_metric_families(_) -> + false. diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl index f713e36a72..b903410057 100644 --- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl +++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl @@ -22,7 +22,8 @@ all() -> {group, aggregated_metrics}, {group, per_object_metrics}, {group, per_object_endpoint_metrics}, - {group, commercial} + {group, commercial}, + {group, detailed_metrics} ]. groups() -> @@ -48,6 +49,14 @@ groups() -> ]}, {commercial, [], [ build_info_product_test + ]}, + {detailed_metrics, [], [ + detailed_metrics_no_families_enabled_by_default, + queue_consumer_count_single_vhost_per_object_test, + queue_consumer_count_all_vhosts_per_object_test, + queue_coarse_metrics_per_object_test, + queue_metrics_per_object_test, + queue_consumer_count_and_queue_metrics_mutually_exclusive_test ]} ]. 
@@ -84,6 +93,71 @@ init_per_group(per_object_endpoint_metrics, Config0) -> ]}, Config1 = rabbit_ct_helpers:merge_app_env(Config0, PathConfig), init_per_group(aggregated_metrics, Config1); +init_per_group(detailed_metrics, Config0) -> + StatsEnv = {rabbit, [{collect_statistics, coarse}, {collect_statistics_interval, 100}]}, + + Config1 = init_per_group(detailed_metrics, rabbit_ct_helpers:merge_app_env(Config0, StatsEnv), []), + + rabbit_ct_broker_helpers:add_vhost(Config1, 0, <<"vhost-1">>, <<"guest">>), + rabbit_ct_broker_helpers:set_full_permissions(Config1, <<"vhost-1">>), + VHost1Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config1, 0, <<"vhost-1">>), + {ok, VHost1Ch} = amqp_connection:open_channel(VHost1Conn), + + rabbit_ct_broker_helpers:add_vhost(Config1, 0, <<"vhost-2">>, <<"guest">>), + rabbit_ct_broker_helpers:set_full_permissions(Config1, <<"vhost-2">>), + VHost2Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config1, 0, <<"vhost-2">>), + {ok, VHost2Ch} = amqp_connection:open_channel(VHost2Conn), + + DefaultCh = rabbit_ct_client_helpers:open_channel(Config1), + + [ + (fun () -> + QPart = case VHost of + <<"/">> -> <<"default">>; + _ -> VHost + end, + QName = << QPart/binary, "-", Q/binary>>, + #'queue.declare_ok'{} = amqp_channel:call(Ch, + #'queue.declare'{queue = QName, + durable = true + + }), + lists:foreach( fun (_) -> + amqp_channel:cast(Ch, + #'basic.publish'{routing_key = QName}, + #amqp_msg{payload = <<"msg">>}) + end, lists:seq(1, MsgNum) ) + end)() + || {VHost, Ch, MsgNum} <- [{<<"/">>, DefaultCh, 3}, {<<"vhost-1">>, VHost1Ch, 7}, {<<"vhost-2">>, VHost2Ch, 11}], + Q <- [ <<"queue-with-messages">>, <<"queue-with-consumer">> ] + ], + + DefaultConsumer = sleeping_consumer(), + #'basic.consume_ok'{consumer_tag = DefaultCTag} = + amqp_channel:subscribe(DefaultCh, #'basic.consume'{queue = <<"default-queue-with-consumer">>}, DefaultConsumer), + + VHost1Consumer = sleeping_consumer(), + #'basic.consume_ok'{consumer_tag = 
VHost1CTag} = + amqp_channel:subscribe(VHost1Ch, #'basic.consume'{queue = <<"vhost-1-queue-with-consumer">>}, VHost1Consumer), + + VHost2Consumer = sleeping_consumer(), + #'basic.consume_ok'{consumer_tag = VHost2CTag} = + amqp_channel:subscribe(VHost2Ch, #'basic.consume'{queue = <<"vhost-2-queue-with-consumer">>}, VHost2Consumer), + + timer:sleep(1000), + + Config1 ++ [ {default_consumer_pid, DefaultConsumer} + , {default_consumer_ctag, DefaultCTag} + , {default_channel, DefaultCh} + , {vhost1_consumer_pid, VHost1Consumer} + , {vhost1_consumer_ctag, VHost1CTag} + , {vhost1_channel, VHost1Ch} + , {vhost1_conn, VHost1Conn} + , {vhost2_consumer_pid, VHost2Consumer} + , {vhost2_consumer_ctag, VHost2CTag} + , {vhost2_channel, VHost2Ch} + , {vhost2_conn, VHost2Conn} + ]; init_per_group(aggregated_metrics, Config0) -> Config1 = rabbit_ct_helpers:merge_app_env( Config0, @@ -137,6 +211,28 @@ end_per_group(aggregated_metrics, Config) -> amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}), rabbit_ct_client_helpers:close_channel(Ch), end_per_group_(Config); + +end_per_group(detailed_metrics, Config) -> + DefaultCh = ?config(default_channel, Config), + amqp_channel:call(DefaultCh, #'basic.cancel'{consumer_tag = ?config(default_consumer_ctag, Config)}), + ?config(default_consumer_pid, Config) ! stop, + rabbit_ct_client_helpers:close_channel(DefaultCh), + + VHost1Ch = ?config(vhost1_channel, Config), + amqp_channel:call(VHost1Ch, #'basic.cancel'{consumer_tag = ?config(vhost1_consumer_ctag, Config)}), + ?config(vhost1_consumer_pid, Config) ! stop, + amqp_channel:close(VHost1Ch), + amqp_connection:close(?config(vhost1_conn, Config)), + + VHost2Ch = ?config(vhost2_channel, Config), + amqp_channel:call(VHost2Ch, #'basic.cancel'{consumer_tag = ?config(vhost2_consumer_ctag, Config)}), + ?config(vhost2_consumer_pid, Config) ! stop, + amqp_channel:close(VHost2Ch), + amqp_connection:close(?config(vhost2_conn, Config)), + + %% Delete queues? 
+ end_per_group_(Config); + end_per_group(_, Config) -> end_per_group_(Config). @@ -315,6 +411,88 @@ global_metrics_single_metric_family_test(Config) -> {match, MetricFamilyMatches} = re:run(Body, "TYPE rabbitmq_global_messages_acknowledged_total", [global]), ?assertEqual(1, length(MetricFamilyMatches)). +queue_consumer_count_single_vhost_per_object_test(Config) -> + {_, Body} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&family=queue_consumer_count&per-object=1", [], 200), + + %% There should be exactly 2 metrics returned (2 queues in that vhost, `queue_consumer_count` has only single metric) + ?assertEqual(#{rabbitmq_detailed_queue_consumers => + #{#{queue => "vhost-1-queue-with-consumer",vhost => "vhost-1"} => [1], + #{queue => "vhost-1-queue-with-messages",vhost => "vhost-1"} => [0]}}, + parse_response(Body)), + ok. + +queue_consumer_count_all_vhosts_per_object_test(Config) -> + Expected = #{rabbitmq_detailed_queue_consumers => + #{#{queue => "vhost-1-queue-with-consumer",vhost => "vhost-1"} => [1], + #{queue => "vhost-1-queue-with-messages",vhost => "vhost-1"} => [0], + #{queue => "vhost-2-queue-with-consumer",vhost => "vhost-2"} => [1], + #{queue => "vhost-2-queue-with-messages",vhost => "vhost-2"} => [0], + #{queue => "default-queue-with-consumer",vhost => "/"} => [1], + #{queue => "default-queue-with-messages",vhost => "/"} => [0]}}, + + %% No vhost given, all should be returned + {_, Body1} = http_get_with_pal(Config, "/metrics/detailed?family=queue_consumer_count&per-object=1", [], 200), + ?assertEqual(Expected, parse_response(Body1)), + + %% Both vhosts are listed explicitly + {_, Body2} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&vhost=vhost-2&vhost=%2f&family=queue_consumer_count&per-object=1", [], 200), + ?assertEqual(Expected, parse_response(Body2)), + ok. 
+ +queue_coarse_metrics_per_object_test(Config) -> + Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7], + #{queue => "vhost-1-queue-with-messages", vhost => "vhost-1"} => [7]}, + Expected2 = #{#{queue => "vhost-2-queue-with-consumer", vhost => "vhost-2"} => [11], + #{queue => "vhost-2-queue-with-messages", vhost => "vhost-2"} => [11]}, + ExpectedD = #{#{queue => "default-queue-with-consumer", vhost => "/"} => [3], + #{queue => "default-queue-with-messages", vhost => "/"} => [3]}, + + {_, Body1} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&family=queue_coarse_metrics", [], 200), + ?assertEqual(Expected1, + map_get(rabbitmq_detailed_queue_messages, parse_response(Body1))), + + {_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=queue_coarse_metrics", [], 200), + ?assertEqual(lists:foldl(fun maps:merge/2, #{}, [Expected1, Expected2, ExpectedD]), + map_get(rabbitmq_detailed_queue_messages, parse_response(Body2))), + + {_, Body3} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&vhost=vhost-2&family=queue_coarse_metrics", [], 200), + ?assertEqual(lists:foldl(fun maps:merge/2, #{}, [Expected1, Expected2]), + map_get(rabbitmq_detailed_queue_messages, parse_response(Body3))), + ok. 
+ +queue_metrics_per_object_test(Config) -> + Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7], + #{queue => "vhost-1-queue-with-messages", vhost => "vhost-1"} => [7]}, + Expected2 = #{#{queue => "vhost-2-queue-with-consumer", vhost => "vhost-2"} => [11], + #{queue => "vhost-2-queue-with-messages", vhost => "vhost-2"} => [11]}, + ExpectedD = #{#{queue => "default-queue-with-consumer", vhost => "/"} => [3], + #{queue => "default-queue-with-messages", vhost => "/"} => [3]}, + {_, Body1} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&family=queue_metrics", [], 200), + ?assertEqual(Expected1, + map_get(rabbitmq_detailed_queue_messages_ram, parse_response(Body1))), + + {_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=queue_metrics", [], 200), + ?assertEqual(lists:foldl(fun maps:merge/2, #{}, [Expected1, Expected2, ExpectedD]), + map_get(rabbitmq_detailed_queue_messages_ram, parse_response(Body2))), + + {_, Body3} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&vhost=vhost-2&family=queue_metrics", [], 200), + ?assertEqual(lists:foldl(fun maps:merge/2, #{}, [Expected1, Expected2]), + map_get(rabbitmq_detailed_queue_messages_ram, parse_response(Body3))), + ok. + +queue_consumer_count_and_queue_metrics_mutually_exclusive_test(Config) -> + {_, Body1} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&family=queue_consumer_count&family=queue_metrics", [], 200), + ?assertEqual(#{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [1], + #{queue => "vhost-1-queue-with-messages", vhost => "vhost-1"} => [0]}, + map_get(rabbitmq_detailed_queue_consumers, parse_response(Body1))), + + ok. + +detailed_metrics_no_families_enabled_by_default(Config) -> + {_, Body} = http_get_with_pal(Config, "/metrics/detailed", [], 200), + ?assertEqual(#{}, parse_response(Body)), + ok. 
+ http_get(Config, ReqHeaders, CodeExp) -> Path = proplists:get_value(prometheus_path, Config, "/metrics"), http_get(Config, Path, ReqHeaders, CodeExp). @@ -336,3 +514,43 @@ http_get_with_pal(Config, Path, ReqHeaders, CodeExp) -> %% Print and log response body - it makes is easier to find why a match failed ct:pal(Body), {Headers, Body}. + +parse_response(Body) -> + Lines = string:split(Body, "\n", all), + Metrics = [ parse_metric(L) + || L = [C|_] <- Lines, C /= $# + ], + lists:foldl(fun ({Metric, Label, Value}, MetricMap) -> + case string:prefix(atom_to_list(Metric), "telemetry") of + nomatch -> + OldLabelMap = maps:get(Metric, MetricMap, #{}), + OldValues = maps:get(Label, OldLabelMap, []), + NewValues = [Value|OldValues], + NewLabelMap = maps:put(Label, NewValues, OldLabelMap), + maps:put(Metric, NewLabelMap, MetricMap); + _ -> MetricMap + end + end, #{}, Metrics). + +parse_metric(M) -> + case string:lexemes(M, "{}" ) of + [Metric, Label, Value] -> + {list_to_atom(Metric), parse_label(Label), parse_value(string:trim(Value))}; + _ -> + [Metric, Value] = string:split(M, " "), + {list_to_atom(Metric), undefined, parse_value(string:trim(Value))} + end. + +parse_label(L) -> + Parts = string:split(L, ",", all), + maps:from_list([ parse_kv(P) || P <- Parts ]). + +parse_kv(KV) -> + [K, V] = string:split(KV, "="), + {list_to_atom(K), string:trim(V, both, [$"])}. + +parse_value(V) -> + case lists:all(fun (C) -> C >= $0 andalso C =< $9 end, V) of + true -> list_to_integer(V); + _ -> V + end. |