diff options
author | Karl Nilsson <kjnilsson@gmail.com> | 2021-04-13 16:59:29 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-04-13 16:59:29 +0100 |
commit | 5bb6bc80e1d45427b99ba2f4503023d4e163d4bc (patch) | |
tree | 62b5be6bb29069fdceee21bad01feadf6512c623 | |
parent | bfa1e4ec06cd33e271328b0f189ed0dbe7bab8ba (diff) | |
parent | 2f1f7c5288d6c4ba9dccafbd769ecad3981bf79a (diff) | |
download | rabbitmq-server-git-5bb6bc80e1d45427b99ba2f4503023d4e163d4bc.tar.gz |
Merge pull request #2936 from rabbitmq/stream-management
Observability fixes & enhancements for stream queues
-rw-r--r-- | deps/rabbit/src/rabbit_osiris_metrics.erl | 4 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_stream_queue.erl | 39 | ||||
-rw-r--r-- | deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs | 21 |
3 files changed, 61 insertions, 3 deletions
diff --git a/deps/rabbit/src/rabbit_osiris_metrics.erl b/deps/rabbit/src/rabbit_osiris_metrics.erl index 4c3f3415b1..9cf4fdc256 100644 --- a/deps/rabbit/src/rabbit_osiris_metrics.erl +++ b/deps/rabbit/src/rabbit_osiris_metrics.erl @@ -32,7 +32,9 @@ state, leader, online, - members + members, + memory, + readers ]). -record(state, {timeout :: non_neg_integer()}). diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl index f360d04edc..05977c3a58 100644 --- a/deps/rabbit/src/rabbit_stream_queue.erl +++ b/deps/rabbit/src/rabbit_stream_queue.erl @@ -45,6 +45,7 @@ delete_replica/3]). -export([format_osiris_event/2]). -export([update_stream_conf/2]). +-export([readers/1]). -export([status/2, tracking_status/2]). @@ -54,7 +55,7 @@ -define(INFO_KEYS, [name, durable, auto_delete, arguments, leader, members, online, state, messages, messages_ready, messages_unacknowledged, committed_offset, - policy, operator_policy, effective_policy_definition, type]). + policy, operator_policy, effective_policy_definition, type, memory]). -type appender_seq() :: non_neg_integer(). @@ -429,6 +430,21 @@ i(leader, Q) when ?is_amqqueue(Q) -> i(members, Q) when ?is_amqqueue(Q) -> #{nodes := Nodes} = amqqueue:get_type_state(Q), Nodes; +i(memory, Q) when ?is_amqqueue(Q) -> + %% Return writer memory. It's not the full memory usage (we also have replica readers on + %% the writer node), but might be good enough + case amqqueue:get_pid(Q) of + none -> + 0; + Pid -> + try + {memory, M} = process_info(Pid, memory), + M + catch + error:badarg -> + 0 + end + end; i(online, Q) -> #{name := StreamId} = amqqueue:get_type_state(Q), case rabbit_stream_coordinator:members(StreamId) of @@ -490,6 +506,12 @@ i(effective_policy_definition, Q) -> undefined -> []; Def -> Def end; +i(readers, Q) -> + QName = amqqueue:get_name(Q), + Conf = amqqueue:get_type_state(Q), + Nodes = [maps:get(leader_node, Conf) | maps:get(replica_nodes, Conf)], + {Data, _} = rpc:multicall(Nodes, ?MODULE, readers, [QName]), + lists:flatten(Data); i(type, _) -> stream; i(_, _) -> @@ -540,6 +562,21 @@ tracking_status(Vhost, QueueName) -> E end. +readers(QName) -> + try + Data = osiris_counters:overview(), + Readers = case maps:get({osiris_writer, QName}, Data, not_found) of + not_found -> + maps:get(readers, maps:get({osiris_replica, QName}, Data, #{}), 0); + Map -> + maps:get(readers, Map, 0) + end, + {node(), Readers} + catch + _:_ -> + {node(), 0} + end. + init(Q) when ?is_amqqueue(Q) -> Leader = amqqueue:get_pid(Q), #{name := StreamId} = amqqueue:get_type_state(Q), diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs index 465808da8f..e3ac572fde 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs @@ -128,20 +128,35 @@ <th>State</th> <td><%= fmt_object_state(queue) %></td> </tr> + <% if(queue.consumers) { %> <tr> <th>Consumers</th> <td><%= fmt_string(queue.consumers) %></td> </tr> + <% } else if(queue.hasOwnProperty('consumer_details')) { %> + <tr> + <th>Consumers</th> + <td><%= fmt_string(queue.consumer_details.length) %></td> + </tr> + <% } %> + <% if (!is_stream(queue)) { %> <tr> <th>Consumer capacity <span class="help" id="queue-consumer-capacity"></th> <td><%= fmt_percent(queue.consumer_capacity) %></td> </tr> - <% if (is_quorum(queue) || is_stream(queue)) { %> + <% } %> + <% if (is_quorum(queue)) { %> <tr> <th>Open files</th> <td><%= fmt_table_short(queue.open_files) %></td> </tr> <% } %> + <% if (is_stream(queue)) { %> + <tr> + <th>Readers</th> + <td><%= fmt_table_short(queue.readers) %></td> + </tr> + <% } %> </table> <table class="facts"> @@ -173,9 +188,11 @@ <td class="r"> <%= fmt_num_thousands(queue.messages_unacknowledged) %> </td> + <% if (is_quorum(queue)) { %> <td class="r"> <%= fmt_num_thousands(queue.messages_ram) %> </td> + <% } %> <% if (is_classic(queue)) { %> <td class="r"> <%= fmt_num_thousands(queue.messages_persistent) %> @@ -186,6 +203,7 @@ <% } %> </tr> <tr> + <% if (is_classic(queue) || is_quorum(queue)) { %> <th> Message body bytes <span class="help" id="queue-message-body-bytes"></span> @@ -202,6 +220,7 @@ <td class="r"> <%= fmt_bytes(queue.message_bytes_ram) %> </td> + <% } %> <% if (is_classic(queue)) { %> <td class="r"> <%= fmt_bytes(queue.message_bytes_persistent) %> |