diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-20 15:09:34 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-20 15:09:34 +0200 |
commit | 184f177d7844d79957a2b990375297d3452ba6be (patch) | |
tree | 510ed3a2313e5af2f3c92b635a3a3340828f823b | |
parent | cc030ac195c00dc7f34616f0d2303deee6864371 (diff) | |
download | rabbitmq-server-git-184f177d7844d79957a2b990375297d3452ba6be.tar.gz |
Emit stats for management
WIP
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream.erl | 2 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 14 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 152 |
3 files changed, 141 insertions, 27 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl index 4f2d1eecfd..ac713a1aba 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream.erl @@ -62,7 +62,7 @@ kill_connection(ConnectionName) -> lists:foreach(fun(ConnectionPid) -> ConnectionPid ! {infos, self()}, receive - {ConnectionPid, #{<<"name">> := ConnectionNameBin}} -> + {ConnectionPid, #{<<"connection_name">> := ConnectionNameBin}} -> exit(ConnectionPid, kill); {ConnectionPid, _ClientProperties} -> ok diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 9fa4e3521d..1a0d982884 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -76,6 +76,11 @@ stream_queue_arguments(ArgumentsAcc, #{<<"initial-cluster-size">> := Value} = Ar [{<<"x-initial-cluster-size">>, long, binary_to_integer(Value)}] ++ ArgumentsAcc, maps:remove(<<"initial-cluster-size">>, Arguments) ); +stream_queue_arguments(ArgumentsAcc, #{<<"queue-leader-locator">> := Value} = Arguments) -> + stream_queue_arguments( + [{<<"x-queue-leader-locator">>, longstr, Value}] ++ ArgumentsAcc, + maps:remove(<<"queue-leader-locator">>, Arguments) + ); stream_queue_arguments(ArgumentsAcc, _Arguments) -> ArgumentsAcc. @@ -83,6 +88,15 @@ validate_stream_queue_arguments([]) -> ok; validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long, ClusterSize} | _]) when ClusterSize =< 0 -> error; +validate_stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr, Locator} | T]) -> + case lists:member(Locator, [<<"client-local">>, + <<"random">>, + <<"least-leaders">>]) of + true -> + validate_stream_queue_arguments(T); + false -> + error + end; validate_stream_queue_arguments([_ | T]) -> validate_stream_queue_arguments(T). diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 5dc5d5fa38..f94a698042 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -60,7 +60,9 @@ heartbeat :: integer(), heartbeater :: any(), client_properties = #{} :: #{binary() => binary()}, - monitors = #{} :: #{reference() => binary()} + monitors = #{} :: #{reference() => binary()}, + stats_timer :: reference(), + send_file_oct :: atomics:atomics_ref() }). -record(configuration, { @@ -72,6 +74,16 @@ -define(RESPONSE_FRAME_SIZE, 10). % 2 (key) + 2 (version) + 4 (correlation ID) + 2 (response code) -define(MAX_PERMISSION_CACHE_SIZE, 12). +-define(CREATION_EVENT_KEYS, + [pid, name, port, peer_port, host, + peer_host, ssl, peer_cert_subject, peer_cert_issuer, + peer_cert_validity, auth_mechanism, ssl_protocol, + ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost, + timeout, frame_max, channel_max, client_properties, connected_at, + node, user_who_performed_action]). +-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]). +-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels, garbage_collection, + timeout]). %% API -export([start_link/4, init/1, info/2]). @@ -93,11 +105,13 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits, case rabbit_net:connection_string(Sock, inbound) of {ok, ConnStr} -> Credits = atomics:new(1, [{signed, true}]), + SendFileOct = atomics:new(1, [{signed, false}]), + atomics:put(SendFileOct, 1, 0), init_credit(Credits, InitialCredits), {PeerHost, PeerPort, Host, Port} = socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), Connection = #stream_connection{ - name = ConnStr, + name = rabbit_data_coercion:to_binary(ConnStr), host = Host, peer_host = PeerHost, port = Port, @@ -110,7 +124,8 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits, credits = Credits, authentication_state = none, connection_step = tcp_connected, - frame_max = FrameMax}, + frame_max = FrameMax, + send_file_oct = SendFileOct}, State = #stream_connection_state{ consumers = #{}, blocked = false, data = none }, @@ -173,7 +188,17 @@ listen_loop_pre_auth(Transport, #stream_connection{socket = S} = Connection, Sta % just meant to be able to close the connection remotely % should be possible once the connections are available in ctl list_connections pg_local:join(rabbit_stream_connections, self()), - listen_loop_post_auth(Transport, Connection1, State1, Configuration); + Connection2 = rabbit_event:init_stats_timer(Connection1, #stream_connection.stats_timer), + Connection3 = rabbit_event:ensure_stats_timer(Connection2, #stream_connection.stats_timer, emit_stats), + Infos = infos(?CREATION_EVENT_KEYS, Connection3, State1), + rabbit_core_metrics:connection_created(self(), Infos), + rabbit_event:notify(connection_created, Infos), + + % FIXME emit stats? + + % FIXME handle {'$gen_cast', {force_event_refresh, Ref}} event below? + + listen_loop_post_auth(Transport, Connection3, State1, Configuration); failure -> close(Transport, S); _ -> @@ -195,7 +220,8 @@ close(Transport, S) -> listen_loop_post_auth(Transport, #stream_connection{socket = S, stream_subscriptions = StreamSubscriptions, credits = Credits, - heartbeater = Heartbeater, monitors = Monitors, client_properties = ClientProperties} = Connection, + heartbeater = Heartbeater, monitors = Monitors, client_properties = ClientProperties, + send_file_oct = SendFileOct} = Connection, #stream_connection_state{consumers = Consumers, blocked = Blocked} = State, #configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) -> {OK, Closed, Error} = Transport:messages(), @@ -205,7 +231,8 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, #stream_connection{connection_step = Step} = Connection1, case Step of closing -> - close(Transport, S); + close(Transport, S), + notify_connection_closed(Connection1, State1); close_sent -> rabbit_log:debug("Transitioned to close_sent ~n"), Transport:setopts(S, [{active, once}]), @@ -309,7 +336,8 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, _ -> {{segment, Segment1}, {credit, Credit1}} = send_chunks( Transport, - Consumer + Consumer, + SendFileOct ), Consumer#consumer{segment = Segment1, credit = Credit1} end, @@ -343,12 +371,17 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, {'$gen_call', From, {info, Items}} -> gen_server:reply(From, infos(Items, Connection, State)), listen_loop_post_auth(Transport, Connection, State, Configuration); + emit_stats -> + Connection1 = emit_stats(Connection, State), + listen_loop_post_auth(Transport, Connection1, State, Configuration); {Closed, S} -> demonitor_all_streams(Connection), + notify_connection_closed(Connection, State), rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]), ok; {Error, S, Reason} -> demonitor_all_streams(Connection), + notify_connection_closed(Connection, State), rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]); M -> rabbit_log:warning("Unknown message ~p~n", [M]), @@ -368,17 +401,20 @@ listen_loop_post_close(Transport, #stream_connection{socket = S} = Connection, S case Step of closing_done -> rabbit_log:debug("Received close confirmation from client"), - close(Transport, S); + close(Transport, S), + notify_connection_closed(Connection1, State1); _ -> Transport:setopts(S, [{active, once}]), listen_loop_post_close(Transport, Connection1, State1, Configuration) end; {Closed, S} -> + notify_connection_closed(Connection, State), rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]), ok; {Error, S, Reason} -> rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]), - close(Transport, S); + close(Transport, S), + notify_connection_closed(Connection, State); M -> rabbit_log:warning("Ignored message on closing ~p~n", [M]) end. @@ -621,7 +657,8 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credi {Connection, State, Rest} end; handle_frame_post_auth(Transport, #stream_connection{socket = Socket, - stream_subscriptions = StreamSubscriptions, virtual_host = VirtualHost, user = User} = Connection, + stream_subscriptions = StreamSubscriptions, virtual_host = VirtualHost, user = User, + send_file_oct = SendFileOct} = Connection, #stream_connection_state{consumers = Consumers} = State, <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned, StreamSize:16, Stream:StreamSize/binary, OffsetType:16/signed, OffsetAndCredit/binary>>, Rest) -> @@ -671,7 +708,8 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket, {{segment, Segment1}, {credit, Credit1}} = send_chunks( Transport, - ConsumerState + ConsumerState, + SendFileOct ), Consumers1 = Consumers#{SubscriptionId => ConsumerState#consumer{segment = Segment1, credit = Credit1}}, @@ -725,7 +763,7 @@ handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = Stre stream_leaders = StreamLeaders1 }, State#stream_connection_state{consumers = Consumers1}, Rest} end; -handle_frame_post_auth(Transport, #stream_connection{socket = S} = Connection, +handle_frame_post_auth(Transport, #stream_connection{socket = S, send_file_oct = SendFileOct} = Connection, #stream_connection_state{consumers = Consumers} = State, <<?COMMAND_CREDIT:16, ?VERSION_0:16, SubscriptionId:8/unsigned, Credit:16/signed>>, Rest) -> @@ -736,7 +774,8 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S} = Connection, {{segment, Segment1}, {credit, Credit1}} = send_chunks( Transport, Consumer, - AvailableCredit + Credit + AvailableCredit + Credit, + SendFileOct ), Consumer1 = Consumer#consumer{segment = Segment1, credit = Credit1}, @@ -945,6 +984,15 @@ handle_frame_post_auth(Transport, Connection, State, Frame, Rest) -> frame(Transport, Connection, CloseFrame), {Connection#stream_connection{connection_step = close_sent}, State, Rest}. +notify_connection_closed(#stream_connection{name = Name} = Connection, ConnectionState) -> + rabbit_core_metrics:connection_closed(self()), + ClientProperties = i(client_properties, Connection, ConnectionState), + EventProperties = [{name, Name}, + {pid, self()}, + {node, node()}, + {client_properties, ClientProperties}], + rabbit_event:notify(connection_closed, EventProperties). + parse_map(<<>>, _Count) -> {#{}, <<>>}; parse_map(Content, 0) -> @@ -1072,38 +1120,40 @@ subscription_exists(StreamSubscriptions, SubscriptionId) -> SubscriptionIds = lists:flatten(maps:values(StreamSubscriptions)), lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds). -send_file_callback(Transport, #consumer{socket = S, subscription_id = SubscriptionId}) -> +send_file_callback(Transport, #consumer{socket = S, subscription_id = SubscriptionId}, Counter) -> fun(Size) -> FrameSize = 2 + 2 + 1 + Size, FrameBeginning = <<FrameSize:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:8/unsigned>>, - Transport:send(S, FrameBeginning) + Transport:send(S, FrameBeginning), + atomics:add(Counter, 1, Size) end. -send_chunks(Transport, #consumer{credit = Credit} = State) -> - send_chunks(Transport, State, Credit). +send_chunks(Transport, #consumer{credit = Credit} = State, Counter) -> + send_chunks(Transport, State, Credit, Counter). -send_chunks(_Transport, #consumer{segment = Segment}, 0) -> +send_chunks(_Transport, #consumer{segment = Segment}, 0, _Counter) -> {{segment, Segment}, {credit, 0}}; -send_chunks(Transport, #consumer{segment = Segment} = State, Credit) -> - send_chunks(Transport, State, Segment, Credit, true). +send_chunks(Transport, #consumer{segment = Segment} = State, Credit, Counter) -> + send_chunks(Transport, State, Segment, Credit, true, Counter). -send_chunks(_Transport, _State, Segment, 0 = _Credit, _Retry) -> +send_chunks(_Transport, _State, Segment, 0 = _Credit, _Retry, _Counter) -> {{segment, Segment}, {credit, 0}}; -send_chunks(Transport, #consumer{socket = S} = State, Segment, Credit, Retry) -> - case osiris_log:send_file(S, Segment, send_file_callback(Transport, State)) of +send_chunks(Transport, #consumer{socket = S} = State, Segment, Credit, Retry, Counter) -> + case osiris_log:send_file(S, Segment, send_file_callback(Transport, State, Counter)) of {ok, Segment1} -> send_chunks( Transport, State, Segment1, Credit - 1, - true + true, + Counter ); {end_of_stream, Segment1} -> case Retry of true -> timer:sleep(1), - send_chunks(Transport, State, Segment1, Credit, false); + send_chunks(Transport, State, Segment1, Credit, false, Counter); false -> #consumer{member_pid = LocalMember} = State, osiris:register_offset_listener(LocalMember, osiris_log:next_offset(Segment1)), @@ -1142,6 +1192,19 @@ check_write_permitted(Resource, User, Context) -> check_read_permitted(Resource, User, Context) -> check_resource_access(User, Resource, read, Context). +emit_stats(Connection, ConnectionState) -> + [{_, Pid}, {_, Recv_oct}, {_, Send_oct}, {_, Reductions}] = I + = infos(?SIMPLE_METRICS, Connection, ConnectionState), + Infos = infos(?OTHER_METRICS, Connection, ConnectionState), + rabbit_core_metrics:connection_stats(Pid, Infos), + rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions), + rabbit_event:notify(connection_stats, Infos ++ I), + Connection1 = rabbit_event:reset_stats_timer(Connection, #stream_connection.stats_timer), + ensure_stats_timer(Connection1). + +ensure_stats_timer(Connection = #stream_connection{}) -> + rabbit_event:ensure_stats_timer(Connection, #stream_connection.stats_timer, emit_stats). + info(Pid, InfoItems) -> case InfoItems -- ?INFO_ITEMS of [] -> @@ -1151,11 +1214,47 @@ info(Pid, InfoItems) -> infos(Items, Connection, State) -> [{Item, i(Item, Connection, State)} || Item <- Items]. +i(pid, _, _) -> self(); +i(node, _, _) -> node(); +i(SockStat, #stream_connection{socket = Sock, send_file_oct = Counter}, _) when + SockStat =:= send_oct -> % Number of bytes sent from the socket. + case rabbit_net:getstat(Sock, [SockStat]) of + {ok, [{_, N}]} when is_number(N) -> N + atomics:get(Counter, 1); + _ -> 0 + atomics:get(Counter, 1) + end; +i(SockStat, #stream_connection{socket = Sock}, _) when + SockStat =:= recv_oct; % Number of bytes received by the socket. + SockStat =:= recv_cnt; % Number of packets received by the socket. + SockStat =:= send_cnt; % Number of packets sent from the socket. + SockStat =:= send_pend -> % Number of bytes waiting to be sent by the socket. + case rabbit_net:getstat(Sock, [SockStat]) of + {ok, [{_, N}]} when is_number(N) -> N; + _ -> 0 + end; +i(reductions, _, _) -> + {reductions, Reductions} = erlang:process_info(self(), reductions), + Reductions; +i(garbage_collection, _, _) -> + rabbit_misc:get_gc_info(self()); +i(state, Connection, ConnectionState) -> i(connection_state, Connection, ConnectionState); +i(timeout, Connection, ConnectionState) -> i(heartbeat, Connection, ConnectionState); +i(name, Connection, ConnectionState) -> i(conn_name, Connection, ConnectionState); i(conn_name, #stream_connection{name = Name}, _) -> Name; i(port, #stream_connection{port = Port}, _) -> Port; i(peer_port, #stream_connection{peer_port = PeerPort}, _) -> PeerPort; i(host, #stream_connection{host = Host}, _) -> Host; i(peer_host, #stream_connection{peer_host = PeerHost}, _) -> PeerHost; +i(ssl, _, _) -> false; +i(peer_cert_subject, _, _) -> ''; +i(peer_cert_issuer, _, _) -> ''; +i(peer_cert_validity, _, _) -> ''; +i(ssl_protocol, _, _) -> ''; +i(ssl_key_exchange, _, _) -> ''; +i(ssl_cipher, _, _) -> ''; +i(ssl_hash, _, _) -> ''; +i(channels, _, _) -> 0; +i(protocol, _, _) -> {<<"stream">>, ""}; +i(user_who_performed_action, Connection, ConnectionState) -> i(user, Connection, ConnectionState); i(user, #stream_connection{user = U}, _) -> U#user.username; i(vhost, #stream_connection{virtual_host = VirtualHost}, _) -> VirtualHost; i(subscriptions, _, #stream_connection_state{consumers = Consumers}) -> maps:size(Consumers); @@ -1164,6 +1263,7 @@ i(connection_state, _Connection, #stream_connection_state{blocked = false}) -> i(auth_mechanism, #stream_connection{auth_mechanism = {Name, _Mod}}, _) -> Name; i(heartbeat, #stream_connection{heartbeat = Heartbeat}, _) -> Heartbeat; i(frame_max, #stream_connection{frame_max = FrameMax}, _) -> FrameMax; -i(client_properties, #stream_connection{client_properties = CP}, _) -> CP; +i(channel_max, _, _) -> 0; +i(client_properties, #stream_connection{client_properties = CP}, _) -> rabbit_misc:to_amqp_table(CP); i(connected_at, #stream_connection{connected_at = T}, _) -> T; i(Item, #stream_connection{}, _) -> throw({bad_argument, Item}).
\ No newline at end of file |