diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-23 15:43:11 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-10-23 15:43:11 +0200 |
commit | 34711b405893e3b1457fe0c7d55b1153249f5c94 (patch) | |
tree | 76db69d8959921c2f887bedb0b77d698e46fd845 | |
parent | 43898e59b1331fc23251a68d3be983501f54686e (diff) | |
parent | 28133566828b71c427a009d9d1860931b4ff092c (diff) | |
download | rabbitmq-server-git-34711b405893e3b1457fe0c7d55b1153249f5c94.tar.gz |
Merge pull request #1 from rabbitmq/management-integration
Emit stats for management
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream.erl | 2 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_manager.erl | 4 | ||||
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 273 |
3 files changed, 224 insertions, 55 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl index 4f2d1eecfd..ac713a1aba 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream.erl @@ -62,7 +62,7 @@ kill_connection(ConnectionName) -> lists:foreach(fun(ConnectionPid) -> ConnectionPid ! {infos, self()}, receive - {ConnectionPid, #{<<"name">> := ConnectionNameBin}} -> + {ConnectionPid, #{<<"connection_name">> := ConnectionNameBin}} -> exit(ConnectionPid, kill); {ConnectionPid, _ClientProperties} -> ok diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl index 39ab1a6d8a..5b318e0635 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl @@ -95,8 +95,8 @@ validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long, ClusterSiz error; validate_stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr, Locator} | T]) -> case lists:member(Locator, [<<"client-local">>, - <<"random">>, - <<"least-leaders">>]) of + <<"random">>, + <<"least-leaders">>]) of true -> validate_stream_queue_arguments(T); false -> diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index 5dc5d5fa38..6d94b43af9 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -60,7 +60,9 @@ heartbeat :: integer(), heartbeater :: any(), client_properties = #{} :: #{binary() => binary()}, - monitors = #{} :: #{reference() => binary()} + monitors = #{} :: #{reference() => binary()}, + stats_timer :: reference(), + send_file_oct :: atomics:atomics_ref() }). -record(configuration, { @@ -72,6 +74,20 @@ -define(RESPONSE_FRAME_SIZE, 10). % 2 (key) + 2 (version) + 4 (correlation ID) + 2 (response code) -define(MAX_PERMISSION_CACHE_SIZE, 12). +-define(CREATION_EVENT_KEYS, + [pid, name, port, peer_port, host, + peer_host, ssl, peer_cert_subject, peer_cert_issuer, + peer_cert_validity, auth_mechanism, ssl_protocol, + ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost, + timeout, frame_max, channel_max, client_properties, connected_at, + node, user_who_performed_action]). +-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]). +-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels, garbage_collection, + timeout]). +-define(AUTH_NOTIFICATION_INFO_KEYS, + [host, name, peer_host, peer_port, protocol, auth_mechanism, + ssl, ssl_protocol, ssl_cipher, peer_cert_issuer, peer_cert_subject, + peer_cert_validity]). %% API -export([start_link/4, init/1, info/2]). @@ -93,16 +109,19 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits, case rabbit_net:connection_string(Sock, inbound) of {ok, ConnStr} -> Credits = atomics:new(1, [{signed, true}]), + SendFileOct = atomics:new(1, [{signed, false}]), + atomics:put(SendFileOct, 1, 0), init_credit(Credits, InitialCredits), {PeerHost, PeerPort, Host, Port} = socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end), Connection = #stream_connection{ - name = ConnStr, + name = rabbit_data_coercion:to_binary(ConnStr), host = Host, peer_host = PeerHost, port = Port, peer_port = PeerPort, connected_at = os:system_time(milli_seconds), + auth_mechanism = none, helper_sup = KeepaliveSup, socket = RealSocket, stream_leaders = #{}, @@ -110,7 +129,8 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits, credits = Credits, authentication_state = none, connection_step = tcp_connected, - frame_max = FrameMax}, + frame_max = FrameMax, + send_file_oct = SendFileOct}, State = #stream_connection_state{ consumers = #{}, blocked = false, data = none }, @@ -173,7 +193,16 @@ listen_loop_pre_auth(Transport, #stream_connection{socket = S} = Connection, Sta % just meant to be able to close the connection remotely % should be possible once the connections are available in ctl list_connections pg_local:join(rabbit_stream_connections, self()), - listen_loop_post_auth(Transport, Connection1, State1, Configuration); + Connection2 = rabbit_event:init_stats_timer(Connection1, #stream_connection.stats_timer), + Connection3 = ensure_stats_timer(Connection2), + Infos = augment_infos_with_user_provided_connection_name( + infos(?CREATION_EVENT_KEYS, Connection3, State1), + Connection3 + ), + rabbit_core_metrics:connection_created(self(), Infos), + rabbit_event:notify(connection_created, Infos), + rabbit_networking:register_non_amqp_connection(self()), + listen_loop_post_auth(Transport, Connection3, State1, Configuration); failure -> close(Transport, S); _ -> @@ -189,15 +218,25 @@ listen_loop_pre_auth(Transport, #stream_connection{socket = S} = Connection, Sta close(Transport, S) end. +augment_infos_with_user_provided_connection_name(Infos, #stream_connection{client_properties = ClientProperties}) -> + case ClientProperties of + #{<<"connection_name">> := UserProvidedConnectionName} -> + [{user_provided_name, UserProvidedConnectionName} | Infos]; + _ -> + Infos + end. + close(Transport, S) -> Transport:shutdown(S, write), Transport:close(S). listen_loop_post_auth(Transport, #stream_connection{socket = S, stream_subscriptions = StreamSubscriptions, credits = Credits, - heartbeater = Heartbeater, monitors = Monitors, client_properties = ClientProperties} = Connection, + heartbeater = Heartbeater, monitors = Monitors, client_properties = ClientProperties, + send_file_oct = SendFileOct} = Connection0, #stream_connection_state{consumers = Consumers, blocked = Blocked} = State, #configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) -> + Connection = ensure_stats_timer(Connection0), {OK, Closed, Error} = Transport:messages(), receive {OK, S, Data} -> @@ -205,7 +244,9 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, #stream_connection{connection_step = Step} = Connection1, case Step of closing -> - close(Transport, S); + close(Transport, S), + rabbit_networking:unregister_non_amqp_connection(self()), + notify_connection_closed(Connection1, State1); close_sent -> rabbit_log:debug("Transitioned to close_sent ~n"), Transport:setopts(S, [{active, once}]), @@ -309,7 +350,8 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, _ -> {{segment, Segment1}, {credit, Credit1}} = send_chunks( Transport, - Consumer + Consumer, + SendFileOct ), Consumer#consumer{segment = Segment1, credit = Credit1} end, @@ -343,12 +385,36 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S, {'$gen_call', From, {info, Items}} -> gen_server:reply(From, infos(Items, Connection, State)), listen_loop_post_auth(Transport, Connection, State, Configuration); + emit_stats -> + Connection1 = emit_stats(Connection, State), + listen_loop_post_auth(Transport, Connection1, State, Configuration); + {'$gen_cast', {force_event_refresh, Ref}} -> + Infos = augment_infos_with_user_provided_connection_name( + infos(?CREATION_EVENT_KEYS, Connection, State), + Connection + ), + rabbit_event:notify(connection_created, Infos, Ref), + Connection1 = rabbit_event:init_stats_timer(Connection, #stream_connection.stats_timer), + listen_loop_post_auth(Transport, Connection1, State, Configuration); + {'$gen_call', From, {shutdown, Explanation}} -> + % likely closing call from the management plugin + gen_server:reply(From, ok), + rabbit_log:info("Forcing stream connection ~p closing: ~p~n", [self(), Explanation]), + demonitor_all_streams(Connection), + rabbit_networking:unregister_non_amqp_connection(self()), + notify_connection_closed(Connection, State), + close(Transport, S), + ok; {Closed, S} -> demonitor_all_streams(Connection), + rabbit_networking:unregister_non_amqp_connection(self()), + notify_connection_closed(Connection, State), rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]), ok; {Error, S, Reason} -> demonitor_all_streams(Connection), + rabbit_networking:unregister_non_amqp_connection(self()), + notify_connection_closed(Connection, State), rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]); M -> rabbit_log:warning("Unknown message ~p~n", [M]), @@ -368,17 +434,23 @@ listen_loop_post_close(Transport, #stream_connection{socket = S} = Connection, S case Step of closing_done -> rabbit_log:debug("Received close confirmation from client"), - close(Transport, S); + close(Transport, S), + rabbit_networking:unregister_non_amqp_connection(self()), + notify_connection_closed(Connection1, State1); _ -> Transport:setopts(S, [{active, once}]), listen_loop_post_close(Transport, Connection1, State1, Configuration) end; {Closed, S} -> + rabbit_networking:unregister_non_amqp_connection(self()), + notify_connection_closed(Connection, State), rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]), ok; {Error, S, Reason} -> rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]), - close(Transport, S); + close(Transport, S), + rabbit_networking:unregister_non_amqp_connection(self()), + notify_connection_closed(Connection, State); M -> rabbit_log:warning("Ignored message on closing ~p~n", [M]) end. @@ -491,7 +563,10 @@ handle_frame_pre_auth(Transport, #stream_connection{socket = S} = Connection, St Transport:send(S, [<<FrameSize:32>>, <<Frame/binary>>]), {Connection, State, Rest}; -handle_frame_pre_auth(Transport, #stream_connection{socket = S, authentication_state = AuthState0} = Connection0, State, +handle_frame_pre_auth(Transport, + #stream_connection{socket = S, + authentication_state = AuthState0, + host = Host} = Connection0, State, <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, MechanismLength:16, Mechanism:MechanismLength/binary, SaslFragment/binary>>, Rest) -> @@ -511,32 +586,46 @@ handle_frame_pre_auth(Transport, #stream_connection{socket = S, authentication_s AS -> AS end, - {S1, FrameFragment} = case AuthMechanism:handle_response(SaslBin, AuthState) of - {refused, _Username, Msg, Args} -> - rabbit_log:warning(Msg, Args), - {Connection0#stream_connection{connection_step = failure}, <<?RESPONSE_AUTHENTICATION_FAILURE:16>>}; - {protocol_error, Msg, Args} -> - rabbit_log:warning(Msg, Args), - {Connection0#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_ERROR:16>>}; - {challenge, Challenge, AuthState1} -> - ChallengeSize = byte_size(Challenge), - {Connection0#stream_connection{authentication_state = AuthState1, connection_step = authenticating}, - <<?RESPONSE_SASL_CHALLENGE:16, ChallengeSize:32, Challenge/binary>> - }; - {ok, User = #user{username = Username}} -> - case rabbit_access_control:check_user_loopback(Username, S) of - ok -> - {Connection0#stream_connection{authentication_state = done, user = User, connection_step = authenticated}, - <<?RESPONSE_CODE_OK:16>> - }; - not_allowed -> - rabbit_log:warning("User '~s' can only connect via localhost~n", [Username]), - {Connection0#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK:16>>} - end - end, + RemoteAddress = list_to_binary(inet:ntoa(Host)), + C1 = Connection0#stream_connection{auth_mechanism = {Mechanism, AuthMechanism}}, + {C2, FrameFragment} = + case AuthMechanism:handle_response(SaslBin, AuthState) of + {refused, Username, Msg, Args} -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream), + auth_fail(Username, Msg, Args, C1, State), + rabbit_log:warning(Msg, Args), + {C1#stream_connection{connection_step = failure}, <<?RESPONSE_AUTHENTICATION_FAILURE:16>>}; + {protocol_error, Msg, Args} -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, stream), + notify_auth_result(none, user_authentication_failure, + [{error, rabbit_misc:format(Msg, Args)}], + C1, State), + rabbit_log:warning(Msg, Args), + {C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_ERROR:16>>}; + {challenge, Challenge, AuthState1} -> + rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, stream), + ChallengeSize = byte_size(Challenge), + {C1#stream_connection{authentication_state = AuthState1, connection_step = authenticating}, + <<?RESPONSE_SASL_CHALLENGE:16, ChallengeSize:32, Challenge/binary>> + }; + {ok, User = #user{username = Username}} -> + case rabbit_access_control:check_user_loopback(Username, S) of + ok -> + rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, stream), + notify_auth_result(Username, user_authentication_success, + [], C1, State), + {C1#stream_connection{authentication_state = done, user = User, connection_step = authenticated}, + <<?RESPONSE_CODE_OK:16>> + }; + not_allowed -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream), + rabbit_log:warning("User '~s' can only connect via localhost~n", [Username]), + {C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK:16>>} + end + end, Frame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, FrameFragment/binary>>, - frame(Transport, S1, Frame), - {S1#stream_connection{auth_mechanism = {Mechanism, AuthMechanism}}, Rest}; + frame(Transport, C1, Frame), + {C2, Rest}; {error, _} -> Frame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_SASL_MECHANISM_NOT_SUPPORTED:16>>, frame(Transport, Connection0, Frame), @@ -591,6 +680,20 @@ handle_frame_pre_auth(_Transport, Connection, State, Frame, Rest) -> rabbit_log:warning("unknown frame ~p ~p, closing connection.~n", [Frame, Rest]), {Connection#stream_connection{connection_step = failure}, State, Rest}. +auth_fail(Username, Msg, Args, Connection, ConnectionState) -> + notify_auth_result(Username, user_authentication_failure, + [{error, rabbit_misc:format(Msg, Args)}], Connection, ConnectionState). + +notify_auth_result(Username, AuthResult, ExtraProps, Connection, ConnectionState) -> + EventProps = [{connection_type, network}, + {name, case Username of none -> ''; _ -> Username end}] ++ + [case Item of + name -> {connection_name, i(name, Connection, ConnectionState)}; + _ -> {Item, i(Item, Connection, ConnectionState)} + end || Item <- ?AUTH_NOTIFICATION_INFO_KEYS] ++ + ExtraProps, + rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']). + handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits, virtual_host = VirtualHost, user = User} = Connection, State, <<?COMMAND_PUBLISH:16, ?VERSION_0:16, @@ -621,7 +724,8 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credi {Connection, State, Rest} end; handle_frame_post_auth(Transport, #stream_connection{socket = Socket, - stream_subscriptions = StreamSubscriptions, virtual_host = VirtualHost, user = User} = Connection, + stream_subscriptions = StreamSubscriptions, virtual_host = VirtualHost, user = User, + send_file_oct = SendFileOct} = Connection, #stream_connection_state{consumers = Consumers} = State, <<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned, StreamSize:16, Stream:StreamSize/binary, OffsetType:16/signed, OffsetAndCredit/binary>>, Rest) -> @@ -671,7 +775,8 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket, {{segment, Segment1}, {credit, Credit1}} = send_chunks( Transport, - ConsumerState + ConsumerState, + SendFileOct ), Consumers1 = Consumers#{SubscriptionId => ConsumerState#consumer{segment = Segment1, credit = Credit1}}, @@ -725,7 +830,7 @@ handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = Stre stream_leaders = StreamLeaders1 }, State#stream_connection_state{consumers = Consumers1}, Rest} end; -handle_frame_post_auth(Transport, #stream_connection{socket = S} = Connection, +handle_frame_post_auth(Transport, #stream_connection{socket = S, send_file_oct = SendFileOct} = Connection, #stream_connection_state{consumers = Consumers} = State, <<?COMMAND_CREDIT:16, ?VERSION_0:16, SubscriptionId:8/unsigned, Credit:16/signed>>, Rest) -> @@ -736,7 +841,8 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S} = Connection, {{segment, Segment1}, {credit, Credit1}} = send_chunks( Transport, Consumer, - AvailableCredit + Credit + AvailableCredit + Credit, + SendFileOct ), Consumer1 = Consumer#consumer{segment = Segment1, credit = Credit1}, @@ -945,6 +1051,16 @@ handle_frame_post_auth(Transport, Connection, State, Frame, Rest) -> frame(Transport, Connection, CloseFrame), {Connection#stream_connection{connection_step = close_sent}, State, Rest}. +notify_connection_closed(#stream_connection{name = Name} = Connection, ConnectionState) -> + rabbit_core_metrics:connection_closed(self()), + ClientProperties = i(client_properties, Connection, ConnectionState), + EventProperties = [{name, Name}, + {pid, self()}, + {node, node()}, + {client_properties, ClientProperties}], + rabbit_event:notify(connection_closed, + augment_infos_with_user_provided_connection_name(EventProperties, Connection)). + parse_map(<<>>, _Count) -> {#{}, <<>>}; parse_map(Content, 0) -> @@ -1072,38 +1188,40 @@ subscription_exists(StreamSubscriptions, SubscriptionId) -> SubscriptionIds = lists:flatten(maps:values(StreamSubscriptions)), lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds). -send_file_callback(Transport, #consumer{socket = S, subscription_id = SubscriptionId}) -> +send_file_callback(Transport, #consumer{socket = S, subscription_id = SubscriptionId}, Counter) -> fun(Size) -> FrameSize = 2 + 2 + 1 + Size, FrameBeginning = <<FrameSize:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:8/unsigned>>, - Transport:send(S, FrameBeginning) + Transport:send(S, FrameBeginning), + atomics:add(Counter, 1, Size) end. -send_chunks(Transport, #consumer{credit = Credit} = State) -> - send_chunks(Transport, State, Credit). +send_chunks(Transport, #consumer{credit = Credit} = State, Counter) -> + send_chunks(Transport, State, Credit, Counter). -send_chunks(_Transport, #consumer{segment = Segment}, 0) -> +send_chunks(_Transport, #consumer{segment = Segment}, 0, _Counter) -> {{segment, Segment}, {credit, 0}}; -send_chunks(Transport, #consumer{segment = Segment} = State, Credit) -> - send_chunks(Transport, State, Segment, Credit, true). +send_chunks(Transport, #consumer{segment = Segment} = State, Credit, Counter) -> + send_chunks(Transport, State, Segment, Credit, true, Counter). -send_chunks(_Transport, _State, Segment, 0 = _Credit, _Retry) -> +send_chunks(_Transport, _State, Segment, 0 = _Credit, _Retry, _Counter) -> {{segment, Segment}, {credit, 0}}; -send_chunks(Transport, #consumer{socket = S} = State, Segment, Credit, Retry) -> - case osiris_log:send_file(S, Segment, send_file_callback(Transport, State)) of +send_chunks(Transport, #consumer{socket = S} = State, Segment, Credit, Retry, Counter) -> + case osiris_log:send_file(S, Segment, send_file_callback(Transport, State, Counter)) of {ok, Segment1} -> send_chunks( Transport, State, Segment1, Credit - 1, - true + true, + Counter ); {end_of_stream, Segment1} -> case Retry of true -> timer:sleep(1), - send_chunks(Transport, State, Segment1, Credit, false); + send_chunks(Transport, State, Segment1, Credit, false, Counter); false -> #consumer{member_pid = LocalMember} = State, osiris:register_offset_listener(LocalMember, osiris_log:next_offset(Segment1)), @@ -1142,6 +1260,19 @@ check_write_permitted(Resource, User, Context) -> check_read_permitted(Resource, User, Context) -> check_resource_access(User, Resource, read, Context). +emit_stats(Connection, ConnectionState) -> + [{_, Pid}, {_, Recv_oct}, {_, Send_oct}, {_, Reductions}] = I + = infos(?SIMPLE_METRICS, Connection, ConnectionState), + Infos = infos(?OTHER_METRICS, Connection, ConnectionState), + rabbit_core_metrics:connection_stats(Pid, Infos), + rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions), + rabbit_event:notify(connection_stats, Infos ++ I), + Connection1 = rabbit_event:reset_stats_timer(Connection, #stream_connection.stats_timer), + ensure_stats_timer(Connection1). + +ensure_stats_timer(Connection = #stream_connection{}) -> + rabbit_event:ensure_stats_timer(Connection, #stream_connection.stats_timer, emit_stats). + info(Pid, InfoItems) -> case InfoItems -- ?INFO_ITEMS of [] -> @@ -1151,19 +1282,57 @@ info(Pid, InfoItems) -> infos(Items, Connection, State) -> [{Item, i(Item, Connection, State)} || Item <- Items]. +i(pid, _, _) -> self(); +i(node, _, _) -> node(); +i(SockStat, #stream_connection{socket = Sock, send_file_oct = Counter}, _) when + SockStat =:= send_oct -> % Number of bytes sent from the socket. + case rabbit_net:getstat(Sock, [SockStat]) of + {ok, [{_, N}]} when is_number(N) -> N + atomics:get(Counter, 1); + _ -> 0 + atomics:get(Counter, 1) + end; +i(SockStat, #stream_connection{socket = Sock}, _) when + SockStat =:= recv_oct; % Number of bytes received by the socket. + SockStat =:= recv_cnt; % Number of packets received by the socket. + SockStat =:= send_cnt; % Number of packets sent from the socket. + SockStat =:= send_pend -> % Number of bytes waiting to be sent by the socket. + case rabbit_net:getstat(Sock, [SockStat]) of + {ok, [{_, N}]} when is_number(N) -> N; + _ -> 0 + end; +i(reductions, _, _) -> + {reductions, Reductions} = erlang:process_info(self(), reductions), + Reductions; +i(garbage_collection, _, _) -> + rabbit_misc:get_gc_info(self()); +i(state, Connection, ConnectionState) -> i(connection_state, Connection, ConnectionState); +i(timeout, Connection, ConnectionState) -> i(heartbeat, Connection, ConnectionState); +i(name, Connection, ConnectionState) -> i(conn_name, Connection, ConnectionState); i(conn_name, #stream_connection{name = Name}, _) -> Name; i(port, #stream_connection{port = Port}, _) -> Port; i(peer_port, #stream_connection{peer_port = PeerPort}, _) -> PeerPort; i(host, #stream_connection{host = Host}, _) -> Host; i(peer_host, #stream_connection{peer_host = PeerHost}, _) -> PeerHost; +i(ssl, _, _) -> false; +i(peer_cert_subject, _, _) -> ''; +i(peer_cert_issuer, _, _) -> ''; +i(peer_cert_validity, _, _) -> ''; +i(ssl_protocol, _, _) -> ''; +i(ssl_key_exchange, _, _) -> ''; +i(ssl_cipher, _, _) -> ''; +i(ssl_hash, _, _) -> ''; +i(channels, _, _) -> 0; +i(protocol, _, _) -> {<<"stream">>, ""}; +i(user_who_performed_action, Connection, ConnectionState) -> i(user, Connection, ConnectionState); i(user, #stream_connection{user = U}, _) -> U#user.username; i(vhost, #stream_connection{virtual_host = VirtualHost}, _) -> VirtualHost; i(subscriptions, _, #stream_connection_state{consumers = Consumers}) -> maps:size(Consumers); i(connection_state, _Connection, #stream_connection_state{blocked = true}) -> blocked; i(connection_state, _Connection, #stream_connection_state{blocked = false}) -> running; +i(auth_mechanism, #stream_connection{auth_mechanism = none}, _) -> none; i(auth_mechanism, #stream_connection{auth_mechanism = {Name, _Mod}}, _) -> Name; i(heartbeat, #stream_connection{heartbeat = Heartbeat}, _) -> Heartbeat; i(frame_max, #stream_connection{frame_max = FrameMax}, _) -> FrameMax; -i(client_properties, #stream_connection{client_properties = CP}, _) -> CP; +i(channel_max, _, _) -> 0; +i(client_properties, #stream_connection{client_properties = CP}, _) -> rabbit_misc:to_amqp_table(CP); i(connected_at, #stream_connection{connected_at = T}, _) -> T; i(Item, #stream_connection{}, _) -> throw({bad_argument, Item}).
\ No newline at end of file |