summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexey Lebedeff <binarin@binarin.info>2021-09-07 14:28:57 +0200
committerAlexey Lebedeff <binarin@binarin.info>2021-09-20 14:59:17 +0200
commit4bb2262140663b186c8d7beb57d10e3d86ef52ea (patch)
tree1279d722ab3f4c89bf83b0aa3d6541fbe1bbe5d0
parent8f207e3c5f729a364f8d31a85f8d6ea313922e6a (diff)
downloadrabbitmq-server-git-4bb2262140663b186c8d7beb57d10e3d86ef52ea.tar.gz
Allow selective querying for prometheus plugin
-rw-r--r--deps/rabbitmq_prometheus/README.md14
-rw-r--r--deps/rabbitmq_prometheus/metrics-detailed.md238
-rw-r--r--deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl256
-rw-r--r--deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl3
-rw-r--r--deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl45
-rw-r--r--deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl220
6 files changed, 685 insertions, 91 deletions
diff --git a/deps/rabbitmq_prometheus/README.md b/deps/rabbitmq_prometheus/README.md
index c65c2675bf..61a5a2b903 100644
--- a/deps/rabbitmq_prometheus/README.md
+++ b/deps/rabbitmq_prometheus/README.md
@@ -85,6 +85,20 @@ To go back to aggregated metrics on-the-fly, run the following command:
rabbitmqctl eval 'application:set_env(rabbitmq_prometheus, return_per_object_metrics, false).'
```
+## Selective querying of per-object metrics
+
+As mentioned in the previous section, returning a lot of per-object metrics is quite computationally expensive process. One of the reasons is that `/metrics/per-object` returns every possible metric for every possible object - even if having them makes no sense in the day-to-day monitoring activity.
+
+That's why there is an additional endpoint that always return per-object metrics and allows one to explicitly query only the things that are relevant - `/metrics/detailed`. By default it doesn't return anything at all, but it's possible to specify required metric groups and virtual host filters in the GET-parameters. Scraping `/metrics/detailed?vhost=vhost-1&vhost=vhost-2&family=queue_coarse_metrics&family=queue_consumer_count`. will only return requested metrics (and not, for example, channel metrics that include erlang PID in labels).
+
+This endpoint supports the following parameters:
+
+* Zero or more `family` - only the requested metric families will be returned. The full list is documented in [metrics-detailed](metrics-detailed.md).
+* Zero or more `vhost` - if it's given, queue related metrics (`queue_coarse_metrics`, `queue_consumer_count` and `queue_metrics`) will be returned only for given vhost(s).
+
+The returned metrics use different prefix `rabbitmq_detailed_` (instead of plain `rabbitmq_` used by other endpoints), so that endpoint can be used simultaneously with `/metrics`, and existing dashboards won't be affected.
+
+Here are the performance gains you can expect from using this endpoint. On a test system with 10k queues/10k consumer/10k producers, `/metrics/per-object` took a bit over 2 minutes. Querying `/metrics/detailed?family=queue_coarse_metrics&family=queue_consumer_count` provides just enough metrics to see how many messages sit in every queue and how much consumers each of these queues have. And it takes only 2 seconds, a significant improvement over indiscriminate `/metrics/per-object`.
## Contributing
diff --git a/deps/rabbitmq_prometheus/metrics-detailed.md b/deps/rabbitmq_prometheus/metrics-detailed.md
new file mode 100644
index 0000000000..262fca04bb
--- /dev/null
+++ b/deps/rabbitmq_prometheus/metrics-detailed.md
@@ -0,0 +1,238 @@
+## Configurable RabbitMQ metric groups
+
+Those are metrics than can be explicitly requested via `/metrics/detailed` endpoint.
+
+### Generic metrics
+
+These are some generic metrics, which do not refer to any specific
+queue/connection/etc.
+
+#### Connection/channel/queue churn
+
+Group `connection_churn_metrics`:
+
+| Metric | Description |
+|--------------------------------------------|--------------------------------------------------|
+| rabbitmq_detailed_connections_opened_total | Total number of connections opened |
+| rabbitmq_detailed_connections_closed_total | Total number of connections closed or terminated |
+| rabbitmq_detailed_channels_opened_total | Total number of channels opened |
+| rabbitmq_detailed_channels_closed_total | Total number of channels closed |
+| rabbitmq_detailed_queues_declared_total | Total number of queues declared |
+| rabbitmq_detailed_queues_created_total | Total number of queues created |
+| rabbitmq_detailed_queues_deleted_total | Total number of queues deleted |
+
+
+#### Erlang VM/Disk IO via RabbitMQ
+
+Group `node_coarse_metrics`:
+
+| Metric | Description |
+|-----------------------------------------------------------|-----------------------------------------------------------------------|
+| rabbitmq_detailed_process_open_fds | Open file descriptors |
+| rabbitmq_detailed_process_open_tcp_sockets | Open TCP sockets |
+| rabbitmq_detailed_process_resident_memory_bytes | Memory used in bytes |
+| rabbitmq_detailed_disk_space_available_bytes | Disk space available in bytes |
+| rabbitmq_detailed_erlang_processes_used | Erlang processes used |
+| rabbitmq_detailed_erlang_gc_runs_total | Total number of Erlang garbage collector runs |
+| rabbitmq_detailed_erlang_gc_reclaimed_bytes_total | Total number of bytes of memory reclaimed by Erlang garbage collector |
+| rabbitmq_detailed_erlang_scheduler_context_switches_total | Total number of Erlang scheduler context switches |
+
+Group `node_metrics`:
+
+| Metric | Description |
+|----------------------------------------------------|----------------------------------------|
+| rabbitmq_detailed_process_max_fds | Open file descriptors limit |
+| rabbitmq_detailed_process_max_tcp_sockets | Open TCP sockets limit |
+| rabbitmq_detailed_resident_memory_limit_bytes | Memory high watermark in bytes |
+| rabbitmq_detailed_disk_space_available_limit_bytes | Free disk space low watermark in bytes |
+| rabbitmq_detailed_erlang_processes_limit | Erlang processes limit |
+| rabbitmq_detailed_erlang_scheduler_run_queue | Erlang scheduler run queue |
+| rabbitmq_detailed_erlang_net_ticktime_seconds | Inter-node heartbeat interval |
+| rabbitmq_detailed_erlang_uptime_seconds | Node uptime |
+
+
+Group `node_persister_metrics`:
+
+| Metric | Description |
+|-------------------------------------------------------|------------------------------------------------------|
+| rabbitmq_detailed_io_read_ops_total | Total number of I/O read operations |
+| rabbitmq_detailed_io_read_bytes_total | Total number of I/O bytes read |
+| rabbitmq_detailed_io_write_ops_total | Total number of I/O write operations |
+| rabbitmq_detailed_io_write_bytes_total | Total number of I/O bytes written |
+| rabbitmq_detailed_io_sync_ops_total | Total number of I/O sync operations |
+| rabbitmq_detailed_io_seek_ops_total | Total number of I/O seek operations |
+| rabbitmq_detailed_io_open_attempt_ops_total | Total number of file open attempts |
+| rabbitmq_detailed_io_reopen_ops_total | Total number of times files have been reopened |
+| rabbitmq_detailed_schema_db_ram_tx_total | Total number of Schema DB memory transactions |
+| rabbitmq_detailed_schema_db_disk_tx_total | Total number of Schema DB disk transactions |
+| rabbitmq_detailed_msg_store_read_total | Total number of Message Store read operations |
+| rabbitmq_detailed_msg_store_write_total | Total number of Message Store write operations |
+| rabbitmq_detailed_queue_index_read_ops_total | Total number of Queue Index read operations |
+| rabbitmq_detailed_queue_index_write_ops_total | Total number of Queue Index write operations |
+| rabbitmq_detailed_queue_index_journal_write_ops_total | Total number of Queue Index Journal write operations |
+| rabbitmq_detailed_io_read_time_seconds_total | Total I/O read time |
+| rabbitmq_detailed_io_write_time_seconds_total | Total I/O write time |
+| rabbitmq_detailed_io_sync_time_seconds_total | Total I/O sync time |
+| rabbitmq_detailed_io_seek_time_seconds_total | Total I/O seek time |
+| rabbitmq_detailed_io_open_attempt_time_seconds_total | Total file open attempts time |
+
+
+#### Raft metrics
+
+Group `ra_metrics`:
+
+| Metric | Description |
+|-----------------------------------------------------|--------------------------------------------|
+| rabbitmq_detailed_raft_term_total | Current Raft term number |
+| rabbitmq_detailed_raft_log_snapshot_index | Raft log snapshot index |
+| rabbitmq_detailed_raft_log_last_applied_index | Raft log last applied index |
+| rabbitmq_detailed_raft_log_commit_index | Raft log commit index |
+| rabbitmq_detailed_raft_log_last_written_index | Raft log last written index |
+| rabbitmq_detailed_raft_entry_commit_latency_seconds | Time taken for a log entry to be committed |
+
+#### Auth metrics
+
+Group `auth_attempt_metrics`:
+
+| Metric | Description |
+|-------------------------------------------------|----------------------------------------------------|
+| rabbitmq_detailed_auth_attempts_total | Total number of authorization attempts |
+| rabbitmq_detailed_auth_attempts_succeeded_total | Total number of successful authentication attempts |
+| rabbitmq_detailed_auth_attempts_failed_total | Total number of failed authentication attempts |
+
+
+Group `auth_attempt_detailed_metrics` (when aggregated, it produces the same numbers as `auth_attempt_metrics` - so it's mutually exclusive with it in the aggregation mode):
+
+| Metric | Description |
+|----------------------------------------------------------|--------------------------------------------------------------------|
+| rabbitmq_detailed_auth_attempts_detailed_total | Total number of authorization attempts with source info |
+| rabbitmq_detailed_auth_attempts_detailed_succeeded_total | Total number of successful authorization attempts with source info |
+| rabbitmq_detailed_auth_attempts_detailed_failed_total | Total number of failed authorization attempts with source info |
+
+
+### Queue metrics
+
+Each of metrics in this group refers to a single queue in its label. Amount of data and performance totally depends on the number of queues.
+
+They are listed from least expensive to collect to the most expensive.
+
+#### Queue coarse metrics
+
+Group `queue_coarse_metrics`:
+
+| Metric | Description |
+|--------------------------------------------------|--------------------------------------------------------------|
+| rabbitmq_detailed_queue_messages_ready | Messages ready to be delivered to consumers |
+| rabbitmq_detailed_queue_messages_unacked | Messages delivered to consumers but not yet acknowledged |
+| rabbitmq_detailed_queue_messages | Sum of ready and unacknowledged messages - total queue depth |
+| rabbitmq_detailed_queue_process_reductions_total | Total number of queue process reductions |
+
+#### Per-queue consumer count
+
+Group `queue_consumer_count`. This is a strict subset of `queue_metrics` which contains only a single metric (if both `queue_consumer_count` and `queue_metrics` are requested, the former will be automatically skipped):
+
+| Metric | Description |
+|-----------------------------------|----------------------|
+| rabbitmq_detailed_queue_consumers | Consumers on a queue |
+
+This is one of the more telling metrics, and having it separately allows to skip some expensive operations for extracting/exposing the other metrics from the same datasource.
+
+#### Detailed queue metrics
+
+Group `queue_metrics` contains all the metrics for every queue, and can be relatively expensive to produce:
+
+| Metric | Description |
+|---------------------------------------------------|------------------------------------------------------------|
+| rabbitmq_detailed_queue_consumers | Consumers on a queue |
+| rabbitmq_detailed_queue_consumer_capacity | Consumer capacity |
+| rabbitmq_detailed_queue_consumer_utilisation | Same as consumer capacity |
+| rabbitmq_detailed_queue_process_memory_bytes | Memory in bytes used by the Erlang queue process |
+| rabbitmq_detailed_queue_messages_ram | Ready and unacknowledged messages stored in memory |
+| rabbitmq_detailed_queue_messages_ram_bytes | Size of ready and unacknowledged messages stored in memory |
+| rabbitmq_detailed_queue_messages_ready_ram | Ready messages stored in memory |
+| rabbitmq_detailed_queue_messages_unacked_ram | Unacknowledged messages stored in memory |
+| rabbitmq_detailed_queue_messages_persistent | Persistent messages |
+| rabbitmq_detailed_queue_messages_persistent_bytes | Size in bytes of persistent messages |
+| rabbitmq_detailed_queue_messages_bytes | Size in bytes of ready and unacknowledged messages |
+| rabbitmq_detailed_queue_messages_ready_bytes | Size in bytes of ready messages |
+| rabbitmq_detailed_queue_messages_unacked_bytes | Size in bytes of all unacknowledged messages |
+| rabbitmq_detailed_queue_messages_paged_out | Messages paged out to disk |
+| rabbitmq_detailed_queue_messages_paged_out_bytes | Size in bytes of messages paged out to disk |
+| rabbitmq_detailed_queue_disk_reads_total | Total number of times queue read messages from disk |
+| rabbitmq_detailed_queue_disk_writes_total | Total number of times queue wrote messages to disk |
+
+Tests show that performance difference between it and `queue_consumer_count` is approximately 8 times. E.g. on a test broker with 10k queues/producers/consumers, scrape time was ~8 second and ~1 respectively. So while it's expensive, it's not prohibitively so - especially compared to other metrics from per-connection/channel groups.
+
+### Connection/channel metrics
+
+All of those include Erlang PID in their label, which is rarely useful when ingested into Prometheus. And they are most expensive to produce, the most resources are spent by `/metrics/per-object` on these.
+
+#### Connection metrics
+
+Group `connection_coarse_metrics`:
+
+| Metric | Description |
+|-------------------------------------------------------|------------------------------------------------|
+| rabbitmq_detailed_connection_incoming_bytes_total | Total number of bytes received on a connection |
+| rabbitmq_detailed_connection_outgoing_bytes_total | Total number of bytes sent on a connection |
+| rabbitmq_detailed_connection_process_reductions_total | Total number of connection process reductions |
+
+Group `connection_metrics`:
+
+| Metric | Description |
+|-----------------------------------------------------|------------------------------------------------------|
+| rabbitmq_detailed_connection_incoming_packets_total | Total number of packets received on a connection |
+| rabbitmq_detailed_connection_outgoing_packets_total | Total number of packets sent on a connection |
+| rabbitmq_detailed_connection_pending_packets | Number of packets waiting to be sent on a connection |
+| rabbitmq_detailed_connection_channels | Channels on a connection |
+
+#### General channel metrics
+
+Group `channel_metrics`:
+
+| Metric | Description |
+|------------------------------------------------|-----------------------------------------------------------------------|
+| rabbitmq_detailed_channel_consumers | Consumers on a channel |
+| rabbitmq_detailed_channel_messages_unacked | Delivered but not yet acknowledged messages |
+| rabbitmq_detailed_channel_messages_unconfirmed | Published but not yet confirmed messages |
+| rabbitmq_detailed_channel_messages_uncommitted | Messages received in a transaction but not yet committed |
+| rabbitmq_detailed_channel_acks_uncommitted | Message acknowledgements in a transaction not yet committed |
+| rabbitmq_detailed_consumer_prefetch | Limit of unacknowledged messages for each consumer |
+| rabbitmq_detailed_channel_prefetch | Total limit of unacknowledged messages for all consumers on a channel |
+
+
+Group `channel_process_metrics`:
+
+| Metric | Description |
+|----------------------------------------------------|--------------------------------------------|
+| rabbitmq_detailed_channel_process_reductions_total | Total number of channel process reductions |
+
+
+#### Channel metrics with queue/exchange breakdowns
+
+Group `channel_exchange_metrics`:
+
+| Metric | Description |
+|--------------------------------------------------------------|--------------------------------------------------------------------------------------------------------------|
+| rabbitmq_detailed_channel_messages_published_total | Total number of messages published into an exchange on a channel |
+| rabbitmq_detailed_channel_messages_confirmed_total | Total number of messages published into an exchange and confirmed on the channel |
+| rabbitmq_detailed_channel_messages_unroutable_returned_total | Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable |
+| rabbitmq_detailed_channel_messages_unroutable_dropped_total | Total number of messages published as non-mandatory into an exchange and dropped as unroutable |
+
+Group `channel_queue_metrics`:
+
+| Metric | Description |
+|--------------------------------------------------------|-----------------------------------------------------------------------------------|
+| rabbitmq_detailed_channel_get_ack_total | Total number of messages fetched with basic.get in manual acknowledgement mode |
+| rabbitmq_detailed_channel_get_total | Total number of messages fetched with basic.get in automatic acknowledgement mode |
+| rabbitmq_detailed_channel_messages_delivered_ack_total | Total number of messages delivered to consumers in manual acknowledgement mode |
+| rabbitmq_detailed_channel_messages_delivered_total | Total number of messages delivered to consumers in automatic acknowledgement mode |
+| rabbitmq_detailed_channel_messages_redelivered_total | Total number of messages redelivered to consumers |
+| rabbitmq_detailed_channel_messages_acked_total | Total number of messages acknowledged by consumers |
+| rabbitmq_detailed_channel_get_empty_total | Total number of times basic.get operations fetched no message |
+
+Group `channel_queue_exchange_metrics`:
+
+| Metric | Description |
+|--------------------------------------------------|----------------------------------------------|
+| rabbitmq_detailed_queue_messages_published_total | Total number of messages published to queues |
diff --git a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
index f06ab643e3..bd1cd9067f 100644
--- a/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
+++ b/deps/rabbitmq_prometheus/src/collectors/prometheus_rabbitmq_core_metrics_collector.erl
@@ -20,14 +20,16 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-behaviour(prometheus_collector).
-
-%% Because all metrics are from RabbitMQ's perspective,
-%% cached for up to 5 seconds by default (configurable),
-%% we prepend rabbitmq_ to all metrics emitted by this collector.
-%% Some metrics are for Erlang (erlang_), Mnesia (schema_db_) or the System (io_),
+%% We prepend either rabbitmq_ or rabbitmq_detailed_ to all metrics emitted by this collector.
+%% As there are also some metrics for Erlang (erlang_), Mnesia (schema_db_) or the System (io_),
%% as observed by RabbitMQ.
+
+%% Used by `/metrics` and `/metrics/per-object`.
-define(METRIC_NAME_PREFIX, "rabbitmq_").
+%% Used by `/metrics/detailed` endpoint
+-define(DETAILED_METRIC_NAME_PREFIX, "rabbitmq_detailed_").
+
%% ==The source of these metrics can be found in the rabbit_core_metrics module==
%% The relevant files are:
%% * rabbit_common/src/rabbit_core_metrics.erl
@@ -52,37 +54,8 @@
-define(MICROSECOND, 1000000).
-define(METRICS_RAW, [
- {channel_metrics, [
- {2, undefined, channel_consumers, gauge, "Consumers on a channel", consumer_count},
- {2, undefined, channel_messages_unacked, gauge, "Delivered but not yet acknowledged messages", messages_unacknowledged},
- {2, undefined, channel_messages_unconfirmed, gauge, "Published but not yet confirmed messages", messages_unconfirmed},
- {2, undefined, channel_messages_uncommitted, gauge, "Messages received in a transaction but not yet committed", messages_uncommitted},
- {2, undefined, channel_acks_uncommitted, gauge, "Message acknowledgements in a transaction not yet committed", acks_uncommitted},
- {2, undefined, consumer_prefetch, gauge, "Limit of unacknowledged messages for each consumer", prefetch_count},
- {2, undefined, channel_prefetch, gauge, "Total limit of unacknowledged messages for all consumers on a channel", global_prefetch_count}
- ]},
-
- {channel_exchange_metrics, [
- {2, undefined, channel_messages_published_total, counter, "Total number of messages published into an exchange on a channel"},
- {3, undefined, channel_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed on the channel"},
- {4, undefined, channel_messages_unroutable_returned_total, counter, "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"},
- {5, undefined, channel_messages_unroutable_dropped_total, counter, "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"}
- ]},
-
- {channel_process_metrics, [
- {2, undefined, channel_process_reductions_total, counter, "Total number of channel process reductions"}
- ]},
-
- {channel_queue_metrics, [
- {2, undefined, channel_get_ack_total, counter, "Total number of messages fetched with basic.get in manual acknowledgement mode"},
- {3, undefined, channel_get_total, counter, "Total number of messages fetched with basic.get in automatic acknowledgement mode"},
- {4, undefined, channel_messages_delivered_ack_total, counter, "Total number of messages delivered to consumers in manual acknowledgement mode"},
- {5, undefined, channel_messages_delivered_total, counter, "Total number of messages delivered to consumers in automatic acknowledgement mode"},
- {6, undefined, channel_messages_redelivered_total, counter, "Total number of messages redelivered to consumers"},
- {7, undefined, channel_messages_acked_total, counter, "Total number of messages acknowledged by consumers"},
- {8, undefined, channel_get_empty_total, counter, "Total number of times basic.get operations fetched no message"}
- ]},
+%%% Those are global, i.e. they contain no reference to queue/vhost/channel
{connection_churn_metrics, [
{2, undefined, connections_opened_total, counter, "Total number of connections opened"},
{3, undefined, connections_closed_total, counter, "Total number of connections closed or terminated"},
@@ -92,24 +65,6 @@
{7, undefined, queues_created_total, counter, "Total number of queues created"},
{8, undefined, queues_deleted_total, counter, "Total number of queues deleted"}
]},
-
- {connection_coarse_metrics, [
- {2, undefined, connection_incoming_bytes_total, counter, "Total number of bytes received on a connection"},
- {3, undefined, connection_outgoing_bytes_total, counter, "Total number of bytes sent on a connection"},
- {4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"}
- ]},
-
- {connection_metrics, [
- {2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt},
- {2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt},
- {2, undefined, connection_pending_packets, gauge, "Number of packets waiting to be sent on a connection", send_pend},
- {2, undefined, connection_channels, gauge, "Channels on a connection", channels}
- ]},
-
- {channel_queue_exchange_metrics, [
- {2, undefined, queue_messages_published_total, counter, "Total number of messages published to queues"}
- ]},
-
{node_coarse_metrics, [
{2, undefined, process_open_fds, gauge, "Open file descriptors", fd_used},
{2, undefined, process_open_tcp_sockets, gauge, "Open TCP sockets", sockets_used},
@@ -120,7 +75,6 @@
{2, undefined, erlang_gc_reclaimed_bytes_total, counter, "Total number of bytes of memory reclaimed by Erlang garbage collector", gc_bytes_reclaimed},
{2, undefined, erlang_scheduler_context_switches_total, counter, "Total number of Erlang scheduler context switches", context_switches}
]},
-
{node_metrics, [
{2, undefined, process_max_fds, gauge, "Open file descriptors limit", fd_total},
{2, undefined, process_max_tcp_sockets, gauge, "Open TCP sockets limit", sockets_total},
@@ -164,6 +118,19 @@
{7, ?MILLISECOND, raft_entry_commit_latency_seconds, gauge, "Time taken for a log entry to be committed"}
]},
+ {auth_attempt_metrics, [
+ {2, undefined, auth_attempts_total, counter, "Total number of authorization attempts"},
+ {3, undefined, auth_attempts_succeeded_total, counter, "Total number of successful authentication attempts"},
+ {4, undefined, auth_attempts_failed_total, counter, "Total number of failed authentication attempts"}
+ ]},
+
+ {auth_attempt_detailed_metrics, [
+ {2, undefined, auth_attempts_detailed_total, counter, "Total number of authorization attempts with source info"},
+ {3, undefined, auth_attempts_detailed_succeeded_total, counter, "Total number of successful authorization attempts with source info"},
+ {4, undefined, auth_attempts_detailed_failed_total, counter, "Total number of failed authorization attempts with source info"}
+ ]},
+
+%%% Those metrics have reference only to a queue name. This is the only group where filtering (e.g. by vhost) makes sense.
{queue_coarse_metrics, [
{2, undefined, queue_messages_ready, gauge, "Messages ready to be delivered to consumers"},
{3, undefined, queue_messages_unacked, gauge, "Messages delivered to consumers but not yet acknowledged"},
@@ -171,6 +138,10 @@
{5, undefined, queue_process_reductions_total, counter, "Total number of queue process reductions"}
]},
+ {queue_consumer_count, [
+ {2, undefined, queue_consumers, gauge, "Consumers on a queue", consumers}
+ ]},
+
{queue_metrics, [
{2, undefined, queue_consumers, gauge, "Consumers on a queue", consumers},
{2, undefined, queue_consumer_capacity, gauge, "Consumer capacity", consumer_capacity},
@@ -191,18 +162,56 @@
{2, undefined, queue_disk_writes_total, counter, "Total number of times queue wrote messages to disk", disk_writes}
]},
- {auth_attempt_metrics, [
- {2, undefined, auth_attempts_total, counter, "Total number of authorization attempts"},
- {3, undefined, auth_attempts_succeeded_total, counter, "Total number of successful authentication attempts"},
- {4, undefined, auth_attempts_failed_total, counter, "Total number of failed authentication attempts"}
+%%% Metrics that contain reference to a channel. Some of them also have
+%%% a queue name, but in this case filtering on it doesn't make any
+%%% sense, as the queue is not an object of interest here.
+ {channel_metrics, [
+ {2, undefined, channel_consumers, gauge, "Consumers on a channel", consumer_count},
+ {2, undefined, channel_messages_unacked, gauge, "Delivered but not yet acknowledged messages", messages_unacknowledged},
+ {2, undefined, channel_messages_unconfirmed, gauge, "Published but not yet confirmed messages", messages_unconfirmed},
+ {2, undefined, channel_messages_uncommitted, gauge, "Messages received in a transaction but not yet committed", messages_uncommitted},
+ {2, undefined, channel_acks_uncommitted, gauge, "Message acknowledgements in a transaction not yet committed", acks_uncommitted},
+ {2, undefined, consumer_prefetch, gauge, "Limit of unacknowledged messages for each consumer", prefetch_count},
+ {2, undefined, channel_prefetch, gauge, "Total limit of unacknowledged messages for all consumers on a channel", global_prefetch_count}
]},
- {auth_attempt_detailed_metrics, [
- {2, undefined, auth_attempts_detailed_total, counter, "Total number of authorization attempts with source info"},
- {3, undefined, auth_attempts_detailed_succeeded_total, counter, "Total number of successful authorization attempts with source info"},
- {4, undefined, auth_attempts_detailed_failed_total, counter, "Total number of failed authorization attempts with source info"}
- ]}
+ {channel_exchange_metrics, [
+ {2, undefined, channel_messages_published_total, counter, "Total number of messages published into an exchange on a channel"},
+ {3, undefined, channel_messages_confirmed_total, counter, "Total number of messages published into an exchange and confirmed on the channel"},
+ {4, undefined, channel_messages_unroutable_returned_total, counter, "Total number of messages published as mandatory into an exchange and returned to the publisher as unroutable"},
+ {5, undefined, channel_messages_unroutable_dropped_total, counter, "Total number of messages published as non-mandatory into an exchange and dropped as unroutable"}
+ ]},
+
+ {channel_process_metrics, [
+ {2, undefined, channel_process_reductions_total, counter, "Total number of channel process reductions"}
+ ]},
+ {channel_queue_metrics, [
+ {2, undefined, channel_get_ack_total, counter, "Total number of messages fetched with basic.get in manual acknowledgement mode"},
+ {3, undefined, channel_get_total, counter, "Total number of messages fetched with basic.get in automatic acknowledgement mode"},
+ {4, undefined, channel_messages_delivered_ack_total, counter, "Total number of messages delivered to consumers in manual acknowledgement mode"},
+ {5, undefined, channel_messages_delivered_total, counter, "Total number of messages delivered to consumers in automatic acknowledgement mode"},
+ {6, undefined, channel_messages_redelivered_total, counter, "Total number of messages redelivered to consumers"},
+ {7, undefined, channel_messages_acked_total, counter, "Total number of messages acknowledged by consumers"},
+ {8, undefined, channel_get_empty_total, counter, "Total number of times basic.get operations fetched no message"}
+ ]},
+
+ {connection_coarse_metrics, [
+ {2, undefined, connection_incoming_bytes_total, counter, "Total number of bytes received on a connection"},
+ {3, undefined, connection_outgoing_bytes_total, counter, "Total number of bytes sent on a connection"},
+ {4, undefined, connection_process_reductions_total, counter, "Total number of connection process reductions"}
+ ]},
+
+ {connection_metrics, [
+ {2, undefined, connection_incoming_packets_total, counter, "Total number of packets received on a connection", recv_cnt},
+ {2, undefined, connection_outgoing_packets_total, counter, "Total number of packets sent on a connection", send_cnt},
+ {2, undefined, connection_pending_packets, gauge, "Number of packets waiting to be sent on a connection", send_pend},
+ {2, undefined, connection_channels, gauge, "Channels on a connection", channels}
+ ]},
+
+ {channel_queue_exchange_metrics, [
+ {2, undefined, queue_messages_published_total, counter, "Total number of messages published to queues"}
+ ]}
]).
-define(TOTALS, [
@@ -222,29 +231,45 @@ register() ->
deregister_cleanup(_) -> ok.
+collect_mf('detailed', Callback) ->
+ collect(true, ?DETAILED_METRIC_NAME_PREFIX, vhosts_filter_from_pdict(), enabled_mfs_from_pdict(), Callback),
+ ok;
collect_mf('per-object', Callback) ->
- collect(true, Callback);
+ collect(true, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback),
+ totals(Callback),
+ ok;
collect_mf(_Registry, Callback) ->
PerObjectMetrics = application:get_env(rabbitmq_prometheus, return_per_object_metrics, false),
- collect(PerObjectMetrics, Callback).
+ collect(PerObjectMetrics, ?METRIC_NAME_PREFIX, false, ?METRICS_RAW, Callback),
+ totals(Callback),
+ ok.
-collect(PerObjectMetrics, Callback) ->
+collect(PerObjectMetrics, Prefix, VHostsFilter, IncludedMFs, Callback) ->
[begin
- Data = get_data(Table, PerObjectMetrics),
- mf(Callback, Contents, Data)
- end || {Table, Contents} <- ?METRICS_RAW, include_when_per_object_metrics(PerObjectMetrics, Table)],
+ Data = get_data(Table, PerObjectMetrics, VHostsFilter),
+ mf(Callback, Prefix, Contents, Data)
+ end || {Table, Contents} <- IncludedMFs, not mutually_exclusive_mf(PerObjectMetrics, Table, IncludedMFs)].
+
+totals(Callback) ->
[begin
Size = ets:info(Table, size),
mf_totals(Callback, Name, Type, Help, Size)
end || {Table, Name, Type, Help} <- ?TOTALS],
add_metric_family(build_info(), Callback),
- add_metric_family(identity_info(), Callback),
- ok.
-
-include_when_per_object_metrics(false, auth_attempt_detailed_metrics) ->
- false;
-include_when_per_object_metrics(_, _) ->
- true.
+ add_metric_family(identity_info(), Callback).
+
+%% Aggregated `auth``_attempt_detailed_metrics` and
+%% `auth_attempt_metrics` are the same numbers. The former is just
+%% more computationally intensive.
+mutually_exclusive_mf(false, auth_attempt_detailed_metrics, _) ->
+ true;
+%% `queue_consumer_count` is a strict subset of queue metrics. They
+%% read from the same table, but `queue_consumer_count` skips a lot of
+%% `proplists:get_value/2` calls.
+mutually_exclusive_mf(_, queue_consumer_count, MFs) ->
+ lists:keymember(queue_metrics, 1, MFs);
+mutually_exclusive_mf(_, _, _) ->
+ false.
build_info() ->
ProductInfo = rabbit:product_info(),
@@ -296,7 +321,7 @@ identity_info() ->
add_metric_family({Name, Type, Help, Metrics}, Callback) ->
Callback(create_mf(?METRIC_NAME(Name), Help, Type, Metrics)).
-mf(Callback, Contents, Data) ->
+mf(Callback, Prefix, Contents, Data) ->
[begin
Fun = case Conversion of
undefined ->
@@ -306,7 +331,7 @@ mf(Callback, Contents, Data) ->
end,
Callback(
create_mf(
- ?METRIC_NAME(Name),
+ [Prefix, prometheus_model_helpers:metric_name(Name)],
Help,
catch_boolean(Type),
?MODULE,
@@ -323,7 +348,7 @@ mf(Callback, Contents, Data) ->
end,
Callback(
create_mf(
- ?METRIC_NAME(Name),
+ [Prefix, prometheus_model_helpers:metric_name(Name)],
Help,
catch_boolean(Type),
?MODULE,
@@ -416,7 +441,7 @@ emit_gauge_metric_if_defined(Labels, Value) ->
gauge_metric(Labels, Value)
end.
-get_data(connection_metrics = Table, false) ->
+get_data(connection_metrics = Table, false, _) ->
{Table, A1, A2, A3, A4} = ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4}) ->
{T,
sum(proplists:get_value(recv_cnt, Props), A1),
@@ -425,7 +450,7 @@ get_data(connection_metrics = Table, false) ->
sum(proplists:get_value(channels, Props), A4)}
end, empty(Table), Table),
[{Table, [{recv_cnt, A1}, {send_cnt, A2}, {send_pend, A3}, {channels, A4}]}];
-get_data(channel_metrics = Table, false) ->
+get_data(channel_metrics = Table, false, _) ->
{Table, A1, A2, A3, A4, A5, A6, A7} =
ets:foldl(fun({_, Props}, {T, A1, A2, A3, A4, A5, A6, A7}) ->
{T,
@@ -440,10 +465,24 @@ get_data(channel_metrics = Table, false) ->
[{Table, [{consumer_count, A1}, {messages_unacknowledged, A2}, {messages_unconfirmed, A3},
{messages_uncommitted, A4}, {acks_uncommitted, A5}, {prefetch_count, A6},
{global_prefetch_count, A7}]}];
-get_data(queue_metrics = Table, false) ->
+get_data(queue_consumer_count = MF, false, VHostsFilter) ->
+ Table = queue_metrics, %% Real table name
+ {_, A1} = ets:foldl(fun
+ ({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
+ Acc;
+ ({_, Props, _}, {T, A1}) ->
+ {T,
+ sum(proplists:get_value(consumers, Props), A1)
+ }
+ end, empty(MF), Table),
+ [{Table, [{consumers, A1}]}];
+get_data(queue_metrics = Table, false, VHostsFilter) ->
{Table, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10, A11, A12, A13, A14, A15, A16} =
- ets:foldl(fun({_, Props, _}, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10,
- A11, A12, A13, A14, A15, A16}) ->
+ ets:foldl(fun
+ ({#resource{kind = queue, virtual_host = VHost}, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
+ Acc;
+ ({_, Props, _}, {T, A1, A2, A3, A4, A5, A6, A7, A8, A9, A10,
+ A11, A12, A13, A14, A15, A16}) ->
{T,
sum(proplists:get_value(consumers, Props), A1),
sum(proplists:get_value(consumer_utilisation, Props), A2),
@@ -470,14 +509,18 @@ get_data(queue_metrics = Table, false) ->
{message_bytes_ready, A11}, {message_bytes_unacknowledged, A12},
{messages_paged_out, A13}, {message_bytes_paged_out, A14},
{disk_reads, A15}, {disk_writes, A16}]}];
-get_data(Table, false) when Table == channel_exchange_metrics;
+get_data(Table, false, VHostsFilter) when Table == channel_exchange_metrics;
Table == queue_coarse_metrics;
Table == channel_queue_metrics;
Table == connection_coarse_metrics;
Table == channel_queue_exchange_metrics;
Table == ra_metrics;
Table == channel_process_metrics ->
- Result = ets:foldl(fun({_, V1}, {T, A1}) ->
+ Result = ets:foldl(fun
+ %% For queue_coarse_metrics
+ ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _}, Acc) when is_map(VHostsFilter), map_get(VHost, VHostsFilter) == false ->
+ Acc;
+ ({_, V1}, {T, A1}) ->
{T, V1 + A1};
({_, V1, _}, {T, A1}) ->
{T, V1 + A1};
@@ -503,7 +546,24 @@ get_data(Table, false) when Table == channel_exchange_metrics;
_ ->
[Result]
end;
-get_data(Table, _) ->
+get_data(queue_coarse_metrics = Table, true, VHostsFilter) when is_map(VHostsFilter) ->
+ ets:foldl(fun
+ ({#resource{kind = queue, virtual_host = VHost}, _, _, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
+ [Row|Acc];
+ (_, Acc) ->
+ Acc
+ end, [], Table);
+get_data(MF, true, VHostsFilter) when is_map(VHostsFilter), MF == queue_metrics orelse MF == queue_consumer_count ->
+ Table = queue_metrics,
+ ets:foldl(fun
+ ({#resource{kind = queue, virtual_host = VHost}, _, _} = Row, Acc) when map_get(VHost, VHostsFilter) ->
+ [Row|Acc];
+ (_, Acc) ->
+ Acc
+ end, [], Table);
+get_data(queue_consumer_count, true, _) ->
+ ets:tab2list(queue_metrics);
+get_data(Table, _, _) ->
ets:tab2list(Table).
division(0, 0) ->
@@ -514,7 +574,7 @@ division(A, B) ->
accumulate_count_and_sum(Value, {Count, Sum}) ->
{Count + 1, Sum + Value}.
-empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics ->
+empty(T) when T == channel_queue_exchange_metrics; T == channel_process_metrics; T == queue_consumer_count ->
{T, 0};
empty(T) when T == connection_coarse_metrics; T == auth_attempt_metrics; T == auth_attempt_detailed_metrics ->
{T, 0, 0, 0};
@@ -533,3 +593,23 @@ sum('', B) ->
B;
sum(A, B) ->
A + B.
+
+enabled_mfs_from_pdict() ->
+ case get(prometheus_mf_filter) of
+ undefined ->
+ [];
+ MFNames ->
+ MFNameSet = sets:from_list(MFNames),
+ [ MF || MF = {Table, _} <- ?METRICS_RAW, sets:is_element(Table, MFNameSet) ]
+ end.
+
+vhosts_filter_from_pdict() ->
+ case get(prometheus_vhost_filter) of
+ undefined ->
+ false;
+ L ->
+ %% Having both excluded and included hosts in this map makes some guards easier (or even possible).
+ All = maps:from_list([ {VHost, false} || VHost <- rabbit_vhost:list()]),
+ Enabled = maps:from_list([ {VHost, true} || VHost <- L ]),
+ maps:merge(All, Enabled)
+ end.
diff --git a/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl b/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl
index bf397d8ca1..45f5bb738d 100644
--- a/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl
+++ b/deps/rabbitmq_prometheus/src/rabbit_prometheus_dispatcher.erl
@@ -27,6 +27,9 @@ build_dispatcher() ->
prometheus_rabbitmq_core_metrics_collector,
prometheus_rabbitmq_global_metrics_collector
]),
+ prometheus_registry:register_collectors('detailed', [
+ prometheus_rabbitmq_core_metrics_collector
+ ]),
rabbit_prometheus_handler:setup(),
cowboy_router:compile([{'_', dispatcher()}]).
diff --git a/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl b/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl
index c2f8ddb080..954e2f878d 100644
--- a/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl
+++ b/deps/rabbitmq_prometheus/src/rabbit_prometheus_handler.erl
@@ -32,7 +32,8 @@ is_authorized(ReqData, Context) ->
setup() ->
setup_metrics(telemetry_registry()),
- setup_metrics('per-object').
+ setup_metrics('per-object'),
+ setup_metrics('detailed').
setup_metrics(Registry) ->
ScrapeDuration = [{name, ?SCRAPE_DURATION},
@@ -67,6 +68,7 @@ gen_response(<<"GET">>, Request) ->
false ->
cowboy_req:reply(404, #{}, <<"Unknown Registry">>, Request);
Registry ->
+ put_filtering_options_into_process_dictionary(Request),
gen_metrics_response(Registry, Request)
end;
gen_response(_, Request) ->
@@ -146,4 +148,43 @@ encode_format(ContentType, Encoding, Scrape, Registry) ->
encode_format_("gzip", Scrape) ->
zlib:gzip(Scrape);
encode_format_("identity", Scrape) ->
- Scrape.
+ Scrape.
+
+%% It's not easy to pass this information in a pure way (it'll require changing prometheus.erl)
+put_filtering_options_into_process_dictionary(Request) ->
+ #{vhost := VHosts, family := Families} = cowboy_req:match_qs([{vhost, [], undefined}, {family, [], undefined}], Request),
+ case parse_vhosts(VHosts) of
+ Vs when is_list(Vs) ->
+ put(prometheus_vhost_filter, Vs);
+ _ -> ok
+ end,
+ case parse_metric_families(Families) of
+ Fs when is_list(Fs) ->
+ put(prometheus_mf_filter, Fs);
+ _ -> ok
+ end,
+ ok.
+
+parse_vhosts(N) when is_binary(N) ->
+ parse_vhosts([N]);
+parse_vhosts(L) when is_list(L) ->
+ [ VHostName || VHostName <- L, rabbit_vhost:exists(VHostName)];
+parse_vhosts(_) ->
+ false.
+
+parse_metric_families(N) when is_binary(N) ->
+ parse_metric_families([N]);
+parse_metric_families([]) ->
+ [];
+parse_metric_families([B|Bs]) ->
+ %% binary_to_existing_atom() should be enough, as it's used for filtering things out.
+ %% Getting a full list of supported metrics would be harder.
+ %% NB But on the other hand, it's nice to have validation. Implement it?
+ case catch erlang:binary_to_existing_atom(B) of
+ A when is_atom(A) ->
+ [A|parse_metric_families(Bs)];
+ _ ->
+ parse_metric_families(Bs)
+ end;
+parse_metric_families(_) ->
+ false.
diff --git a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl
index f713e36a72..b903410057 100644
--- a/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl
+++ b/deps/rabbitmq_prometheus/test/rabbit_prometheus_http_SUITE.erl
@@ -22,7 +22,8 @@ all() ->
{group, aggregated_metrics},
{group, per_object_metrics},
{group, per_object_endpoint_metrics},
- {group, commercial}
+ {group, commercial},
+ {group, detailed_metrics}
].
groups() ->
@@ -48,6 +49,14 @@ groups() ->
]},
{commercial, [], [
build_info_product_test
+ ]},
+ {detailed_metrics, [], [
+ detailed_metrics_no_families_enabled_by_default,
+ queue_consumer_count_single_vhost_per_object_test,
+ queue_consumer_count_all_vhosts_per_object_test,
+ queue_coarse_metrics_per_object_test,
+ queue_metrics_per_object_test,
+ queue_consumer_count_and_queue_metrics_mutually_exclusive_test
]}
].
@@ -84,6 +93,71 @@ init_per_group(per_object_endpoint_metrics, Config0) ->
]},
Config1 = rabbit_ct_helpers:merge_app_env(Config0, PathConfig),
init_per_group(aggregated_metrics, Config1);
+init_per_group(detailed_metrics, Config0) ->
+ StatsEnv = {rabbit, [{collect_statistics, coarse}, {collect_statistics_interval, 100}]},
+
+ Config1 = init_per_group(detailed_metrics, rabbit_ct_helpers:merge_app_env(Config0, StatsEnv), []),
+
+ rabbit_ct_broker_helpers:add_vhost(Config1, 0, <<"vhost-1">>, <<"guest">>),
+ rabbit_ct_broker_helpers:set_full_permissions(Config1, <<"vhost-1">>),
+ VHost1Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config1, 0, <<"vhost-1">>),
+ {ok, VHost1Ch} = amqp_connection:open_channel(VHost1Conn),
+
+ rabbit_ct_broker_helpers:add_vhost(Config1, 0, <<"vhost-2">>, <<"guest">>),
+ rabbit_ct_broker_helpers:set_full_permissions(Config1, <<"vhost-2">>),
+ VHost2Conn = rabbit_ct_client_helpers:open_unmanaged_connection(Config1, 0, <<"vhost-2">>),
+ {ok, VHost2Ch} = amqp_connection:open_channel(VHost2Conn),
+
+ DefaultCh = rabbit_ct_client_helpers:open_channel(Config1),
+
+ [
+ (fun () ->
+ QPart = case VHost of
+ <<"/">> -> <<"default">>;
+ _ -> VHost
+ end,
+ QName = << QPart/binary, "-", Q/binary>>,
+ #'queue.declare_ok'{} = amqp_channel:call(Ch,
+ #'queue.declare'{queue = QName,
+ durable = true
+
+ }),
+ lists:foreach( fun (_) ->
+ amqp_channel:cast(Ch,
+ #'basic.publish'{routing_key = QName},
+ #amqp_msg{payload = <<"msg">>})
+ end, lists:seq(1, MsgNum) )
+ end)()
+ || {VHost, Ch, MsgNum} <- [{<<"/">>, DefaultCh, 3}, {<<"vhost-1">>, VHost1Ch, 7}, {<<"vhost-2">>, VHost2Ch, 11}],
+ Q <- [ <<"queue-with-messages">>, <<"queue-with-consumer">> ]
+ ],
+
+ DefaultConsumer = sleeping_consumer(),
+ #'basic.consume_ok'{consumer_tag = DefaultCTag} =
+ amqp_channel:subscribe(DefaultCh, #'basic.consume'{queue = <<"default-queue-with-consumer">>}, DefaultConsumer),
+
+ VHost1Consumer = sleeping_consumer(),
+ #'basic.consume_ok'{consumer_tag = VHost1CTag} =
+ amqp_channel:subscribe(VHost1Ch, #'basic.consume'{queue = <<"vhost-1-queue-with-consumer">>}, VHost1Consumer),
+
+ VHost2Consumer = sleeping_consumer(),
+ #'basic.consume_ok'{consumer_tag = VHost2CTag} =
+ amqp_channel:subscribe(VHost2Ch, #'basic.consume'{queue = <<"vhost-2-queue-with-consumer">>}, VHost2Consumer),
+
+ timer:sleep(1000),
+
+ Config1 ++ [ {default_consumer_pid, DefaultConsumer}
+ , {default_consumer_ctag, DefaultCTag}
+ , {default_channel, DefaultCh}
+ , {vhost1_consumer_pid, VHost1Consumer}
+ , {vhost1_consumer_ctag, VHost1CTag}
+ , {vhost1_channel, VHost1Ch}
+ , {vhost1_conn, VHost1Conn}
+ , {vhost2_consumer_pid, VHost2Consumer}
+ , {vhost2_consumer_ctag, VHost2CTag}
+ , {vhost2_channel, VHost2Ch}
+ , {vhost2_conn, VHost2Conn}
+ ];
init_per_group(aggregated_metrics, Config0) ->
Config1 = rabbit_ct_helpers:merge_app_env(
Config0,
@@ -137,6 +211,28 @@ end_per_group(aggregated_metrics, Config) ->
amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}),
rabbit_ct_client_helpers:close_channel(Ch),
end_per_group_(Config);
+
+end_per_group(detailed_metrics, Config) ->
+ DefaultCh = ?config(default_channel, Config),
+ amqp_channel:call(DefaultCh, #'basic.cancel'{consumer_tag = ?config(default_consumer_ctag, Config)}),
+ ?config(default_consumer_pid, Config) ! stop,
+ rabbit_ct_client_helpers:close_channel(DefaultCh),
+
+ VHost1Ch = ?config(vhost1_channel, Config),
+ amqp_channel:call(VHost1Ch, #'basic.cancel'{consumer_tag = ?config(vhost1_consumer_ctag, Config)}),
+ ?config(vhost1_consumer_pid, Config) ! stop,
+ amqp_channel:close(VHost1Ch),
+ amqp_connection:close(?config(vhost1_conn, Config)),
+
+ VHost2Ch = ?config(vhost2_channel, Config),
+ amqp_channel:call(VHost2Ch, #'basic.cancel'{consumer_tag = ?config(vhost2_consumer_ctag, Config)}),
+ ?config(vhost2_consumer_pid, Config) ! stop,
+ amqp_channel:close(VHost2Ch),
+ amqp_connection:close(?config(vhost2_conn, Config)),
+
+ %% Delete queues?
+ end_per_group_(Config);
+
end_per_group(_, Config) ->
end_per_group_(Config).
@@ -315,6 +411,88 @@ global_metrics_single_metric_family_test(Config) ->
{match, MetricFamilyMatches} = re:run(Body, "TYPE rabbitmq_global_messages_acknowledged_total", [global]),
?assertEqual(1, length(MetricFamilyMatches)).
+queue_consumer_count_single_vhost_per_object_test(Config) ->
+ {_, Body} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&family=queue_consumer_count&per-object=1", [], 200),
+
+ %% There should be exactly 2 metrics returned (2 queues in that vhost, `queue_consumer_count` has only single metric)
+ ?assertEqual(#{rabbitmq_detailed_queue_consumers =>
+ #{#{queue => "vhost-1-queue-with-consumer",vhost => "vhost-1"} => [1],
+ #{queue => "vhost-1-queue-with-messages",vhost => "vhost-1"} => [0]}},
+ parse_response(Body)),
+ ok.
+
+queue_consumer_count_all_vhosts_per_object_test(Config) ->
+ Expected = #{rabbitmq_detailed_queue_consumers =>
+ #{#{queue => "vhost-1-queue-with-consumer",vhost => "vhost-1"} => [1],
+ #{queue => "vhost-1-queue-with-messages",vhost => "vhost-1"} => [0],
+ #{queue => "vhost-2-queue-with-consumer",vhost => "vhost-2"} => [1],
+ #{queue => "vhost-2-queue-with-messages",vhost => "vhost-2"} => [0],
+ #{queue => "default-queue-with-consumer",vhost => "/"} => [1],
+ #{queue => "default-queue-with-messages",vhost => "/"} => [0]}},
+
+ %% No vhost given, all should be returned
+ {_, Body1} = http_get_with_pal(Config, "/metrics/detailed?family=queue_consumer_count&per-object=1", [], 200),
+ ?assertEqual(Expected, parse_response(Body1)),
+
+ %% Both vhosts are listed explicitly
+ {_, Body2} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&vhost=vhost-2&vhost=%2f&family=queue_consumer_count&per-object=1", [], 200),
+ ?assertEqual(Expected, parse_response(Body2)),
+ ok.
+
+queue_coarse_metrics_per_object_test(Config) ->
+ Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7],
+ #{queue => "vhost-1-queue-with-messages", vhost => "vhost-1"} => [7]},
+ Expected2 = #{#{queue => "vhost-2-queue-with-consumer", vhost => "vhost-2"} => [11],
+ #{queue => "vhost-2-queue-with-messages", vhost => "vhost-2"} => [11]},
+ ExpectedD = #{#{queue => "default-queue-with-consumer", vhost => "/"} => [3],
+ #{queue => "default-queue-with-messages", vhost => "/"} => [3]},
+
+ {_, Body1} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&family=queue_coarse_metrics", [], 200),
+ ?assertEqual(Expected1,
+ map_get(rabbitmq_detailed_queue_messages, parse_response(Body1))),
+
+ {_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=queue_coarse_metrics", [], 200),
+ ?assertEqual(lists:foldl(fun maps:merge/2, #{}, [Expected1, Expected2, ExpectedD]),
+ map_get(rabbitmq_detailed_queue_messages, parse_response(Body2))),
+
+ {_, Body3} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&vhost=vhost-2&family=queue_coarse_metrics", [], 200),
+ ?assertEqual(lists:foldl(fun maps:merge/2, #{}, [Expected1, Expected2]),
+ map_get(rabbitmq_detailed_queue_messages, parse_response(Body3))),
+ ok.
+
+queue_metrics_per_object_test(Config) ->
+ Expected1 = #{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [7],
+ #{queue => "vhost-1-queue-with-messages", vhost => "vhost-1"} => [7]},
+ Expected2 = #{#{queue => "vhost-2-queue-with-consumer", vhost => "vhost-2"} => [11],
+ #{queue => "vhost-2-queue-with-messages", vhost => "vhost-2"} => [11]},
+ ExpectedD = #{#{queue => "default-queue-with-consumer", vhost => "/"} => [3],
+ #{queue => "default-queue-with-messages", vhost => "/"} => [3]},
+ {_, Body1} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&family=queue_metrics", [], 200),
+ ?assertEqual(Expected1,
+ map_get(rabbitmq_detailed_queue_messages_ram, parse_response(Body1))),
+
+ {_, Body2} = http_get_with_pal(Config, "/metrics/detailed?family=queue_metrics", [], 200),
+ ?assertEqual(lists:foldl(fun maps:merge/2, #{}, [Expected1, Expected2, ExpectedD]),
+ map_get(rabbitmq_detailed_queue_messages_ram, parse_response(Body2))),
+
+ {_, Body3} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&vhost=vhost-2&family=queue_metrics", [], 200),
+ ?assertEqual(lists:foldl(fun maps:merge/2, #{}, [Expected1, Expected2]),
+ map_get(rabbitmq_detailed_queue_messages_ram, parse_response(Body3))),
+ ok.
+
+queue_consumer_count_and_queue_metrics_mutually_exclusive_test(Config) ->
+ {_, Body1} = http_get_with_pal(Config, "/metrics/detailed?vhost=vhost-1&family=queue_consumer_count&family=queue_metrics", [], 200),
+ ?assertEqual(#{#{queue => "vhost-1-queue-with-consumer", vhost => "vhost-1"} => [1],
+ #{queue => "vhost-1-queue-with-messages", vhost => "vhost-1"} => [0]},
+ map_get(rabbitmq_detailed_queue_consumers, parse_response(Body1))),
+
+ ok.
+
+detailed_metrics_no_families_enabled_by_default(Config) ->
+ {_, Body} = http_get_with_pal(Config, "/metrics/detailed", [], 200),
+ ?assertEqual(#{}, parse_response(Body)),
+ ok.
+
http_get(Config, ReqHeaders, CodeExp) ->
Path = proplists:get_value(prometheus_path, Config, "/metrics"),
http_get(Config, Path, ReqHeaders, CodeExp).
@@ -336,3 +514,43 @@ http_get_with_pal(Config, Path, ReqHeaders, CodeExp) ->
%% Print and log response body - it makes is easier to find why a match failed
ct:pal(Body),
{Headers, Body}.
+
+parse_response(Body) ->
+ Lines = string:split(Body, "\n", all),
+ Metrics = [ parse_metric(L)
+ || L = [C|_] <- Lines, C /= $#
+ ],
+ lists:foldl(fun ({Metric, Label, Value}, MetricMap) ->
+ case string:prefix(atom_to_list(Metric), "telemetry") of
+ nomatch ->
+ OldLabelMap = maps:get(Metric, MetricMap, #{}),
+ OldValues = maps:get(Label, OldLabelMap, []),
+ NewValues = [Value|OldValues],
+ NewLabelMap = maps:put(Label, NewValues, OldLabelMap),
+ maps:put(Metric, NewLabelMap, MetricMap);
+ _ -> MetricMap
+ end
+ end, #{}, Metrics).
+
+parse_metric(M) ->
+ case string:lexemes(M, "{}" ) of
+ [Metric, Label, Value] ->
+ {list_to_atom(Metric), parse_label(Label), parse_value(string:trim(Value))};
+ _ ->
+ [Metric, Value] = string:split(M, " "),
+ {list_to_atom(Metric), undefined, parse_value(string:trim(Value))}
+ end.
+
+parse_label(L) ->
+ Parts = string:split(L, ",", all),
+ maps:from_list([ parse_kv(P) || P <- Parts ]).
+
+parse_kv(KV) ->
+ [K, V] = string:split(KV, "="),
+ {list_to_atom(K), string:trim(V, both, [$"])}.
+
+parse_value(V) ->
+ case lists:all(fun (C) -> C >= $0 andalso C =< $9 end, V) of
+ true -> list_to_integer(V);
+ _ -> V
+ end.