summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-12-07 15:56:23 +0100
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-12-07 15:56:23 +0100
commit3f1d010ecd78d8b7ea0c97399d2b48a0c9a29238 (patch)
treeb700d6067a1fc62edfe2c3ebc2db5e98a0f455ee
parent2b45d8635b2a59f05f669ac93e457e25fb8daba3 (diff)
downloadrabbitmq-server-git-3f1d010ecd78d8b7ea0c97399d2b48a0c9a29238.tar.gz
Add stream connection consumers REST endpoint
-rw-r--r--deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs2
-rw-r--r--deps/rabbitmq_stream_management/priv/www/js/stream.js11
-rw-r--r--deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConnection.ejs138
-rw-r--r--deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConnections.ejs4
-rw-r--r--deps/rabbitmq_stream_management/src/rabbit_stream_connection_consumers_mgmt.erl55
-rw-r--r--deps/rabbitmq_stream_management/src/rabbit_stream_mgmt_db.erl27
-rw-r--r--deps/rabbitmq_stream_management/test/http_SUITE.erl2
-rw-r--r--deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java58
8 files changed, 289 insertions, 8 deletions
diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs
index 9a09f89b72..aff8ce65d1 100644
--- a/deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs
+++ b/deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs
@@ -1,4 +1,4 @@
-<h1>Connection <%= fmt_string(connection.name) %> <%= fmt_maybe_vhost(connection.vhost) %></h1>
+<h2>Connection <%= fmt_string(connection.name) %> <%= fmt_maybe_vhost(connection.vhost) %></h1>
<% if (!disable_stats) { %>
<div class="section">
diff --git a/deps/rabbitmq_stream_management/priv/www/js/stream.js b/deps/rabbitmq_stream_management/priv/www/js/stream.js
index 1bee90ee63..57c6c0dcfd 100644
--- a/deps/rabbitmq_stream_management/priv/www/js/stream.js
+++ b/deps/rabbitmq_stream_management/priv/www/js/stream.js
@@ -2,6 +2,13 @@ dispatcher_add(function(sammy) {
sammy.get('#/stream/connections', function() {
renderStreamConnections();
});
+ sammy.get('#/stream/connections/:vhost/:name', function() {
+ var vhost = esc(this.params['vhost']);
+ var name = esc(this.params['name']);
+ render({'connection': {path: '/stream/connections/'+ vhost + '/' + name,
+ options: {ranges: ['data-rates-conn']}}},
+ 'streamConnection', '#/stream/connections');
+ });
});
@@ -31,4 +38,8 @@ function renderStreamConnections() {
'streamConnections', '#/stream/connections');
}
+function link_stream_conn(vhost, name, desc) {
+ return _link_to(short_conn(name), '#/stream/connections/' + esc(vhost) + '/' + esc(name));
+}
+
RENDER_CALLBACKS['streamConnections'] = function() { renderStreamConnections() }; \ No newline at end of file
diff --git a/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConnection.ejs b/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConnection.ejs
new file mode 100644
index 0000000000..a06e728ebd
--- /dev/null
+++ b/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConnection.ejs
@@ -0,0 +1,138 @@
+<h2>Connection <%= fmt_string(connection.name) %> <%= fmt_maybe_vhost(connection.vhost) %></h1>
+
+<% if (!disable_stats) { %>
+<div class="section">
+<h2>Overview</h2>
+<div class="hider updatable">
+ <%= data_rates('data-rates-conn', connection, 'Data rates') %>
+
+<h3>Details</h3>
+<table class="facts facts-l">
+<% if (nodes_interesting) { %>
+<tr>
+ <th>Node</th>
+ <td><%= fmt_node(connection.node) %></td>
+</tr>
+<% } %>
+
+<% if (connection.client_properties.connection_name) { %>
+<tr>
+ <th>Client-provided name</th>
+ <td><%= fmt_string(connection.client_properties.connection_name) %></td>
+</tr>
+<% } %>
+
+<tr>
+ <th>Username</th>
+ <td><%= fmt_string(connection.user) %></td>
+</tr>
+<tr>
+ <th>Protocol</th>
+ <td><%= connection.protocol %></td>
+</tr>
+<tr>
+ <th>Connected at</th>
+ <td><%= fmt_timestamp(connection.connected_at) %></td>
+</tr>
+
+<% if (connection.auth_mechanism) { %>
+<tr>
+ <th>Authentication</th>
+ <td><%= connection.auth_mechanism %></td>
+</tr>
+<% } %>
+</table>
+
+<% if (connection.state) { %>
+<table class="facts">
+<tr>
+ <th>State</th>
+ <td><%= fmt_object_state(connection) %></td>
+</tr>
+<tr>
+ <th>Heartbeat</th>
+ <td><%= fmt_time(connection.timeout, 's') %></td>
+</tr>
+<tr>
+ <th>Frame max</th>
+ <td><%= connection.frame_max %> bytes</td>
+</tr>
+</table>
+
+<% } %>
+
+</div>
+</div>
+
+<div class="section">
+ <h2>Channels</h2>
+ <div class="hider updatable">
+ </div>
+</div>
+
+<% if (properties_size(connection.client_properties) > 0) { %>
+<div class="section-hidden">
+<h2>Client properties</h2>
+<div class="hider">
+<%= fmt_table_long(connection.client_properties) %>
+</div>
+</div>
+<% } %>
+
+<% if(connection.reductions || connection.garbage_collection) { %>
+<div class="section-hidden">
+<h2>Runtime Metrics (Advanced)</h2>
+ <div class="hider updatable">
+ <%= data_reductions('reductions-rates-conn', connection) %>
+ <table class="facts">
+ <% if (connection.garbage_collection.min_bin_vheap_size) { %>
+ <tr>
+ <th>Minimum binary virtual heap size in words (min_bin_vheap_size)</th>
+ <td><%= connection.garbage_collection.min_bin_vheap_size %></td>
+ </tr>
+ <% } %>
+
+ <% if (connection.garbage_collection.min_heap_size) { %>
+ <tr>
+ <th>Minimum heap size in words (min_heap_size)</th>
+ <td><%= connection.garbage_collection.min_heap_size %></td>
+ </tr>
+ <% } %>
+
+ <% if (connection.garbage_collection.fullsweep_after) { %>
+ <tr>
+ <th>Maximum generational collections before fullsweep (fullsweep_after)</th>
+ <td><%= connection.garbage_collection.fullsweep_after %></td>
+ </tr>
+ <% } %>
+
+ <% if (connection.garbage_collection.minor_gcs) { %>
+ <tr>
+ <th>Number of minor GCs (minor_gcs)</th>
+ <td><%= connection.garbage_collection.minor_gcs %></td>
+ </tr>
+ <% } %>
+ </table>
+ </div>
+</div>
+
+<% } %>
+<% } %>
+
+<div class="section-hidden">
+ <h2>Close this connection</h2>
+ <div class="hider">
+ <form action="#/connections" method="delete" class="confirm">
+ <input type="hidden" name="name" value="<%= fmt_string(connection.name) %>"/>
+ <table class="form">
+ <tr>
+ <th><label>Reason:</label></th>
+ <td>
+ <input type="text" name="reason" value="Closed via management plugin" class="wide"/>
+ </td>
+ </tr>
+ </table>
+ <input type="submit" value="Force Close"/>
+ </form>
+ </div>
+</div>
diff --git a/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConnections.ejs b/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConnections.ejs
index 1c11af4ca9..08dc0eab42 100644
--- a/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConnections.ejs
+++ b/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConnections.ejs
@@ -70,11 +70,11 @@
<% } %>
<% if(connection.client_properties) { %>
<td>
- <%= link_conn(connection.name) %>
+ <%= link_stream_conn(connection.vhost, connection.name) %>
<sub><%= fmt_string(short_conn(connection.client_properties.connection_name)) %></sub>
</td>
<% } else { %>
- <td><%= link_conn(connection.name) %></td>
+ <td><%= link_stream_conn(connection.vhost, connection.name) %></td>
<% } %>
<% if (nodes_interesting) { %>
<td><%= fmt_node(connection.node) %></td>
diff --git a/deps/rabbitmq_stream_management/src/rabbit_stream_connection_consumers_mgmt.erl b/deps/rabbitmq_stream_management/src/rabbit_stream_connection_consumers_mgmt.erl
new file mode 100644
index 0000000000..de9f55426b
--- /dev/null
+++ b/deps/rabbitmq_stream_management/src/rabbit_stream_connection_consumers_mgmt.erl
@@ -0,0 +1,55 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_stream_connection_consumers_mgmt).
+
+-behaviour(rabbit_mgmt_extension).
+
+-export([dispatcher/0, web_ui/0]).
+-export([init/2, to_json/2, content_types_provided/2, is_authorized/2]).
+-export([resource_exists/2]).
+-export([variances/2]).
+
+-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+dispatcher() -> [{"/stream/connections/:vhost/:connection/consumers", ?MODULE, []}].
+
+web_ui() -> [].
+
+%%--------------------------------------------------------------------
+init(Req, _State) ->
+ {cowboy_rest, rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), #context{}}.
+
+variances(Req, Context) ->
+ {[<<"accept-encoding">>, <<"origin">>], Req, Context}.
+
+content_types_provided(ReqData, Context) ->
+ {rabbit_mgmt_util:responder_map(to_json), ReqData, Context}.
+
+resource_exists(ReqData, Context) ->
+ case rabbit_mgmt_wm_connection:conn(ReqData) of
+ error -> {false, ReqData, Context};
+ _Conn -> {true, ReqData, Context}
+ end.
+
+to_json(ReqData, Context) ->
+ Pid = proplists:get_value(pid, rabbit_mgmt_wm_connection:conn(ReqData)),
+ Consumers = rabbit_mgmt_format:strip_pids(rabbit_stream_mgmt_db:get_connection_consumers(Pid)),
+ rabbit_mgmt_util:reply_list(
+ Consumers,
+ ReqData, Context).
+
+is_authorized(ReqData, Context) ->
+ try
+ rabbit_mgmt_util:is_authorized_user(
+ ReqData, Context, rabbit_mgmt_wm_connection:conn(ReqData))
+ catch
+ {error, invalid_range_parameters, Reason} ->
+ rabbit_mgmt_util:bad_request(iolist_to_binary(Reason), ReqData, Context)
+ end.
+
diff --git a/deps/rabbitmq_stream_management/src/rabbit_stream_mgmt_db.erl b/deps/rabbitmq_stream_management/src/rabbit_stream_mgmt_db.erl
index f8efdea8d5..df1e6972ad 100644
--- a/deps/rabbitmq_stream_management/src/rabbit_stream_mgmt_db.erl
+++ b/deps/rabbitmq_stream_management/src/rabbit_stream_mgmt_db.erl
@@ -11,7 +11,8 @@
-include_lib("rabbit_common/include/rabbit.hrl").
-export([get_all_consumers/1, get_all_publishers/1]).
--export([consumer_data/2, publisher_data/2]).
+-export([consumer_data/3, publisher_data/2]).
+-export([get_connection_consumers/1]).
get_all_consumers(VHosts) ->
rabbit_mgmt_db:submit(fun(_Interval) -> consumers_stats(VHosts) end).
@@ -19,21 +20,30 @@ get_all_consumers(VHosts) ->
get_all_publishers(VHosts) ->
rabbit_mgmt_db:submit(fun(_Interval) -> publishers_stats(VHosts) end).
+get_connection_consumers(ConnectionPid) when is_pid(ConnectionPid) ->
+ rabbit_mgmt_db:submit(fun(_Interval) -> connection_consumers_stats(ConnectionPid) end).
+
consumers_stats(VHost) ->
- Data = rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db, consumer_data, [VHost]}),
+ Data = rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db, consumer_data,
+ [VHost, fun consumers_by_vhost/1]}),
[V || {_, V} <- maps:to_list(Data)].
publishers_stats(VHost) ->
Data = rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db, publisher_data, [VHost]}),
[V || {_, V} <- maps:to_list(Data)].
-consumer_data(_Pid, VHost) ->
+connection_consumers_stats(ConnectionPid) ->
+ Data = rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db, consumer_data,
+ [ConnectionPid, fun consumers_by_connection/1]}),
+ [V || {_, V} <- maps:to_list(Data)].
+
+consumer_data(_Pid, Param, QueryFun) ->
maps:from_list(
[begin
AugmentedConsumer = augment_consumer(C),
{C, augment_connection_pid(AugmentedConsumer) ++ AugmentedConsumer}
end
- || C <- consumers_by_vhost(VHost)]
+ || C <- QueryFun(Param)]
).
publisher_data(_Pid, VHost) ->
@@ -67,6 +77,15 @@ publishers_by_vhost(VHost) ->
[{'orelse', {'==', 'all', VHost}, {'==', VHost, '$1'}}],
['$_']}]).
+consumers_by_connection(ConnectionPid) ->
+ get_entity_stats(?TABLE_CONSUMER, ConnectionPid).
+
+get_entity_stats(Table, Id) ->
+ ets:select(Table, match_entity_spec(Id)).
+
+match_entity_spec(ConnectionId) ->
+ [{{{'_', '$1', '_'}, '_'}, [{'==', ConnectionId, '$1'}], ['$_']}].
+
augment_connection_pid(Consumer) ->
Pid = rabbit_misc:pget(connection, Consumer),
Conn = rabbit_mgmt_data:lookup_element(connection_created_stats, Pid, 3),
diff --git a/deps/rabbitmq_stream_management/test/http_SUITE.erl b/deps/rabbitmq_stream_management/test/http_SUITE.erl
index c1bf15181c..178f4523be 100644
--- a/deps/rabbitmq_stream_management/test/http_SUITE.erl
+++ b/deps/rabbitmq_stream_management/test/http_SUITE.erl
@@ -37,7 +37,7 @@ init_per_suite(Config) ->
rabbit_ct_helpers:run_setup_steps(Config1,
[fun(StepConfig) ->
rabbit_ct_helpers:merge_app_env(StepConfig,
- {rabbit, [{collect_statistics_interval, 1000}]})
+ {rabbit, [{collect_statistics_interval, 500}]})
end] ++
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java
index e4e4c7144a..76295ea229 100644
--- a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java
+++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java
@@ -18,8 +18,10 @@ package com.rabbitmq.stream;
import static com.rabbitmq.stream.TestUtils.waitUntil;
import static java.lang.String.format;
+import static org.assertj.core.api.Assertions.as;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.InstanceOfAssertFactories.MAP;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
@@ -169,6 +171,62 @@ public class HttpTest {
}
@Test
+ void connectionConsumers() throws Exception {
+ Callable<List<Map<String, Object>>> request = () -> toMaps(get("/stream/connections"));
+ int initialCount = request.call().size();
+ String s = UUID.randomUUID().toString();
+ Client c1 = cf.get(new ClientParameters().virtualHost("vh1"));
+ try {
+ c1.create(s);
+ assertThat(c1.subscribe((byte) 0, s, OffsetSpecification.first(), 10).isOk()).isTrue();
+ assertThat(c1.subscribe((byte) 1, s, OffsetSpecification.first(), 5).isOk()).isTrue();
+ Client c2 =
+ cf.get(
+ new ClientParameters()
+ .virtualHost("vh1")
+ .username("user-management")
+ .password("user-management"));
+ assertThat(c2.subscribe((byte) 0, s, OffsetSpecification.first(), 10).isOk()).isTrue();
+ waitUntil(() -> request.call().size() == initialCount + 2);
+
+ Callable<Map<String, Object>> cRequest =
+ () -> toMap(get("/stream/connections/vh1/" + connectionName(c1)));
+ // wait until some stats are in the response
+ waitUntil(() -> cRequest.call().containsKey("recv_oct_details"));
+
+ Callable<List<Map<String, Object>>> consumersRequest =
+ () -> toMaps(get("/stream/connections/vh1/" + connectionName(c1) + "/consumers"));
+ List<Map<String, Object>> consumers = consumersRequest.call();
+
+ assertThat(consumers).hasSize(2);
+ consumers.forEach(
+ c -> {
+ assertThat(c).containsKeys("subscription_id", "credits", "connection_details", "queue");
+ assertThat(c)
+ .extractingByKey("connection_details", as(MAP))
+ .containsValue(connectionName(c1));
+ });
+
+ consumersRequest =
+ () -> toMaps(get("/stream/connections/vh1/" + connectionName(c2) + "/consumers"));
+ consumers = consumersRequest.call();
+ assertThat(consumers).hasSize(1);
+ assertThat(consumers.get(0))
+ .extractingByKey("connection_details", as(MAP))
+ .containsValue(connectionName(c2));
+
+ assertThatThrownBy(
+ () ->
+ get(
+ httpClient("user-management"),
+ "/stream/connections/vh1/" + connectionName(c1) + "/consumers"))
+ .hasMessageContaining("401");
+ } finally {
+ c1.delete(s);
+ }
+ }
+
+ @Test
void publishers() throws Exception {
Callable<List<Map<String, Object>>> request = () -> toMaps(get("/stream/publishers"));
int initialCount = request.call().size();