summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-23 15:43:11 +0200
committerGitHub <noreply@github.com>2020-10-23 15:43:11 +0200
commit34711b405893e3b1457fe0c7d55b1153249f5c94 (patch)
tree76db69d8959921c2f887bedb0b77d698e46fd845
parent43898e59b1331fc23251a68d3be983501f54686e (diff)
parent28133566828b71c427a009d9d1860931b4ff092c (diff)
downloadrabbitmq-server-git-34711b405893e3b1457fe0c7d55b1153249f5c94.tar.gz
Merge pull request #1 from rabbitmq/management-integration
Emit stats for management
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream.erl2
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_manager.erl4
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl273
3 files changed, 224 insertions, 55 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream.erl b/deps/rabbitmq_stream/src/rabbit_stream.erl
index 4f2d1eecfd..ac713a1aba 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream.erl
@@ -62,7 +62,7 @@ kill_connection(ConnectionName) ->
lists:foreach(fun(ConnectionPid) ->
ConnectionPid ! {infos, self()},
receive
- {ConnectionPid, #{<<"name">> := ConnectionNameBin}} ->
+ {ConnectionPid, #{<<"connection_name">> := ConnectionNameBin}} ->
exit(ConnectionPid, kill);
{ConnectionPid, _ClientProperties} ->
ok
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
index 39ab1a6d8a..5b318e0635 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_manager.erl
@@ -95,8 +95,8 @@ validate_stream_queue_arguments([{<<"x-initial-cluster-size">>, long, ClusterSiz
error;
validate_stream_queue_arguments([{<<"x-queue-leader-locator">>, longstr, Locator} | T]) ->
case lists:member(Locator, [<<"client-local">>,
- <<"random">>,
- <<"least-leaders">>]) of
+ <<"random">>,
+ <<"least-leaders">>]) of
true ->
validate_stream_queue_arguments(T);
false ->
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index 5dc5d5fa38..6d94b43af9 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -60,7 +60,9 @@
heartbeat :: integer(),
heartbeater :: any(),
client_properties = #{} :: #{binary() => binary()},
- monitors = #{} :: #{reference() => binary()}
+ monitors = #{} :: #{reference() => binary()},
+ stats_timer :: reference(),
+ send_file_oct :: atomics:atomics_ref()
}).
-record(configuration, {
@@ -72,6 +74,20 @@
-define(RESPONSE_FRAME_SIZE, 10). % 2 (key) + 2 (version) + 4 (correlation ID) + 2 (response code)
-define(MAX_PERMISSION_CACHE_SIZE, 12).
+-define(CREATION_EVENT_KEYS,
+ [pid, name, port, peer_port, host,
+ peer_host, ssl, peer_cert_subject, peer_cert_issuer,
+ peer_cert_validity, auth_mechanism, ssl_protocol,
+ ssl_key_exchange, ssl_cipher, ssl_hash, protocol, user, vhost,
+ timeout, frame_max, channel_max, client_properties, connected_at,
+ node, user_who_performed_action]).
+-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
+-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels, garbage_collection,
+ timeout]).
+-define(AUTH_NOTIFICATION_INFO_KEYS,
+ [host, name, peer_host, peer_port, protocol, auth_mechanism,
+ ssl, ssl_protocol, ssl_cipher, peer_cert_issuer, peer_cert_subject,
+ peer_cert_validity]).
%% API
-export([start_link/4, init/1, info/2]).
@@ -93,16 +109,19 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits,
case rabbit_net:connection_string(Sock, inbound) of
{ok, ConnStr} ->
Credits = atomics:new(1, [{signed, true}]),
+ SendFileOct = atomics:new(1, [{signed, false}]),
+ atomics:put(SendFileOct, 1, 0),
init_credit(Credits, InitialCredits),
{PeerHost, PeerPort, Host, Port} =
socket_op(Sock, fun (S) -> rabbit_net:socket_ends(S, inbound) end),
Connection = #stream_connection{
- name = ConnStr,
+ name = rabbit_data_coercion:to_binary(ConnStr),
host = Host,
peer_host = PeerHost,
port = Port,
peer_port = PeerPort,
connected_at = os:system_time(milli_seconds),
+ auth_mechanism = none,
helper_sup = KeepaliveSup,
socket = RealSocket,
stream_leaders = #{},
@@ -110,7 +129,8 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits,
credits = Credits,
authentication_state = none,
connection_step = tcp_connected,
- frame_max = FrameMax},
+ frame_max = FrameMax,
+ send_file_oct = SendFileOct},
State = #stream_connection_state{
consumers = #{}, blocked = false, data = none
},
@@ -173,7 +193,16 @@ listen_loop_pre_auth(Transport, #stream_connection{socket = S} = Connection, Sta
% just meant to be able to close the connection remotely
% should be possible once the connections are available in ctl list_connections
pg_local:join(rabbit_stream_connections, self()),
- listen_loop_post_auth(Transport, Connection1, State1, Configuration);
+ Connection2 = rabbit_event:init_stats_timer(Connection1, #stream_connection.stats_timer),
+ Connection3 = ensure_stats_timer(Connection2),
+ Infos = augment_infos_with_user_provided_connection_name(
+ infos(?CREATION_EVENT_KEYS, Connection3, State1),
+ Connection3
+ ),
+ rabbit_core_metrics:connection_created(self(), Infos),
+ rabbit_event:notify(connection_created, Infos),
+ rabbit_networking:register_non_amqp_connection(self()),
+ listen_loop_post_auth(Transport, Connection3, State1, Configuration);
failure ->
close(Transport, S);
_ ->
@@ -189,15 +218,25 @@ listen_loop_pre_auth(Transport, #stream_connection{socket = S} = Connection, Sta
close(Transport, S)
end.
+augment_infos_with_user_provided_connection_name(Infos, #stream_connection{client_properties = ClientProperties}) ->
+ case ClientProperties of
+ #{<<"connection_name">> := UserProvidedConnectionName} ->
+ [{user_provided_name, UserProvidedConnectionName} | Infos];
+ _ ->
+ Infos
+ end.
+
close(Transport, S) ->
Transport:shutdown(S, write),
Transport:close(S).
listen_loop_post_auth(Transport, #stream_connection{socket = S,
stream_subscriptions = StreamSubscriptions, credits = Credits,
- heartbeater = Heartbeater, monitors = Monitors, client_properties = ClientProperties} = Connection,
+ heartbeater = Heartbeater, monitors = Monitors, client_properties = ClientProperties,
+ send_file_oct = SendFileOct} = Connection0,
#stream_connection_state{consumers = Consumers, blocked = Blocked} = State,
#configuration{credits_required_for_unblocking = CreditsRequiredForUnblocking} = Configuration) ->
+ Connection = ensure_stats_timer(Connection0),
{OK, Closed, Error} = Transport:messages(),
receive
{OK, S, Data} ->
@@ -205,7 +244,9 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S,
#stream_connection{connection_step = Step} = Connection1,
case Step of
closing ->
- close(Transport, S);
+ close(Transport, S),
+ rabbit_networking:unregister_non_amqp_connection(self()),
+ notify_connection_closed(Connection1, State1);
close_sent ->
rabbit_log:debug("Transitioned to close_sent ~n"),
Transport:setopts(S, [{active, once}]),
@@ -309,7 +350,8 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S,
_ ->
{{segment, Segment1}, {credit, Credit1}} = send_chunks(
Transport,
- Consumer
+ Consumer,
+ SendFileOct
),
Consumer#consumer{segment = Segment1, credit = Credit1}
end,
@@ -343,12 +385,36 @@ listen_loop_post_auth(Transport, #stream_connection{socket = S,
{'$gen_call', From, {info, Items}} ->
gen_server:reply(From, infos(Items, Connection, State)),
listen_loop_post_auth(Transport, Connection, State, Configuration);
+ emit_stats ->
+ Connection1 = emit_stats(Connection, State),
+ listen_loop_post_auth(Transport, Connection1, State, Configuration);
+ {'$gen_cast', {force_event_refresh, Ref}} ->
+ Infos = augment_infos_with_user_provided_connection_name(
+ infos(?CREATION_EVENT_KEYS, Connection, State),
+ Connection
+ ),
+ rabbit_event:notify(connection_created, Infos, Ref),
+ Connection1 = rabbit_event:init_stats_timer(Connection, #stream_connection.stats_timer),
+ listen_loop_post_auth(Transport, Connection1, State, Configuration);
+ {'$gen_call', From, {shutdown, Explanation}} ->
+ % likely closing call from the management plugin
+ gen_server:reply(From, ok),
+ rabbit_log:info("Forcing stream connection ~p closing: ~p~n", [self(), Explanation]),
+ demonitor_all_streams(Connection),
+ rabbit_networking:unregister_non_amqp_connection(self()),
+ notify_connection_closed(Connection, State),
+ close(Transport, S),
+ ok;
{Closed, S} ->
demonitor_all_streams(Connection),
+ rabbit_networking:unregister_non_amqp_connection(self()),
+ notify_connection_closed(Connection, State),
rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]),
ok;
{Error, S, Reason} ->
demonitor_all_streams(Connection),
+ rabbit_networking:unregister_non_amqp_connection(self()),
+ notify_connection_closed(Connection, State),
rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]);
M ->
rabbit_log:warning("Unknown message ~p~n", [M]),
@@ -368,17 +434,23 @@ listen_loop_post_close(Transport, #stream_connection{socket = S} = Connection, S
case Step of
closing_done ->
rabbit_log:debug("Received close confirmation from client"),
- close(Transport, S);
+ close(Transport, S),
+ rabbit_networking:unregister_non_amqp_connection(self()),
+ notify_connection_closed(Connection1, State1);
_ ->
Transport:setopts(S, [{active, once}]),
listen_loop_post_close(Transport, Connection1, State1, Configuration)
end;
{Closed, S} ->
+ rabbit_networking:unregister_non_amqp_connection(self()),
+ notify_connection_closed(Connection, State),
rabbit_log:info("Socket ~w closed [~w]~n", [S, self()]),
ok;
{Error, S, Reason} ->
rabbit_log:info("Socket error ~p [~w]~n", [Reason, S, self()]),
- close(Transport, S);
+ close(Transport, S),
+ rabbit_networking:unregister_non_amqp_connection(self()),
+ notify_connection_closed(Connection, State);
M ->
rabbit_log:warning("Ignored message on closing ~p~n", [M])
end.
@@ -491,7 +563,10 @@ handle_frame_pre_auth(Transport, #stream_connection{socket = S} = Connection, St
Transport:send(S, [<<FrameSize:32>>, <<Frame/binary>>]),
{Connection, State, Rest};
-handle_frame_pre_auth(Transport, #stream_connection{socket = S, authentication_state = AuthState0} = Connection0, State,
+handle_frame_pre_auth(Transport,
+ #stream_connection{socket = S,
+ authentication_state = AuthState0,
+ host = Host} = Connection0, State,
<<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32,
MechanismLength:16, Mechanism:MechanismLength/binary,
SaslFragment/binary>>, Rest) ->
@@ -511,32 +586,46 @@ handle_frame_pre_auth(Transport, #stream_connection{socket = S, authentication_s
AS ->
AS
end,
- {S1, FrameFragment} = case AuthMechanism:handle_response(SaslBin, AuthState) of
- {refused, _Username, Msg, Args} ->
- rabbit_log:warning(Msg, Args),
- {Connection0#stream_connection{connection_step = failure}, <<?RESPONSE_AUTHENTICATION_FAILURE:16>>};
- {protocol_error, Msg, Args} ->
- rabbit_log:warning(Msg, Args),
- {Connection0#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_ERROR:16>>};
- {challenge, Challenge, AuthState1} ->
- ChallengeSize = byte_size(Challenge),
- {Connection0#stream_connection{authentication_state = AuthState1, connection_step = authenticating},
- <<?RESPONSE_SASL_CHALLENGE:16, ChallengeSize:32, Challenge/binary>>
- };
- {ok, User = #user{username = Username}} ->
- case rabbit_access_control:check_user_loopback(Username, S) of
- ok ->
- {Connection0#stream_connection{authentication_state = done, user = User, connection_step = authenticated},
- <<?RESPONSE_CODE_OK:16>>
- };
- not_allowed ->
- rabbit_log:warning("User '~s' can only connect via localhost~n", [Username]),
- {Connection0#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK:16>>}
- end
- end,
+ RemoteAddress = list_to_binary(inet:ntoa(Host)),
+ C1 = Connection0#stream_connection{auth_mechanism = {Mechanism, AuthMechanism}},
+ {C2, FrameFragment} =
+ case AuthMechanism:handle_response(SaslBin, AuthState) of
+ {refused, Username, Msg, Args} ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
+ auth_fail(Username, Msg, Args, C1, State),
+ rabbit_log:warning(Msg, Args),
+ {C1#stream_connection{connection_step = failure}, <<?RESPONSE_AUTHENTICATION_FAILURE:16>>};
+ {protocol_error, Msg, Args} ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, stream),
+ notify_auth_result(none, user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}],
+ C1, State),
+ rabbit_log:warning(Msg, Args),
+ {C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_ERROR:16>>};
+ {challenge, Challenge, AuthState1} ->
+ rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, stream),
+ ChallengeSize = byte_size(Challenge),
+ {C1#stream_connection{authentication_state = AuthState1, connection_step = authenticating},
+ <<?RESPONSE_SASL_CHALLENGE:16, ChallengeSize:32, Challenge/binary>>
+ };
+ {ok, User = #user{username = Username}} ->
+ case rabbit_access_control:check_user_loopback(Username, S) of
+ ok ->
+ rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, stream),
+ notify_auth_result(Username, user_authentication_success,
+ [], C1, State),
+ {C1#stream_connection{authentication_state = done, user = User, connection_step = authenticated},
+ <<?RESPONSE_CODE_OK:16>>
+ };
+ not_allowed ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
+ rabbit_log:warning("User '~s' can only connect via localhost~n", [Username]),
+ {C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK:16>>}
+ end
+ end,
Frame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, FrameFragment/binary>>,
- frame(Transport, S1, Frame),
- {S1#stream_connection{auth_mechanism = {Mechanism, AuthMechanism}}, Rest};
+ frame(Transport, C1, Frame),
+ {C2, Rest};
{error, _} ->
Frame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_SASL_MECHANISM_NOT_SUPPORTED:16>>,
frame(Transport, Connection0, Frame),
@@ -591,6 +680,20 @@ handle_frame_pre_auth(_Transport, Connection, State, Frame, Rest) ->
rabbit_log:warning("unknown frame ~p ~p, closing connection.~n", [Frame, Rest]),
{Connection#stream_connection{connection_step = failure}, State, Rest}.
+auth_fail(Username, Msg, Args, Connection, ConnectionState) ->
+ notify_auth_result(Username, user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}], Connection, ConnectionState).
+
+notify_auth_result(Username, AuthResult, ExtraProps, Connection, ConnectionState) ->
+ EventProps = [{connection_type, network},
+ {name, case Username of none -> ''; _ -> Username end}] ++
+ [case Item of
+ name -> {connection_name, i(name, Connection, ConnectionState)};
+ _ -> {Item, i(Item, Connection, ConnectionState)}
+ end || Item <- ?AUTH_NOTIFICATION_INFO_KEYS] ++
+ ExtraProps,
+ rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
+
handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits,
virtual_host = VirtualHost, user = User} = Connection, State,
<<?COMMAND_PUBLISH:16, ?VERSION_0:16,
@@ -621,7 +724,8 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credi
{Connection, State, Rest}
end;
handle_frame_post_auth(Transport, #stream_connection{socket = Socket,
- stream_subscriptions = StreamSubscriptions, virtual_host = VirtualHost, user = User} = Connection,
+ stream_subscriptions = StreamSubscriptions, virtual_host = VirtualHost, user = User,
+ send_file_oct = SendFileOct} = Connection,
#stream_connection_state{consumers = Consumers} = State,
<<?COMMAND_SUBSCRIBE:16, ?VERSION_0:16, CorrelationId:32, SubscriptionId:8/unsigned, StreamSize:16, Stream:StreamSize/binary,
OffsetType:16/signed, OffsetAndCredit/binary>>, Rest) ->
@@ -671,7 +775,8 @@ handle_frame_post_auth(Transport, #stream_connection{socket = Socket,
{{segment, Segment1}, {credit, Credit1}} = send_chunks(
Transport,
- ConsumerState
+ ConsumerState,
+ SendFileOct
),
Consumers1 = Consumers#{SubscriptionId => ConsumerState#consumer{segment = Segment1, credit = Credit1}},
@@ -725,7 +830,7 @@ handle_frame_post_auth(Transport, #stream_connection{stream_subscriptions = Stre
stream_leaders = StreamLeaders1
}, State#stream_connection_state{consumers = Consumers1}, Rest}
end;
-handle_frame_post_auth(Transport, #stream_connection{socket = S} = Connection,
+handle_frame_post_auth(Transport, #stream_connection{socket = S, send_file_oct = SendFileOct} = Connection,
#stream_connection_state{consumers = Consumers} = State,
<<?COMMAND_CREDIT:16, ?VERSION_0:16, SubscriptionId:8/unsigned, Credit:16/signed>>, Rest) ->
@@ -736,7 +841,8 @@ handle_frame_post_auth(Transport, #stream_connection{socket = S} = Connection,
{{segment, Segment1}, {credit, Credit1}} = send_chunks(
Transport,
Consumer,
- AvailableCredit + Credit
+ AvailableCredit + Credit,
+ SendFileOct
),
Consumer1 = Consumer#consumer{segment = Segment1, credit = Credit1},
@@ -945,6 +1051,16 @@ handle_frame_post_auth(Transport, Connection, State, Frame, Rest) ->
frame(Transport, Connection, CloseFrame),
{Connection#stream_connection{connection_step = close_sent}, State, Rest}.
+notify_connection_closed(#stream_connection{name = Name} = Connection, ConnectionState) ->
+ rabbit_core_metrics:connection_closed(self()),
+ ClientProperties = i(client_properties, Connection, ConnectionState),
+ EventProperties = [{name, Name},
+ {pid, self()},
+ {node, node()},
+ {client_properties, ClientProperties}],
+ rabbit_event:notify(connection_closed,
+ augment_infos_with_user_provided_connection_name(EventProperties, Connection)).
+
parse_map(<<>>, _Count) ->
{#{}, <<>>};
parse_map(Content, 0) ->
@@ -1072,38 +1188,40 @@ subscription_exists(StreamSubscriptions, SubscriptionId) ->
SubscriptionIds = lists:flatten(maps:values(StreamSubscriptions)),
lists:any(fun(Id) -> Id =:= SubscriptionId end, SubscriptionIds).
-send_file_callback(Transport, #consumer{socket = S, subscription_id = SubscriptionId}) ->
+send_file_callback(Transport, #consumer{socket = S, subscription_id = SubscriptionId}, Counter) ->
fun(Size) ->
FrameSize = 2 + 2 + 1 + Size,
FrameBeginning = <<FrameSize:32, ?COMMAND_DELIVER:16, ?VERSION_0:16, SubscriptionId:8/unsigned>>,
- Transport:send(S, FrameBeginning)
+ Transport:send(S, FrameBeginning),
+ atomics:add(Counter, 1, Size)
end.
-send_chunks(Transport, #consumer{credit = Credit} = State) ->
- send_chunks(Transport, State, Credit).
+send_chunks(Transport, #consumer{credit = Credit} = State, Counter) ->
+ send_chunks(Transport, State, Credit, Counter).
-send_chunks(_Transport, #consumer{segment = Segment}, 0) ->
+send_chunks(_Transport, #consumer{segment = Segment}, 0, _Counter) ->
{{segment, Segment}, {credit, 0}};
-send_chunks(Transport, #consumer{segment = Segment} = State, Credit) ->
- send_chunks(Transport, State, Segment, Credit, true).
+send_chunks(Transport, #consumer{segment = Segment} = State, Credit, Counter) ->
+ send_chunks(Transport, State, Segment, Credit, true, Counter).
-send_chunks(_Transport, _State, Segment, 0 = _Credit, _Retry) ->
+send_chunks(_Transport, _State, Segment, 0 = _Credit, _Retry, _Counter) ->
{{segment, Segment}, {credit, 0}};
-send_chunks(Transport, #consumer{socket = S} = State, Segment, Credit, Retry) ->
- case osiris_log:send_file(S, Segment, send_file_callback(Transport, State)) of
+send_chunks(Transport, #consumer{socket = S} = State, Segment, Credit, Retry, Counter) ->
+ case osiris_log:send_file(S, Segment, send_file_callback(Transport, State, Counter)) of
{ok, Segment1} ->
send_chunks(
Transport,
State,
Segment1,
Credit - 1,
- true
+ true,
+ Counter
);
{end_of_stream, Segment1} ->
case Retry of
true ->
timer:sleep(1),
- send_chunks(Transport, State, Segment1, Credit, false);
+ send_chunks(Transport, State, Segment1, Credit, false, Counter);
false ->
#consumer{member_pid = LocalMember} = State,
osiris:register_offset_listener(LocalMember, osiris_log:next_offset(Segment1)),
@@ -1142,6 +1260,19 @@ check_write_permitted(Resource, User, Context) ->
check_read_permitted(Resource, User, Context) ->
check_resource_access(User, Resource, read, Context).
+emit_stats(Connection, ConnectionState) ->
+ [{_, Pid}, {_, Recv_oct}, {_, Send_oct}, {_, Reductions}] = I
+ = infos(?SIMPLE_METRICS, Connection, ConnectionState),
+ Infos = infos(?OTHER_METRICS, Connection, ConnectionState),
+ rabbit_core_metrics:connection_stats(Pid, Infos),
+ rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions),
+ rabbit_event:notify(connection_stats, Infos ++ I),
+ Connection1 = rabbit_event:reset_stats_timer(Connection, #stream_connection.stats_timer),
+ ensure_stats_timer(Connection1).
+
+ensure_stats_timer(Connection = #stream_connection{}) ->
+ rabbit_event:ensure_stats_timer(Connection, #stream_connection.stats_timer, emit_stats).
+
info(Pid, InfoItems) ->
case InfoItems -- ?INFO_ITEMS of
[] ->
@@ -1151,19 +1282,57 @@ info(Pid, InfoItems) ->
infos(Items, Connection, State) -> [{Item, i(Item, Connection, State)} || Item <- Items].
+i(pid, _, _) -> self();
+i(node, _, _) -> node();
+i(SockStat, #stream_connection{socket = Sock, send_file_oct = Counter}, _) when
+ SockStat =:= send_oct -> % Number of bytes sent from the socket.
+ case rabbit_net:getstat(Sock, [SockStat]) of
+ {ok, [{_, N}]} when is_number(N) -> N + atomics:get(Counter, 1);
+ _ -> 0 + atomics:get(Counter, 1)
+ end;
+i(SockStat, #stream_connection{socket = Sock}, _) when
+ SockStat =:= recv_oct; % Number of bytes received by the socket.
+ SockStat =:= recv_cnt; % Number of packets received by the socket.
+ SockStat =:= send_cnt; % Number of packets sent from the socket.
+ SockStat =:= send_pend -> % Number of bytes waiting to be sent by the socket.
+ case rabbit_net:getstat(Sock, [SockStat]) of
+ {ok, [{_, N}]} when is_number(N) -> N;
+ _ -> 0
+ end;
+i(reductions, _, _) ->
+ {reductions, Reductions} = erlang:process_info(self(), reductions),
+ Reductions;
+i(garbage_collection, _, _) ->
+ rabbit_misc:get_gc_info(self());
+i(state, Connection, ConnectionState) -> i(connection_state, Connection, ConnectionState);
+i(timeout, Connection, ConnectionState) -> i(heartbeat, Connection, ConnectionState);
+i(name, Connection, ConnectionState) -> i(conn_name, Connection, ConnectionState);
i(conn_name, #stream_connection{name = Name}, _) -> Name;
i(port, #stream_connection{port = Port}, _) -> Port;
i(peer_port, #stream_connection{peer_port = PeerPort}, _) -> PeerPort;
i(host, #stream_connection{host = Host}, _) -> Host;
i(peer_host, #stream_connection{peer_host = PeerHost}, _) -> PeerHost;
+i(ssl, _, _) -> false;
+i(peer_cert_subject, _, _) -> '';
+i(peer_cert_issuer, _, _) -> '';
+i(peer_cert_validity, _, _) -> '';
+i(ssl_protocol, _, _) -> '';
+i(ssl_key_exchange, _, _) -> '';
+i(ssl_cipher, _, _) -> '';
+i(ssl_hash, _, _) -> '';
+i(channels, _, _) -> 0;
+i(protocol, _, _) -> {<<"stream">>, ""};
+i(user_who_performed_action, Connection, ConnectionState) -> i(user, Connection, ConnectionState);
i(user, #stream_connection{user = U}, _) -> U#user.username;
i(vhost, #stream_connection{virtual_host = VirtualHost}, _) -> VirtualHost;
i(subscriptions, _, #stream_connection_state{consumers = Consumers}) -> maps:size(Consumers);
i(connection_state, _Connection, #stream_connection_state{blocked = true}) -> blocked;
i(connection_state, _Connection, #stream_connection_state{blocked = false}) -> running;
+i(auth_mechanism, #stream_connection{auth_mechanism = none}, _) -> none;
i(auth_mechanism, #stream_connection{auth_mechanism = {Name, _Mod}}, _) -> Name;
i(heartbeat, #stream_connection{heartbeat = Heartbeat}, _) -> Heartbeat;
i(frame_max, #stream_connection{frame_max = FrameMax}, _) -> FrameMax;
-i(client_properties, #stream_connection{client_properties = CP}, _) -> CP;
+i(channel_max, _, _) -> 0;
+i(client_properties, #stream_connection{client_properties = CP}, _) -> rabbit_misc:to_amqp_table(CP);
i(connected_at, #stream_connection{connected_at = T}, _) -> T;
i(Item, #stream_connection{}, _) -> throw({bad_argument, Item}). \ No newline at end of file