summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-16 09:18:36 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-16 09:18:36 +0200
commit27d06e8021f8f7d4fbfba2f4f10173df995f3e4c (patch)
tree00d5237a7b483fad256db57b5faccd0dedc0479d
parent9e6008a36222b3e721c79c99af3a4645287473ff (diff)
downloadrabbitmq-server-git-27d06e8021f8f7d4fbfba2f4f10173df995f3e4c.tar.gz
Add CLI command to list stream connections
-rw-r--r--deps/rabbitmq_stream/include/rabbit_stream.hrl21
-rw-r--r--deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl95
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream.erl27
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl4
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl72
-rw-r--r--deps/rabbitmq_stream/test/command_SUITE.erl136
6 files changed, 346 insertions, 9 deletions
diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl
index f4f11b0324..684dd9a066 100644
--- a/deps/rabbitmq_stream/include/rabbit_stream.hrl
+++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl
@@ -47,4 +47,23 @@
-define(DEFAULT_INITIAL_CREDITS, 50000).
-define(DEFAULT_CREDITS_REQUIRED_FOR_UNBLOCKING, 12500).
-define(DEFAULT_FRAME_MAX, 1048576). %% 1 MiB
--define(DEFAULT_HEARTBEAT, 60). %% 60 seconds \ No newline at end of file
+-define(DEFAULT_HEARTBEAT, 60). %% 60 seconds
+
+-define(INFO_ITEMS,
+ [conn_name,
+ port,
+ peer_port,
+ host,
+ peer_host,
+ user,
+ vhost,
+ subscriptions,
+ connection_state,
+ auth_mechanism,
+ heartbeat,
+ frame_max,
+ client_properties,
+ connected_at
+ ]).
+
+-define(STREAM_GUIDE_URL, <<"https://rabbitmq.com/stream.html">>). \ No newline at end of file
diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl
new file mode 100644
index 0000000000..f185ab044e
--- /dev/null
+++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl
@@ -0,0 +1,95 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 2.0 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved.
+
+-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand').
+
+-include("rabbit_stream.hrl").
+
+-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
+
+-export([formatter/0,
+ scopes/0,
+ switches/0,
+ aliases/0,
+ usage/0,
+ usage_additional/0,
+ usage_doc_guides/0,
+ banner/2,
+ validate/2,
+ merge_defaults/2,
+ run/2,
+ output/2,
+ description/0,
+ help_section/0]).
+
+formatter() -> 'Elixir.RabbitMQ.CLI.Formatters.Table'.
+
+scopes() -> [ctl, diagnostics, streams].
+
+switches() -> [{verbose, boolean}].
+aliases() -> [{'V', verbose}].
+
+description() -> <<"Lists stream connections on the target node">>.
+
+help_section() ->
+ {plugin, stream}.
+
+validate(Args, _) ->
+ case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args,
+ ?INFO_ITEMS) of
+ {ok, _} -> ok;
+ Error -> Error
+ end.
+
+merge_defaults([], Opts) ->
+ merge_defaults([<<"conn_name">>], Opts);
+merge_defaults(Args, Opts) ->
+ {Args, maps:merge(#{verbose => false}, Opts)}.
+
+usage() ->
+ <<"list_stream_connections [<column> ...]">>.
+
+usage_additional() ->
+ Prefix = <<" must be one of ">>,
+ InfoItems = 'Elixir.Enum':join(lists:usort(?INFO_ITEMS), <<", ">>),
+ [
+ {<<"<column>">>, <<Prefix/binary, InfoItems/binary>>}
+ ].
+
+usage_doc_guides() ->
+ [?STREAM_GUIDE_URL].
+
+run(Args, #{node := NodeName,
+ timeout := Timeout,
+ verbose := Verbose}) ->
+ InfoKeys = case Verbose of
+ true -> ?INFO_ITEMS;
+ false -> 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args)
+ end,
+ Nodes = 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName),
+
+ 'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items(
+ NodeName,
+ rabbit_stream,
+ emit_connection_info_all,
+ [Nodes, InfoKeys],
+ Timeout,
+ InfoKeys,
+ length(Nodes)).
+
+banner(_, _) -> <<"Listing stream connections ...">>.
+
+output(Result, _Opts) ->
+ 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result).
diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl
index ee0886b910..4f2d1eecfd 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream.erl
@@ -19,6 +19,9 @@
-export([start/2, host/0, port/0, kill_connection/1]).
-export([stop/1]).
+-export([emit_connection_info_local/3,
+ emit_connection_info_all/4,
+ list/0]).
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -66,4 +69,26 @@ kill_connection(ConnectionName) ->
after 1000 ->
ok
end
- end, pg_local:get_members(rabbit_stream_connections)). \ No newline at end of file
+ end, pg_local:get_members(rabbit_stream_connections)).
+
+emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
+ Pids = [spawn_link(Node, rabbit_stream, emit_connection_info_local,
+ [Items, Ref, AggregatorPid])
+ || Node <- Nodes],
+ rabbit_control_misc:await_emitters_termination(Pids),
+ ok.
+
+emit_connection_info_local(Items, Ref, AggregatorPid) ->
+ rabbit_control_misc:emitting_map_with_exit_handler(
+ AggregatorPid, Ref, fun(Pid) ->
+ rabbit_stream_reader:info(Pid, Items)
+ end,
+ list()).
+
+list() ->
+ [Client
+ || {_, ListSupPid, _, _} <- supervisor2:which_children(rabbit_stream_sup),
+ {_, RanchSup, supervisor, _} <- supervisor2:which_children(ListSupPid),
+ {ranch_conns_sup, ConnSup, _, _} <- supervisor:which_children(RanchSup),
+ {_, CliSup, _, _} <- supervisor:which_children(ConnSup),
+ {rabbit_stream_reader, Client, _, _} <- supervisor:which_children(CliSup)]. \ No newline at end of file
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index 9992688d52..3b8c766552 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -186,7 +186,9 @@ handle_call({topology, VirtualHost, Stream}, _From, State) ->
_ ->
{error, stream_not_found}
end,
- {reply, Res, State}.
+ {reply, Res, State};
+handle_call(which_children, _From, State) ->
+ {reply, [], State}.
handle_cast(_, State) ->
{noreply, State}.
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 6f285894db..b43c7c837a 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -37,6 +37,16 @@
-record(stream_connection, {
name :: string(),
+ %% server host
+ host,
+ %% client host
+ peer_host,
+ %% server port
+ port,
+ %% client port
+ peer_port,
+ auth_mechanism,
+ connected_at :: integer(),
helper_sup :: pid(),
socket :: rabbit_net:socket(),
stream_leaders :: #{binary() => pid()},
@@ -47,6 +57,7 @@
virtual_host :: 'undefined' | binary(),
connection_step :: atom(), % tcp_connected, peer_properties_exchanged, authenticating, authenticated, tuning, tuned, opened, failure, closing, closing_done
frame_max :: integer(),
+ heartbeat :: integer(),
heartbeater :: any(),
client_properties = #{} :: #{binary() => binary()},
monitors = #{} :: #{reference() => binary()}
@@ -63,7 +74,7 @@
-define(MAX_PERMISSION_CACHE_SIZE, 12).
%% API
--export([start_link/4, init/1]).
+-export([start_link/4, init/1, info/2]).
start_link(KeepaliveSup, Transport, Ref, Opts) ->
Pid = proc_lib:spawn_link(?MODULE, init,
@@ -83,8 +94,15 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits,
{ok, ConnStr} ->
Credits = atomics:new(1, [{signed, true}]),
init_credit(Credits, InitialCredits),
+ {PeerHost, PeerPort, Host, Port} =
+ socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
Connection = #stream_connection{
name = ConnStr,
+ host = Host,
+ peer_host = PeerHost,
+ port = Port,
+ peer_port = PeerPort,
+ connected_at = os:system_time(milli_seconds),
helper_sup = KeepaliveSup,
socket = RealSocket,
stream_leaders = #{},
@@ -109,6 +127,16 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits,
rabbit_log:warning("Closing connection because of ~p ~p~n", [Error, Reason])
end.
+socket_op(Sock, Fun) ->
+ RealSocket = rabbit_net:unwrap_socket(Sock),
+ case Fun(Sock) of
+ {ok, Res} -> Res;
+ {error, Reason} ->
+ rabbit_log:warning("Error during socket operation ~p~n", [Reason]),
+ rabbit_net:fast_close(RealSocket),
+ exit(normal)
+ end.
+
init_credit(CreditReference, Credits) ->
atomics:put(CreditReference, 1, Credits).
@@ -141,6 +169,9 @@ listen_loop_pre_auth(Transport, #stream_connection{socket = S} = Connection, Sta
frame(Transport, Connection1, TuneFrame),
listen_loop_pre_auth(Transport, Connection1#stream_connection{connection_step = tuning}, State1, Configuration);
opened ->
+ % TODO remove registration to rabbit_stream_connections
+ % just meant to be able to close the connection remotely
+ % should be possible once the connections are available in ctl list_connections
pg_local:join(rabbit_stream_connections, self()),
listen_loop_post_auth(Transport, Connection1, State1, Configuration);
failure ->
@@ -306,6 +337,12 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S,
{infos, From} ->
From ! {self(), ClientProperties},
listen_loop_post_auth(Transport, Connection, State, Configuration);
+ {'$gen_call', From, info} ->
+ gen_server:reply(From, infos(?INFO_ITEMS, Connection, State)),
+ listen_loop_post_auth(Transport, Connection, State, Configuration);
+ {'$gen_call', From, {info, Items}} ->
+ gen_server:reply(From, infos(Items, Connection, State)),
+ listen_loop_post_auth(Transport, Connection, State, Configuration);
{Closed, S} ->
demonitor_all_streams(Connection),
rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]),
@@ -499,7 +536,7 @@ handle_frame_pre_auth(Transport, #stream_connection{socket = S, authentication_s
end,
Frame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, FrameFragment/binary>>,
frame(Transport, S1, Frame),
- {S1, Rest};
+ {S1#stream_connection{auth_mechanism = {Mechanism, AuthMechanism}}, Rest};
{error, _} ->
Frame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_SASL_MECHANISM_NOT_SUPPORTED:16>>,
frame(Transport, Connection0, Frame),
@@ -525,7 +562,8 @@ handle_frame_pre_auth(_Transport, #stream_connection{helper_sup = SupPid, socket
SupPid, Sock, ConnectionName,
Heartbeat, SendFun, Heartbeat, ReceiveFun),
- {Connection#stream_connection{connection_step = tuned, frame_max = FrameMax, heartbeater = Heartbeater}, State, Rest};
+ {Connection#stream_connection{connection_step = tuned, frame_max = FrameMax,
+ heartbeat = Heartbeat, heartbeater = Heartbeater}, State, Rest};
handle_frame_pre_auth(Transport, #stream_connection{user = User, socket = S} = Connection, State,
<<?COMMAND_OPEN:16, ?VERSION_0:16, CorrelationId:32,
VirtualHostLength:16, VirtualHost:VirtualHostLength/binary>>, Rest) ->
@@ -1095,6 +1133,28 @@ check_write_permitted(Resource, User, Context) ->
check_read_permitted(Resource, User, Context) ->
check_resource_access(User, Resource, read, Context).
-%%clear_permission_cache() -> erase(permission_cache),
-%% erase(topic_permission_cache),
-%% ok. \ No newline at end of file
+info(Pid, InfoItems) ->
+ case InfoItems -- ?INFO_ITEMS of
+ [] ->
+ gen_server2:call(Pid, {info, InfoItems});
+ UnknownItems -> throw({bad_argument, UnknownItems})
+ end.
+
+infos(Items, Connection, State) -> [{Item, i(Item, Connection, State)} || Item <- Items].
+
+i(conn_name, #stream_connection{name = Name}, _) -> Name;
+i(port, #stream_connection{port = Port}, _) -> Port;
+i(peer_port, #stream_connection{peer_port = PeerPort}, _) -> PeerPort;
+i(host, #stream_connection{host = Host}, _) -> Host;
+i(peer_host, #stream_connection{peer_host = PeerHost}, _) -> PeerHost;
+i(user, #stream_connection{user = U}, _) -> U#user.username;
+i(vhost, #stream_connection{virtual_host = VirtualHost}, _) -> VirtualHost;
+i(subscriptions, _, #stream_connection_state{consumers = Consumers}) -> maps:size(Consumers);
+i(connection_state, _Connection, #stream_connection_state{blocked = true}) -> blocked;
+i(connection_state, _Connection, #stream_connection_state{blocked = false}) -> running;
+i(auth_mechanism, #stream_connection{auth_mechanism = {Name, _Mod}}, _) -> Name;
+i(heartbeat, #stream_connection{heartbeat = Heartbeat}, _) -> Heartbeat;
+i(frame_max, #stream_connection{frame_max = FrameMax}, _) -> FrameMax;
+i(client_properties, #stream_connection{client_properties = CP}, _) -> CP;
+i(connected_at, #stream_connection{connected_at = T}, _) -> T;
+i(Item, #stream_connection{}, _) -> throw({bad_argument, Item}). \ No newline at end of file
diff --git a/deps/rabbitmq_stream/test/command_SUITE.erl b/deps/rabbitmq_stream/test/command_SUITE.erl
new file mode 100644
index 0000000000..41ab5904ff
--- /dev/null
+++ b/deps/rabbitmq_stream/test/command_SUITE.erl
@@ -0,0 +1,136 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(command_SUITE).
+-compile([export_all]).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_stream.hrl").
+
+
+-define(COMMAND, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand').
+
+all() ->
+ [
+ {group, non_parallel_tests}
+ ].
+
+groups() ->
+ [
+ {non_parallel_tests, [], [
+ merge_defaults,
+ run
+ ]}
+ ].
+
+init_per_suite(Config) ->
+ Config1 = rabbit_ct_helpers:set_config(Config,
+ [{rmq_nodename_suffix, ?MODULE}]),
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps()).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config,
+ rabbit_ct_broker_helpers:teardown_steps()).
+
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+merge_defaults(_Config) ->
+ {[<<"conn_name">>], #{verbose := false}} =
+ ?COMMAND:merge_defaults([], #{}),
+
+ {[<<"other_key">>], #{verbose := true}} =
+ ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => true}),
+
+ {[<<"other_key">>], #{verbose := false}} =
+ ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => false}).
+
+
+run(Config) ->
+
+ Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Opts = #{node => Node, timeout => 10000, verbose => false},
+
+ %% No connections
+ [] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)),
+
+ StreamPort = rabbit_stream_SUITE:get_stream_port(Config),
+
+ S1 = start_stream_connection(StreamPort),
+ ct:sleep(100),
+
+ [[{conn_name, _}]] =
+ 'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)),
+
+ S2 = start_stream_connection(StreamPort),
+ ct:sleep(100),
+
+ [[{conn_name, _}], [{conn_name, _}]] =
+ 'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)),
+
+ Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp),
+ start_amqp_connection(network, Node, Port),
+
+ %% There are still just two connections
+ [[{conn_name, _}], [{conn_name, _}]] =
+ 'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)),
+
+ start_amqp_connection(direct, Node, Port),
+
+ %% Still two MQTT connections, one direct AMQP 0-9-1 connection
+ [[{conn_name, _}], [{conn_name, _}]] =
+ 'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)),
+
+ %% Verbose returns all keys
+ Infos = lists:map(fun(El) -> atom_to_binary(El, utf8) end, ?INFO_ITEMS),
+ AllKeys = 'Elixir.Enum':to_list(?COMMAND:run(Infos, Opts)),
+ AllKeys = 'Elixir.Enum':to_list(?COMMAND:run([], Opts#{verbose => true})),
+
+ %% There are two connections
+ [First, _Second] = AllKeys,
+
+ %% Keys are INFO_ITEMS
+ KeysCount = length(?INFO_ITEMS),
+ KeysCount = length(First),
+
+ {Keys, _} = lists:unzip(First),
+
+ [] = Keys -- ?INFO_ITEMS,
+ [] = ?INFO_ITEMS -- Keys,
+
+ rabbit_stream_SUITE:test_close(S1),
+ rabbit_stream_SUITE:test_close(S2),
+ ok.
+
+start_stream_connection(Port) ->
+ {ok, S} = gen_tcp:connect("localhost", Port, [{active, false},
+ {mode, binary}]),
+ rabbit_stream_SUITE:test_peer_properties(S),
+ rabbit_stream_SUITE:test_authenticate(S),
+ S.
+
+start_amqp_connection(Type, Node, Port) ->
+ Params = amqp_params(Type, Node, Port),
+ {ok, _Connection} = amqp_connection:start(Params).
+
+amqp_params(network, _, Port) ->
+ #amqp_params_network{port = Port};
+amqp_params(direct, Node, _) ->
+ #amqp_params_direct{node = Node}.