summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKarl Nilsson <kjnilsson@gmail.com>2021-04-13 16:59:29 +0100
committerGitHub <noreply@github.com>2021-04-13 16:59:29 +0100
commit5bb6bc80e1d45427b99ba2f4503023d4e163d4bc (patch)
tree62b5be6bb29069fdceee21bad01feadf6512c623
parentbfa1e4ec06cd33e271328b0f189ed0dbe7bab8ba (diff)
parent2f1f7c5288d6c4ba9dccafbd769ecad3981bf79a (diff)
downloadrabbitmq-server-git-5bb6bc80e1d45427b99ba2f4503023d4e163d4bc.tar.gz
Merge pull request #2936 from rabbitmq/stream-management
Observability fixes & enhancements for stream queues
-rw-r--r--deps/rabbit/src/rabbit_osiris_metrics.erl4
-rw-r--r--deps/rabbit/src/rabbit_stream_queue.erl39
-rw-r--r--deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs21
3 files changed, 61 insertions, 3 deletions
diff --git a/deps/rabbit/src/rabbit_osiris_metrics.erl b/deps/rabbit/src/rabbit_osiris_metrics.erl
index 4c3f3415b1..9cf4fdc256 100644
--- a/deps/rabbit/src/rabbit_osiris_metrics.erl
+++ b/deps/rabbit/src/rabbit_osiris_metrics.erl
@@ -32,7 +32,9 @@
state,
leader,
online,
- members
+ members,
+ memory,
+ readers
]).
-record(state, {timeout :: non_neg_integer()}).
diff --git a/deps/rabbit/src/rabbit_stream_queue.erl b/deps/rabbit/src/rabbit_stream_queue.erl
index f360d04edc..05977c3a58 100644
--- a/deps/rabbit/src/rabbit_stream_queue.erl
+++ b/deps/rabbit/src/rabbit_stream_queue.erl
@@ -45,6 +45,7 @@
delete_replica/3]).
-export([format_osiris_event/2]).
-export([update_stream_conf/2]).
+-export([readers/1]).
-export([status/2,
tracking_status/2]).
@@ -54,7 +55,7 @@
-define(INFO_KEYS, [name, durable, auto_delete, arguments, leader, members, online, state,
messages, messages_ready, messages_unacknowledged, committed_offset,
- policy, operator_policy, effective_policy_definition, type]).
+ policy, operator_policy, effective_policy_definition, type, memory]).
-type appender_seq() :: non_neg_integer().
@@ -429,6 +430,21 @@ i(leader, Q) when ?is_amqqueue(Q) ->
i(members, Q) when ?is_amqqueue(Q) ->
#{nodes := Nodes} = amqqueue:get_type_state(Q),
Nodes;
+i(memory, Q) when ?is_amqqueue(Q) ->
+ %% Return writer memory. It's not the full memory usage (we also have replica readers on
+ %% the writer node), but might be good enough
+ case amqqueue:get_pid(Q) of
+ none ->
+ 0;
+ Pid ->
+ try
+ {memory, M} = process_info(Pid, memory),
+ M
+ catch
+ error:badarg ->
+ 0
+ end
+ end;
i(online, Q) ->
#{name := StreamId} = amqqueue:get_type_state(Q),
case rabbit_stream_coordinator:members(StreamId) of
@@ -490,6 +506,12 @@ i(effective_policy_definition, Q) ->
undefined -> [];
Def -> Def
end;
+i(readers, Q) ->
+ QName = amqqueue:get_name(Q),
+ Conf = amqqueue:get_type_state(Q),
+ Nodes = [maps:get(leader_node, Conf) | maps:get(replica_nodes, Conf)],
+ {Data, _} = rpc:multicall(Nodes, ?MODULE, readers, [QName]),
+ lists:flatten(Data);
i(type, _) ->
stream;
i(_, _) ->
@@ -540,6 +562,21 @@ tracking_status(Vhost, QueueName) ->
E
end.
+readers(QName) ->
+ try
+ Data = osiris_counters:overview(),
+ Readers = case maps:get({osiris_writer, QName}, Data, not_found) of
+ not_found ->
+ maps:get(readers, maps:get({osiris_replica, QName}, Data, #{}), 0);
+ Map ->
+ maps:get(readers, Map, 0)
+ end,
+ {node(), Readers}
+ catch
+ _:_ ->
+ {node(), 0}
+ end.
+
init(Q) when ?is_amqqueue(Q) ->
Leader = amqqueue:get_pid(Q),
#{name := StreamId} = amqqueue:get_type_state(Q),
diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs
index 465808da8f..e3ac572fde 100644
--- a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs
+++ b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs
@@ -128,20 +128,35 @@
<th>State</th>
<td><%= fmt_object_state(queue) %></td>
</tr>
+ <% if(queue.consumers) { %>
<tr>
<th>Consumers</th>
<td><%= fmt_string(queue.consumers) %></td>
</tr>
+ <% } else if(queue.hasOwnProperty('consumer_details')) { %>
+ <tr>
+ <th>Consumers</th>
+ <td><%= fmt_string(queue.consumer_details.length) %></td>
+ </tr>
+ <% } %>
+ <% if (!is_stream(queue)) { %>
<tr>
<th>Consumer capacity <span class="help" id="queue-consumer-capacity"></th>
<td><%= fmt_percent(queue.consumer_capacity) %></td>
</tr>
- <% if (is_quorum(queue) || is_stream(queue)) { %>
+ <% } %>
+ <% if (is_quorum(queue)) { %>
<tr>
<th>Open files</th>
<td><%= fmt_table_short(queue.open_files) %></td>
</tr>
<% } %>
+ <% if (is_stream(queue)) { %>
+ <tr>
+ <th>Readers</th>
+ <td><%= fmt_table_short(queue.readers) %></td>
+ </tr>
+ <% } %>
</table>
<table class="facts">
@@ -173,9 +188,11 @@
<td class="r">
<%= fmt_num_thousands(queue.messages_unacknowledged) %>
</td>
+ <% if (is_quorum(queue)) { %>
<td class="r">
<%= fmt_num_thousands(queue.messages_ram) %>
</td>
+ <% } %>
<% if (is_classic(queue)) { %>
<td class="r">
<%= fmt_num_thousands(queue.messages_persistent) %>
@@ -186,6 +203,7 @@
<% } %>
</tr>
<tr>
+ <% if (is_classic(queue) || is_quorum(queue)) { %>
<th>
Message body bytes
<span class="help" id="queue-message-body-bytes"></span>
@@ -202,6 +220,7 @@
<td class="r">
<%= fmt_bytes(queue.message_bytes_ram) %>
</td>
+ <% } %>
<% if (is_classic(queue)) { %>
<td class="r">
<%= fmt_bytes(queue.message_bytes_persistent) %>