diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-12-07 15:56:23 +0100 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-12-07 15:56:23 +0100 |
commit | 3f1d010ecd78d8b7ea0c97399d2b48a0c9a29238 (patch) | |
tree | b700d6067a1fc62edfe2c3ebc2db5e98a0f455ee | |
parent | 2b45d8635b2a59f05f669ac93e457e25fb8daba3 (diff) | |
download | rabbitmq-server-git-3f1d010ecd78d8b7ea0c97399d2b48a0c9a29238.tar.gz |
Add stream connection consumers REST endpoint
8 files changed, 289 insertions, 8 deletions
diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs index 9a09f89b72..aff8ce65d1 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs @@ -1,4 +1,4 @@ -<h1>Connection <%= fmt_string(connection.name) %> <%= fmt_maybe_vhost(connection.vhost) %></h1> +<h2>Connection <%= fmt_string(connection.name) %> <%= fmt_maybe_vhost(connection.vhost) %></h1> <% if (!disable_stats) { %> <div class="section"> diff --git a/deps/rabbitmq_stream_management/priv/www/js/stream.js b/deps/rabbitmq_stream_management/priv/www/js/stream.js index 1bee90ee63..57c6c0dcfd 100644 --- a/deps/rabbitmq_stream_management/priv/www/js/stream.js +++ b/deps/rabbitmq_stream_management/priv/www/js/stream.js @@ -2,6 +2,13 @@ dispatcher_add(function(sammy) { sammy.get('#/stream/connections', function() { renderStreamConnections(); }); + sammy.get('#/stream/connections/:vhost/:name', function() { + var vhost = esc(this.params['vhost']); + var name = esc(this.params['name']); + render({'connection': {path: '/stream/connections/'+ vhost + '/' + name, + options: {ranges: ['data-rates-conn']}}}, + 'streamConnection', '#/stream/connections'); + }); }); @@ -31,4 +38,8 @@ function renderStreamConnections() { 'streamConnections', '#/stream/connections'); } +function link_stream_conn(vhost, name, desc) { + return _link_to(short_conn(name), '#/stream/connections/' + esc(vhost) + '/' + esc(name)); +} + RENDER_CALLBACKS['streamConnections'] = function() { renderStreamConnections() };
\ No newline at end of file diff --git a/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConnection.ejs b/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConnection.ejs new file mode 100644 index 0000000000..a06e728ebd --- /dev/null +++ b/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConnection.ejs @@ -0,0 +1,138 @@ +<h2>Connection <%= fmt_string(connection.name) %> <%= fmt_maybe_vhost(connection.vhost) %></h1> + +<% if (!disable_stats) { %> +<div class="section"> +<h2>Overview</h2> +<div class="hider updatable"> + <%= data_rates('data-rates-conn', connection, 'Data rates') %> + +<h3>Details</h3> +<table class="facts facts-l"> +<% if (nodes_interesting) { %> +<tr> + <th>Node</th> + <td><%= fmt_node(connection.node) %></td> +</tr> +<% } %> + +<% if (connection.client_properties.connection_name) { %> +<tr> + <th>Client-provided name</th> + <td><%= fmt_string(connection.client_properties.connection_name) %></td> +</tr> +<% } %> + +<tr> + <th>Username</th> + <td><%= fmt_string(connection.user) %></td> +</tr> +<tr> + <th>Protocol</th> + <td><%= connection.protocol %></td> +</tr> +<tr> + <th>Connected at</th> + <td><%= fmt_timestamp(connection.connected_at) %></td> +</tr> + +<% if (connection.auth_mechanism) { %> +<tr> + <th>Authentication</th> + <td><%= connection.auth_mechanism %></td> +</tr> +<% } %> +</table> + +<% if (connection.state) { %> +<table class="facts"> +<tr> + <th>State</th> + <td><%= fmt_object_state(connection) %></td> +</tr> +<tr> + <th>Heartbeat</th> + <td><%= fmt_time(connection.timeout, 's') %></td> +</tr> +<tr> + <th>Frame max</th> + <td><%= connection.frame_max %> bytes</td> +</tr> +</table> + +<% } %> + +</div> +</div> + +<div class="section"> + <h2>Channels</h2> + <div class="hider updatable"> + </div> +</div> + +<% if (properties_size(connection.client_properties) > 0) { %> +<div class="section-hidden"> +<h2>Client properties</h2> +<div class="hider"> +<%= fmt_table_long(connection.client_properties) %> +</div> +</div> +<% } %> + +<% if(connection.reductions || connection.garbage_collection) { %> +<div class="section-hidden"> +<h2>Runtime Metrics (Advanced)</h2> + <div class="hider updatable"> + <%= data_reductions('reductions-rates-conn', connection) %> + <table class="facts"> + <% if (connection.garbage_collection.min_bin_vheap_size) { %> + <tr> + <th>Minimum binary virtual heap size in words (min_bin_vheap_size)</th> + <td><%= connection.garbage_collection.min_bin_vheap_size %></td> + </tr> + <% } %> + + <% if (connection.garbage_collection.min_heap_size) { %> + <tr> + <th>Minimum heap size in words (min_heap_size)</th> + <td><%= connection.garbage_collection.min_heap_size %></td> + </tr> + <% } %> + + <% if (connection.garbage_collection.fullsweep_after) { %> + <tr> + <th>Maximum generational collections before fullsweep (fullsweep_after)</th> + <td><%= connection.garbage_collection.fullsweep_after %></td> + </tr> + <% } %> + + <% if (connection.garbage_collection.minor_gcs) { %> + <tr> + <th>Number of minor GCs (minor_gcs)</th> + <td><%= connection.garbage_collection.minor_gcs %></td> + </tr> + <% } %> + </table> + </div> +</div> + +<% } %> +<% } %> + +<div class="section-hidden"> + <h2>Close this connection</h2> + <div class="hider"> + <form action="#/connections" method="delete" class="confirm"> + <input type="hidden" name="name" value="<%= fmt_string(connection.name) %>"/> + <table class="form"> + <tr> + <th><label>Reason:</label></th> + <td> + <input type="text" name="reason" value="Closed via management plugin" class="wide"/> + </td> + </tr> + </table> + <input type="submit" value="Force Close"/> + </form> + </div> +</div> diff --git a/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConnections.ejs b/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConnections.ejs index 1c11af4ca9..08dc0eab42 100644 --- a/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConnections.ejs +++ b/deps/rabbitmq_stream_management/priv/www/js/tmpl/streamConnections.ejs @@ -70,11 +70,11 @@ <% } %> <% if(connection.client_properties) { %> <td> - <%= link_conn(connection.name) %> + <%= link_stream_conn(connection.vhost, connection.name) %> <sub><%= fmt_string(short_conn(connection.client_properties.connection_name)) %></sub> </td> <% } else { %> - <td><%= link_conn(connection.name) %></td> + <td><%= link_stream_conn(connection.vhost, connection.name) %></td> <% } %> <% if (nodes_interesting) { %> <td><%= fmt_node(connection.node) %></td> diff --git a/deps/rabbitmq_stream_management/src/rabbit_stream_connection_consumers_mgmt.erl b/deps/rabbitmq_stream_management/src/rabbit_stream_connection_consumers_mgmt.erl new file mode 100644 index 0000000000..de9f55426b --- /dev/null +++ b/deps/rabbitmq_stream_management/src/rabbit_stream_connection_consumers_mgmt.erl @@ -0,0 +1,55 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_stream_connection_consumers_mgmt). + +-behaviour(rabbit_mgmt_extension). + +-export([dispatcher/0, web_ui/0]). +-export([init/2, to_json/2, content_types_provided/2, is_authorized/2]). +-export([resource_exists/2]). +-export([variances/2]). + +-include_lib("rabbitmq_management_agent/include/rabbit_mgmt_records.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). + +dispatcher() -> [{"/stream/connections/:vhost/:connection/consumers", ?MODULE, []}]. + +web_ui() -> []. + +%%-------------------------------------------------------------------- +init(Req, _State) -> + {cowboy_rest, rabbit_mgmt_headers:set_common_permission_headers(Req, ?MODULE), #context{}}. + +variances(Req, Context) -> + {[<<"accept-encoding">>, <<"origin">>], Req, Context}. + +content_types_provided(ReqData, Context) -> + {rabbit_mgmt_util:responder_map(to_json), ReqData, Context}. + +resource_exists(ReqData, Context) -> + case rabbit_mgmt_wm_connection:conn(ReqData) of + error -> {false, ReqData, Context}; + _Conn -> {true, ReqData, Context} + end. + +to_json(ReqData, Context) -> + Pid = proplists:get_value(pid, rabbit_mgmt_wm_connection:conn(ReqData)), + Consumers = rabbit_mgmt_format:strip_pids(rabbit_stream_mgmt_db:get_connection_consumers(Pid)), + rabbit_mgmt_util:reply_list( + Consumers, + ReqData, Context). + +is_authorized(ReqData, Context) -> + try + rabbit_mgmt_util:is_authorized_user( + ReqData, Context, rabbit_mgmt_wm_connection:conn(ReqData)) + catch + {error, invalid_range_parameters, Reason} -> + rabbit_mgmt_util:bad_request(iolist_to_binary(Reason), ReqData, Context) + end. + diff --git a/deps/rabbitmq_stream_management/src/rabbit_stream_mgmt_db.erl b/deps/rabbitmq_stream_management/src/rabbit_stream_mgmt_db.erl index f8efdea8d5..df1e6972ad 100644 --- a/deps/rabbitmq_stream_management/src/rabbit_stream_mgmt_db.erl +++ b/deps/rabbitmq_stream_management/src/rabbit_stream_mgmt_db.erl @@ -11,7 +11,8 @@ -include_lib("rabbit_common/include/rabbit.hrl"). -export([get_all_consumers/1, get_all_publishers/1]). --export([consumer_data/2, publisher_data/2]). +-export([consumer_data/3, publisher_data/2]). +-export([get_connection_consumers/1]). get_all_consumers(VHosts) -> rabbit_mgmt_db:submit(fun(_Interval) -> consumers_stats(VHosts) end). @@ -19,21 +20,30 @@ get_all_consumers(VHosts) -> get_all_publishers(VHosts) -> rabbit_mgmt_db:submit(fun(_Interval) -> publishers_stats(VHosts) end). +get_connection_consumers(ConnectionPid) when is_pid(ConnectionPid) -> + rabbit_mgmt_db:submit(fun(_Interval) -> connection_consumers_stats(ConnectionPid) end). + consumers_stats(VHost) -> - Data = rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db, consumer_data, [VHost]}), + Data = rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db, consumer_data, + [VHost, fun consumers_by_vhost/1]}), [V || {_, V} <- maps:to_list(Data)]. publishers_stats(VHost) -> Data = rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db, publisher_data, [VHost]}), [V || {_, V} <- maps:to_list(Data)]. -consumer_data(_Pid, VHost) -> +connection_consumers_stats(ConnectionPid) -> + Data = rabbit_mgmt_db:get_data_from_nodes({rabbit_stream_mgmt_db, consumer_data, + [ConnectionPid, fun consumers_by_connection/1]}), + [V || {_, V} <- maps:to_list(Data)]. + +consumer_data(_Pid, Param, QueryFun) -> maps:from_list( [begin AugmentedConsumer = augment_consumer(C), {C, augment_connection_pid(AugmentedConsumer) ++ AugmentedConsumer} end - || C <- consumers_by_vhost(VHost)] + || C <- QueryFun(Param)] ). publisher_data(_Pid, VHost) -> @@ -67,6 +77,15 @@ publishers_by_vhost(VHost) -> [{'orelse', {'==', 'all', VHost}, {'==', VHost, '$1'}}], ['$_']}]). +consumers_by_connection(ConnectionPid) -> + get_entity_stats(?TABLE_CONSUMER, ConnectionPid). + +get_entity_stats(Table, Id) -> + ets:select(Table, match_entity_spec(Id)). + +match_entity_spec(ConnectionId) -> + [{{{'_', '$1', '_'}, '_'}, [{'==', ConnectionId, '$1'}], ['$_']}]. + augment_connection_pid(Consumer) -> Pid = rabbit_misc:pget(connection, Consumer), Conn = rabbit_mgmt_data:lookup_element(connection_created_stats, Pid, 3), diff --git a/deps/rabbitmq_stream_management/test/http_SUITE.erl b/deps/rabbitmq_stream_management/test/http_SUITE.erl index c1bf15181c..178f4523be 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE.erl +++ b/deps/rabbitmq_stream_management/test/http_SUITE.erl @@ -37,7 +37,7 @@ init_per_suite(Config) -> rabbit_ct_helpers:run_setup_steps(Config1, [fun(StepConfig) -> rabbit_ct_helpers:merge_app_env(StepConfig, - {rabbit, [{collect_statistics_interval, 1000}]}) + {rabbit, [{collect_statistics_interval, 500}]}) end] ++ rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()). diff --git a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java index e4e4c7144a..76295ea229 100644 --- a/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java +++ b/deps/rabbitmq_stream_management/test/http_SUITE_data/src/test/java/com/rabbitmq/stream/HttpTest.java @@ -18,8 +18,10 @@ package com.rabbitmq.stream; import static com.rabbitmq.stream.TestUtils.waitUntil; import static java.lang.String.format; +import static org.assertj.core.api.Assertions.as; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.InstanceOfAssertFactories.MAP; import com.google.gson.Gson; import com.google.gson.GsonBuilder; @@ -169,6 +171,62 @@ public class HttpTest { } @Test + void connectionConsumers() throws Exception { + Callable<List<Map<String, Object>>> request = () -> toMaps(get("/stream/connections")); + int initialCount = request.call().size(); + String s = UUID.randomUUID().toString(); + Client c1 = cf.get(new ClientParameters().virtualHost("vh1")); + try { + c1.create(s); + assertThat(c1.subscribe((byte) 0, s, OffsetSpecification.first(), 10).isOk()).isTrue(); + assertThat(c1.subscribe((byte) 1, s, OffsetSpecification.first(), 5).isOk()).isTrue(); + Client c2 = + cf.get( + new ClientParameters() + .virtualHost("vh1") + .username("user-management") + .password("user-management")); + assertThat(c2.subscribe((byte) 0, s, OffsetSpecification.first(), 10).isOk()).isTrue(); + waitUntil(() -> request.call().size() == initialCount + 2); + + Callable<Map<String, Object>> cRequest = + () -> toMap(get("/stream/connections/vh1/" + connectionName(c1))); + // wait until some stats are in the response + waitUntil(() -> cRequest.call().containsKey("recv_oct_details")); + + Callable<List<Map<String, Object>>> consumersRequest = + () -> toMaps(get("/stream/connections/vh1/" + connectionName(c1) + "/consumers")); + List<Map<String, Object>> consumers = consumersRequest.call(); + + assertThat(consumers).hasSize(2); + consumers.forEach( + c -> { + assertThat(c).containsKeys("subscription_id", "credits", "connection_details", "queue"); + assertThat(c) + .extractingByKey("connection_details", as(MAP)) + .containsValue(connectionName(c1)); + }); + + consumersRequest = + () -> toMaps(get("/stream/connections/vh1/" + connectionName(c2) + "/consumers")); + consumers = consumersRequest.call(); + assertThat(consumers).hasSize(1); + assertThat(consumers.get(0)) + .extractingByKey("connection_details", as(MAP)) + .containsValue(connectionName(c2)); + + assertThatThrownBy( + () -> + get( + httpClient("user-management"), + "/stream/connections/vh1/" + connectionName(c1) + "/consumers")) + .hasMessageContaining("401"); + } finally { + c1.delete(s); + } + } + + @Test void publishers() throws Exception { Callable<List<Map<String, Object>>> request = () -> toMaps(get("/stream/publishers")); int initialCount = request.call().size(); |