diff options
author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-20 18:20:03 +0200 |
---|---|---|
committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2020-10-20 18:20:03 +0200 |
commit | 20681aca6363df24ddf395768e31ba436d5f8a00 (patch) | |
tree | 4aa16ac200998da5cb6d8a84b6a2abefa7cf49a8 | |
parent | 184f177d7844d79957a2b990375297d3452ba6be (diff) | |
download | rabbitmq-server-git-20681aca6363df24ddf395768e31ba436d5f8a00.tar.gz |
Add event notification during authentication
-rw-r--r-- | deps/rabbitmq_stream/src/rabbit_stream_reader.erl | 105 |
1 files changed, 77 insertions, 28 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl index f94a698042..57b21454b5 100644 --- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl +++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl @@ -84,6 +84,10 @@ -define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]). -define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels, garbage_collection, timeout]). +-define(AUTH_NOTIFICATION_INFO_KEYS, + [host, name, peer_host, peer_port, protocol, auth_mechanism, + ssl, ssl_protocol, ssl_cipher, peer_cert_issuer, peer_cert_subject, + peer_cert_validity]). %% API -export([start_link/4, init/1, info/2]). @@ -117,6 +121,7 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits, port = Port, peer_port = PeerPort, connected_at = os:system_time(milli_seconds), + auth_mechanism = none, helper_sup = KeepaliveSup, socket = RealSocket, stream_leaders = #{}, @@ -190,7 +195,10 @@ listen_loop_pre_auth(Transport, #stream_connection{socket = S} = Connection, Sta pg_local:join(rabbit_stream_connections, self()), Connection2 = rabbit_event:init_stats_timer(Connection1, #stream_connection.stats_timer), Connection3 = rabbit_event:ensure_stats_timer(Connection2, #stream_connection.stats_timer, emit_stats), - Infos = infos(?CREATION_EVENT_KEYS, Connection3, State1), + Infos = augment_infos_with_user_provided_connection_name( + infos(?CREATION_EVENT_KEYS, Connection3, State1), + Connection3 + ), rabbit_core_metrics:connection_created(self(), Infos), rabbit_event:notify(connection_created, Infos), @@ -214,6 +222,14 @@ listen_loop_pre_auth(Transport, #stream_connection{socket = S} = Connection, Sta close(Transport, S) end. +augment_infos_with_user_provided_connection_name(Infos, #stream_connection{client_properties = ClientProperties}) -> + case ClientProperties of + #{<<"connection_name">> := UserProvidedConnectionName} -> + [{user_provided_name, UserProvidedConnectionName} | Infos]; + _ -> + Infos + end. + close(Transport, S) -> Transport:shutdown(S, write), Transport:close(S). @@ -527,7 +543,10 @@ handle_frame_pre_auth(Transport, #stream_connection{socket = S} = Connection, St Transport:send(S, [<<FrameSize:32>>, <<Frame/binary>>]), {Connection, State, Rest}; -handle_frame_pre_auth(Transport, #stream_connection{socket = S, authentication_state = AuthState0} = Connection0, State, +handle_frame_pre_auth(Transport, + #stream_connection{socket = S, + authentication_state = AuthState0, + host = Host} = Connection0, State, <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, MechanismLength:16, Mechanism:MechanismLength/binary, SaslFragment/binary>>, Rest) -> @@ -547,32 +566,46 @@ handle_frame_pre_auth(Transport, #stream_connection{socket = S, authentication_s AS -> AS end, - {S1, FrameFragment} = case AuthMechanism:handle_response(SaslBin, AuthState) of - {refused, _Username, Msg, Args} -> - rabbit_log:warning(Msg, Args), - {Connection0#stream_connection{connection_step = failure}, <<?RESPONSE_AUTHENTICATION_FAILURE:16>>}; - {protocol_error, Msg, Args} -> - rabbit_log:warning(Msg, Args), - {Connection0#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_ERROR:16>>}; - {challenge, Challenge, AuthState1} -> - ChallengeSize = byte_size(Challenge), - {Connection0#stream_connection{authentication_state = AuthState1, connection_step = authenticating}, - <<?RESPONSE_SASL_CHALLENGE:16, ChallengeSize:32, Challenge/binary>> - }; - {ok, User = #user{username = Username}} -> - case rabbit_access_control:check_user_loopback(Username, S) of - ok -> - {Connection0#stream_connection{authentication_state = done, user = User, connection_step = authenticated}, - <<?RESPONSE_CODE_OK:16>> - }; - not_allowed -> - rabbit_log:warning("User '~s' can only connect via localhost~n", [Username]), - {Connection0#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK:16>>} - end - end, + RemoteAddress = list_to_binary(inet:ntoa(Host)), + C1 = Connection0#stream_connection{auth_mechanism = {Mechanism, AuthMechanism}}, + {C2, FrameFragment} = + case AuthMechanism:handle_response(SaslBin, AuthState) of + {refused, Username, Msg, Args} -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream), + auth_fail(Username, Msg, Args, C1, State), + rabbit_log:warning(Msg, Args), + {C1#stream_connection{connection_step = failure}, <<?RESPONSE_AUTHENTICATION_FAILURE:16>>}; + {protocol_error, Msg, Args} -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, stream), + notify_auth_result(none, user_authentication_failure, + [{error, rabbit_misc:format(Msg, Args)}], + C1, State), + rabbit_log:warning(Msg, Args), + {C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_ERROR:16>>}; + {challenge, Challenge, AuthState1} -> + rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, stream), + ChallengeSize = byte_size(Challenge), + {C1#stream_connection{authentication_state = AuthState1, connection_step = authenticating}, + <<?RESPONSE_SASL_CHALLENGE:16, ChallengeSize:32, Challenge/binary>> + }; + {ok, User = #user{username = Username}} -> + case rabbit_access_control:check_user_loopback(Username, S) of + ok -> + rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, stream), + notify_auth_result(Username, user_authentication_success, + [], C1, State), + {C1#stream_connection{authentication_state = done, user = User, connection_step = authenticated}, + <<?RESPONSE_CODE_OK:16>> + }; + not_allowed -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream), + rabbit_log:warning("User '~s' can only connect via localhost~n", [Username]), + {C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK:16>>} + end + end, Frame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, FrameFragment/binary>>, - frame(Transport, S1, Frame), - {S1#stream_connection{auth_mechanism = {Mechanism, AuthMechanism}}, Rest}; + frame(Transport, C1, Frame), + {C2, Rest}; {error, _} -> Frame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_SASL_MECHANISM_NOT_SUPPORTED:16>>, frame(Transport, Connection0, Frame), @@ -627,6 +660,20 @@ handle_frame_pre_auth(_Transport, Connection, State, Frame, Rest) -> rabbit_log:warning("unknown frame ~p ~p, closing connection.~n", [Frame, Rest]), {Connection#stream_connection{connection_step = failure}, State, Rest}. +auth_fail(Username, Msg, Args, Connection, ConnectionState) -> + notify_auth_result(Username, user_authentication_failure, + [{error, rabbit_misc:format(Msg, Args)}], Connection, ConnectionState). + +notify_auth_result(Username, AuthResult, ExtraProps, Connection, ConnectionState) -> + EventProps = [{connection_type, network}, + {name, case Username of none -> ''; _ -> Username end}] ++ + [case Item of + name -> {connection_name, i(name, Connection, ConnectionState)}; + _ -> {Item, i(Item, Connection, ConnectionState)} + end || Item <- ?AUTH_NOTIFICATION_INFO_KEYS] ++ + ExtraProps, + rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']). + handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits, virtual_host = VirtualHost, user = User} = Connection, State, <<?COMMAND_PUBLISH:16, ?VERSION_0:16, @@ -991,7 +1038,8 @@ notify_connection_closed(#stream_connection{name = Name} = Connection, Connectio {pid, self()}, {node, node()}, {client_properties, ClientProperties}], - rabbit_event:notify(connection_closed, EventProperties). + rabbit_event:notify(connection_closed, + augment_infos_with_user_provided_connection_name(EventProperties, Connection)). parse_map(<<>>, _Count) -> {#{}, <<>>}; @@ -1260,6 +1308,7 @@ i(vhost, #stream_connection{virtual_host = VirtualHost}, _) -> Virtu i(subscriptions, _, #stream_connection_state{consumers = Consumers}) -> maps:size(Consumers); i(connection_state, _Connection, #stream_connection_state{blocked = true}) -> blocked; i(connection_state, _Connection, #stream_connection_state{blocked = false}) -> running; +i(auth_mechanism, #stream_connection{auth_mechanism = none}, _) -> none; i(auth_mechanism, #stream_connection{auth_mechanism = {Name, _Mod}}, _) -> Name; i(heartbeat, #stream_connection{heartbeat = Heartbeat}, _) -> Heartbeat; i(frame_max, #stream_connection{frame_max = FrameMax}, _) -> FrameMax; |