diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-16 09:18:36 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-16 09:18:36 +0200 |
commit | 27d06e8021f8f7d4fbfba2f4f10173df995f3e4c (patch) | |
tree | 00d5237a7b483fad256db57b5faccd0dedc0479d | |
parent | 9e6008a36222b3e721c79c99af3a4645287473ff (diff) | |
download | rabbitmq-server-git-27d06e8021f8f7d4fbfba2f4f10173df995f3e4c.tar.gz |
Add CLI command to list stream connections
6 files changed, 346 insertions, 9 deletions
diff --git a/deps/rabbitmq_stream/include/rabbit_stream.hrl b/deps/rabbitmq_stream/include/rabbit_stream.hrl index f4f11b0324..684dd9a066 100644 --- a/deps/rabbitmq_stream/include/rabbit_stream.hrl +++ b/deps/rabbitmq_stream/include/rabbit_stream.hrl @@ -47,4 +47,23 @@ -define(DEFAULT_INITIAL_CREDITS, 50000). -define(DEFAULT_CREDITS_REQUIRED_FOR_UNBLOCKING, 12500). -define(DEFAULT_FRAME_MAX, 1048576). %% 1 MiB --define(DEFAULT_HEARTBEAT, 60). %% 60 seconds
\ No newline at end of file +-define(DEFAULT_HEARTBEAT, 60). %% 60 seconds + +-define(INFO_ITEMS, + [conn_name, + port, + peer_port, + host, + peer_host, + user, + vhost, + subscriptions, + connection_state, + auth_mechanism, + heartbeat, + frame_max, + client_properties, + connected_at + ]). + +-define(STREAM_GUIDE_URL, <<"https://rabbitmq.com/stream.html">>).
\ No newline at end of file diff --git a/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl new file mode 100644 index 0000000000..f185ab044e --- /dev/null +++ b/deps/rabbitmq_stream/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand.erl @@ -0,0 +1,95 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 2.0 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at https://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2020 VMware, Inc. or its affiliates. All rights reserved. + +-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand'). + +-include("rabbit_stream.hrl"). + +-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). + +-export([formatter/0, + scopes/0, + switches/0, + aliases/0, + usage/0, + usage_additional/0, + usage_doc_guides/0, + banner/2, + validate/2, + merge_defaults/2, + run/2, + output/2, + description/0, + help_section/0]). + +formatter() -> 'Elixir.RabbitMQ.CLI.Formatters.Table'. + +scopes() -> [ctl, diagnostics, streams]. + +switches() -> [{verbose, boolean}]. +aliases() -> [{'V', verbose}]. + +description() -> <<"Lists stream connections on the target node">>. + +help_section() -> + {plugin, stream}. + +validate(Args, _) -> + case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args, + ?INFO_ITEMS) of + {ok, _} -> ok; + Error -> Error + end. + +merge_defaults([], Opts) -> + merge_defaults([<<"conn_name">>], Opts); +merge_defaults(Args, Opts) -> + {Args, maps:merge(#{verbose => false}, Opts)}. + +usage() -> + <<"list_stream_connections [<column> ...]">>. + +usage_additional() -> + Prefix = <<" must be one of ">>, + InfoItems = 'Elixir.Enum':join(lists:usort(?INFO_ITEMS), <<", ">>), + [ + {<<"<column>">>, <<Prefix/binary, InfoItems/binary>>} + ]. + +usage_doc_guides() -> + [?STREAM_GUIDE_URL]. + +run(Args, #{node := NodeName, + timeout := Timeout, + verbose := Verbose}) -> + InfoKeys = case Verbose of + true -> ?INFO_ITEMS; + false -> 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args) + end, + Nodes = 'Elixir.RabbitMQ.CLI.Core.Helpers':nodes_in_cluster(NodeName), + + 'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items( + NodeName, + rabbit_stream, + emit_connection_info_all, + [Nodes, InfoKeys], + Timeout, + InfoKeys, + length(Nodes)). + +banner(_, _) -> <<"Listing stream connections ...">>. + +output(Result, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result). diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl index ee0886b910..4f2d1eecfd 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream.erl @@ -19,6 +19,9 @@ -export([start/2, host/0, port/0, kill_connection/1]). -export([stop/1]). +-export([emit_connection_info_local/3, + emit_connection_info_all/4, + list/0]). -include_lib("rabbit_common/include/rabbit.hrl"). @@ -66,4 +69,26 @@ kill_connection(ConnectionName) -> after 1000 -> ok end - end, pg_local:get_members(rabbit_stream_connections)).
\ No newline at end of file + end, pg_local:get_members(rabbit_stream_connections)). + +emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> + Pids = [spawn_link(Node, rabbit_stream, emit_connection_info_local, + [Items, Ref, AggregatorPid]) + || Node <- Nodes], + rabbit_control_misc:await_emitters_termination(Pids), + ok. + +emit_connection_info_local(Items, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map_with_exit_handler( + AggregatorPid, Ref, fun(Pid) -> + rabbit_stream_reader:info(Pid, Items) + end, + list()). + +list() -> + [Client + || {_, ListSupPid, _, _} <- supervisor2:which_children(rabbit_stream_sup), + {_, RanchSup, supervisor, _} <- supervisor2:which_children(ListSupPid), + {ranch_conns_sup, ConnSup, _, _} <- supervisor:which_children(RanchSup), + {_, CliSup, _, _} <- supervisor:which_children(ConnSup), + {rabbit_stream_reader, Client, _, _} <- supervisor:which_children(CliSup)].
\ No newline at end of file diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 9992688d52..3b8c766552 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -186,7 +186,9 @@ handle_call({topology, VirtualHost, Stream}, _From, State) -> _ -> {error, stream_not_found} end, - {reply, Res, State}. + {reply, Res, State}; +handle_call(which_children, _From, State) -> + {reply, [], State}. handle_cast(_, State) -> {noreply, State}. diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 6f285894db..b43c7c837a 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -37,6 +37,16 @@ -record(stream_connection, { name :: string(), + %% server host + host, + %% client host + peer_host, + %% server port + port, + %% client port + peer_port, + auth_mechanism, + connected_at :: integer(), helper_sup :: pid(), socket :: rabbit_net:socket(), stream_leaders :: #{binary() => pid()}, @@ -47,6 +57,7 @@ virtual_host :: 'undefined' | binary(), connection_step :: atom(), % tcp_connected, peer_properties_exchanged, authenticating, authenticated, tuning, tuned, opened, failure, closing, closing_done frame_max :: integer(), + heartbeat :: integer(), heartbeater :: any(), client_properties = #{} :: #{binary() => binary()}, monitors = #{} :: #{reference() => binary()} @@ -63,7 +74,7 @@ -define(MAX_PERMISSION_CACHE_SIZE, 12). %% API --export([start_link/4, init/1]). +-export([start_link/4, init/1, info/2]). start_link(KeepaliveSup, Transport, Ref, Opts) -> Pid = proc_lib:spawn_link(?MODULE, init, @@ -83,8 +94,15 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits, {ok, ConnStr} -> Credits = atomics:new(1, [{signed, true}]), init_credit(Credits, InitialCredits), + {PeerHost, PeerPort, Host, Port} = + socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), Connection = #stream_connection{ name = ConnStr, + host = Host, + peer_host = PeerHost, + port = Port, + peer_port = PeerPort, + connected_at = os:system_time(milli_seconds), helper_sup = KeepaliveSup, socket = RealSocket, stream_leaders = #{}, @@ -109,6 +127,16 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits, rabbit_log:warning("Closing connection because of ~p ~p~n", [Error, Reason]) end. +socket_op(Sock, Fun) -> + RealSocket = rabbit_net:unwrap_socket(Sock), + case Fun(Sock) of + {ok, Res} -> Res; + {error, Reason} -> + rabbit_log:warning("Error during socket operation ~p~n", [Reason]), + rabbit_net:fast_close(RealSocket), + exit(normal) + end. + init_credit(CreditReference, Credits) -> atomics:put(CreditReference, 1, Credits). @@ -141,6 +169,9 @@ listen_loop_pre_auth(Transport, #stream_connection{socket = S} = Connection, Sta frame(Transport, Connection1, TuneFrame), listen_loop_pre_auth(Transport, Connection1#stream_connection{connection_step = tuning}, State1, Configuration); opened -> + % TODO remove registration to rabbit_stream_connections + % just meant to be able to close the connection remotely + % should be possible once the connections are available in ctl list_connections pg_local:join(rabbit_stream_connections, self()), listen_loop_post_auth(Transport, Connection1, State1, Configuration); failure -> @@ -306,6 +337,12 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, {infos, From} -> From ! {self(), ClientProperties}, listen_loop_post_auth(Transport, Connection, State, Configuration); + {'$gen_call', From, info} -> + gen_server:reply(From, infos(?INFO_ITEMS, Connection, State)), + listen_loop_post_auth(Transport, Connection, State, Configuration); + {'$gen_call', From, {info, Items}} -> + gen_server:reply(From, infos(Items, Connection, State)), + listen_loop_post_auth(Transport, Connection, State, Configuration); {Closed, S} -> demonitor_all_streams(Connection), rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]), @@ -499,7 +536,7 @@ handle_frame_pre_auth(Transport, #stream_connection{socket = S, authentication_s end, Frame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, FrameFragment/binary>>, frame(Transport, S1, Frame), - {S1, Rest}; + {S1#stream_connection{auth_mechanism = {Mechanism, AuthMechanism}}, Rest}; {error, _} -> Frame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_SASL_MECHANISM_NOT_SUPPORTED:16>>, frame(Transport, Connection0, Frame), @@ -525,7 +562,8 @@ handle_frame_pre_auth(_Transport, #stream_connection{helper_sup = SupPid, socket SupPid, Sock, ConnectionName, Heartbeat, SendFun, Heartbeat, ReceiveFun), - {Connection#stream_connection{connection_step = tuned, frame_max = FrameMax, heartbeater = Heartbeater}, State, Rest}; + {Connection#stream_connection{connection_step = tuned, frame_max = FrameMax, + heartbeat = Heartbeat, heartbeater = Heartbeater}, State, Rest}; handle_frame_pre_auth(Transport, #stream_connection{user = User, socket = S} = Connection, State, <<?COMMAND_OPEN:16, ?VERSION_0:16, CorrelationId:32, VirtualHostLength:16, VirtualHost:VirtualHostLength/binary>>, Rest) -> @@ -1095,6 +1133,28 @@ check_write_permitted(Resource, User, Context) -> check_read_permitted(Resource, User, Context) -> check_resource_access(User, Resource, read, Context). -%%clear_permission_cache() -> erase(permission_cache), -%% erase(topic_permission_cache), -%% ok.
\ No newline at end of file +info(Pid, InfoItems) -> + case InfoItems -- ?INFO_ITEMS of + [] -> + gen_server2:call(Pid, {info, InfoItems}); + UnknownItems -> throw({bad_argument, UnknownItems}) + end. + +infos(Items, Connection, State) -> [{Item, i(Item, Connection, State)} || Item <- Items]. + +i(conn_name, #stream_connection{name = Name}, _) -> Name; +i(port, #stream_connection{port = Port}, _) -> Port; +i(peer_port, #stream_connection{peer_port = PeerPort}, _) -> PeerPort; +i(host, #stream_connection{host = Host}, _) -> Host; +i(peer_host, #stream_connection{peer_host = PeerHost}, _) -> PeerHost; +i(user, #stream_connection{user = U}, _) -> U#user.username; +i(vhost, #stream_connection{virtual_host = VirtualHost}, _) -> VirtualHost; +i(subscriptions, _, #stream_connection_state{consumers = Consumers}) -> maps:size(Consumers); +i(connection_state, _Connection, #stream_connection_state{blocked = true}) -> blocked; +i(connection_state, _Connection, #stream_connection_state{blocked = false}) -> running; +i(auth_mechanism, #stream_connection{auth_mechanism = {Name, _Mod}}, _) -> Name; +i(heartbeat, #stream_connection{heartbeat = Heartbeat}, _) -> Heartbeat; +i(frame_max, #stream_connection{frame_max = FrameMax}, _) -> FrameMax; +i(client_properties, #stream_connection{client_properties = CP}, _) -> CP; +i(connected_at, #stream_connection{connected_at = T}, _) -> T; +i(Item, #stream_connection{}, _) -> throw({bad_argument, Item}).
\ No newline at end of file diff --git a/deps/rabbitmq_stream/test/command_SUITE.erl b/deps/rabbitmq_stream/test/command_SUITE.erl new file mode 100644 index 0000000000..41ab5904ff --- /dev/null +++ b/deps/rabbitmq_stream/test/command_SUITE.erl @@ -0,0 +1,136 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(command_SUITE). +-compile([export_all]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("eunit/include/eunit.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_stream.hrl"). + + +-define(COMMAND, 'Elixir.RabbitMQ.CLI.Ctl.Commands.ListStreamConnectionsCommand'). + +all() -> + [ + {group, non_parallel_tests} + ]. + +groups() -> + [ + {non_parallel_tests, [], [ + merge_defaults, + run + ]} + ]. + +init_per_suite(Config) -> + Config1 = rabbit_ct_helpers:set_config(Config, + [{rmq_nodename_suffix, ?MODULE}]), + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config1, + rabbit_ct_broker_helpers:setup_steps()). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config, + rabbit_ct_broker_helpers:teardown_steps()). + +init_per_group(_, Config) -> + Config. + +end_per_group(_, Config) -> + Config. + +init_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_started(Config, Testcase). + +end_per_testcase(Testcase, Config) -> + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +merge_defaults(_Config) -> + {[<<"conn_name">>], #{verbose := false}} = + ?COMMAND:merge_defaults([], #{}), + + {[<<"other_key">>], #{verbose := true}} = + ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => true}), + + {[<<"other_key">>], #{verbose := false}} = + ?COMMAND:merge_defaults([<<"other_key">>], #{verbose => false}). + + +run(Config) -> + + Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Opts = #{node => Node, timeout => 10000, verbose => false}, + + %% No connections + [] = 'Elixir.Enum':to_list(?COMMAND:run([], Opts)), + + StreamPort = rabbit_stream_SUITE:get_stream_port(Config), + + S1 = start_stream_connection(StreamPort), + ct:sleep(100), + + [[{conn_name, _}]] = + 'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)), + + S2 = start_stream_connection(StreamPort), + ct:sleep(100), + + [[{conn_name, _}], [{conn_name, _}]] = + 'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)), + + Port = rabbit_ct_broker_helpers:get_node_config(Config, 0, tcp_port_amqp), + start_amqp_connection(network, Node, Port), + + %% There are still just two connections + [[{conn_name, _}], [{conn_name, _}]] = + 'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)), + + start_amqp_connection(direct, Node, Port), + + %% Still two MQTT connections, one direct AMQP 0-9-1 connection + [[{conn_name, _}], [{conn_name, _}]] = + 'Elixir.Enum':to_list(?COMMAND:run([<<"conn_name">>], Opts)), + + %% Verbose returns all keys + Infos = lists:map(fun(El) -> atom_to_binary(El, utf8) end, ?INFO_ITEMS), + AllKeys = 'Elixir.Enum':to_list(?COMMAND:run(Infos, Opts)), + AllKeys = 'Elixir.Enum':to_list(?COMMAND:run([], Opts#{verbose => true})), + + %% There are two connections + [First, _Second] = AllKeys, + + %% Keys are INFO_ITEMS + KeysCount = length(?INFO_ITEMS), + KeysCount = length(First), + + {Keys, _} = lists:unzip(First), + + [] = Keys -- ?INFO_ITEMS, + [] = ?INFO_ITEMS -- Keys, + + rabbit_stream_SUITE:test_close(S1), + rabbit_stream_SUITE:test_close(S2), + ok. + +start_stream_connection(Port) -> + {ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, + {mode, binary}]), + rabbit_stream_SUITE:test_peer_properties(S), + rabbit_stream_SUITE:test_authenticate(S), + S. + +start_amqp_connection(Type, Node, Port) -> + Params = amqp_params(Type, Node, Port), + {ok, _Connection} = amqp_connection:start(Params). + +amqp_params(network, _, Port) -> + #amqp_params_network{port = Port}; +amqp_params(direct, Node, _) -> + #amqp_params_direct{node = Node}. |