summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-20 18:20:03 +0200
committerArnaud Cogoluègnes <acogoluegnes@gmail.com>2020-10-20 18:20:03 +0200
commit20681aca6363df24ddf395768e31ba436d5f8a00 (patch)
tree4aa16ac200998da5cb6d8a84b6a2abefa7cf49a8
parent184f177d7844d79957a2b990375297d3452ba6be (diff)
downloadrabbitmq-server-git-20681aca6363df24ddf395768e31ba436d5f8a00.tar.gz
Add event notification during authentication
-rw-r--r--deps/rabbitmq_stream/src/rabbit_stream_reader.erl105
1 files changed, 77 insertions, 28 deletions
diff --git a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
index f94a698042..57b21454b5 100644
--- a/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
+++ b/deps/rabbitmq_stream/src/rabbit_stream_reader.erl
@@ -84,6 +84,10 @@
-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, state, channels, garbage_collection,
timeout]).
+-define(AUTH_NOTIFICATION_INFO_KEYS,
+ [host, name, peer_host, peer_port, protocol, auth_mechanism,
+ ssl, ssl_protocol, ssl_cipher, peer_cert_issuer, peer_cert_subject,
+ peer_cert_validity]).
%% API
-export([start_link/4, init/1, info/2]).
@@ -117,6 +121,7 @@ init([KeepaliveSup, Transport, Ref, #{initial_credits := InitialCredits,
port = Port,
peer_port = PeerPort,
connected_at = os:system_time(milli_seconds),
+ auth_mechanism = none,
helper_sup = KeepaliveSup,
socket = RealSocket,
stream_leaders = #{},
@@ -190,7 +195,10 @@ listen_loop_pre_auth(Transport, #stream_connection{socket = S} = Connection, Sta
pg_local:join(rabbit_stream_connections, self()),
Connection2 = rabbit_event:init_stats_timer(Connection1, #stream_connection.stats_timer),
Connection3 = rabbit_event:ensure_stats_timer(Connection2, #stream_connection.stats_timer, emit_stats),
- Infos = infos(?CREATION_EVENT_KEYS, Connection3, State1),
+ Infos = augment_infos_with_user_provided_connection_name(
+ infos(?CREATION_EVENT_KEYS, Connection3, State1),
+ Connection3
+ ),
rabbit_core_metrics:connection_created(self(), Infos),
rabbit_event:notify(connection_created, Infos),
@@ -214,6 +222,14 @@ listen_loop_pre_auth(Transport, #stream_connection{socket = S} = Connection, Sta
close(Transport, S)
end.
+augment_infos_with_user_provided_connection_name(Infos, #stream_connection{client_properties = ClientProperties}) ->
+ case ClientProperties of
+ #{<<"connection_name">> := UserProvidedConnectionName} ->
+ [{user_provided_name, UserProvidedConnectionName} | Infos];
+ _ ->
+ Infos
+ end.
+
close(Transport, S) ->
Transport:shutdown(S, write),
Transport:close(S).
@@ -527,7 +543,10 @@ handle_frame_pre_auth(Transport, #stream_connection{socket = S} = Connection, St
Transport:send(S, [<<FrameSize:32>>, <<Frame/binary>>]),
{Connection, State, Rest};
-handle_frame_pre_auth(Transport, #stream_connection{socket = S, authentication_state = AuthState0} = Connection0, State,
+handle_frame_pre_auth(Transport,
+ #stream_connection{socket = S,
+ authentication_state = AuthState0,
+ host = Host} = Connection0, State,
<<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32,
MechanismLength:16, Mechanism:MechanismLength/binary,
SaslFragment/binary>>, Rest) ->
@@ -547,32 +566,46 @@ handle_frame_pre_auth(Transport, #stream_connection{socket = S, authentication_s
AS ->
AS
end,
- {S1, FrameFragment} = case AuthMechanism:handle_response(SaslBin, AuthState) of
- {refused, _Username, Msg, Args} ->
- rabbit_log:warning(Msg, Args),
- {Connection0#stream_connection{connection_step = failure}, <<?RESPONSE_AUTHENTICATION_FAILURE:16>>};
- {protocol_error, Msg, Args} ->
- rabbit_log:warning(Msg, Args),
- {Connection0#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_ERROR:16>>};
- {challenge, Challenge, AuthState1} ->
- ChallengeSize = byte_size(Challenge),
- {Connection0#stream_connection{authentication_state = AuthState1, connection_step = authenticating},
- <<?RESPONSE_SASL_CHALLENGE:16, ChallengeSize:32, Challenge/binary>>
- };
- {ok, User = #user{username = Username}} ->
- case rabbit_access_control:check_user_loopback(Username, S) of
- ok ->
- {Connection0#stream_connection{authentication_state = done, user = User, connection_step = authenticated},
- <<?RESPONSE_CODE_OK:16>>
- };
- not_allowed ->
- rabbit_log:warning("User '~s' can only connect via localhost~n", [Username]),
- {Connection0#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK:16>>}
- end
- end,
+ RemoteAddress = list_to_binary(inet:ntoa(Host)),
+ C1 = Connection0#stream_connection{auth_mechanism = {Mechanism, AuthMechanism}},
+ {C2, FrameFragment} =
+ case AuthMechanism:handle_response(SaslBin, AuthState) of
+ {refused, Username, Msg, Args} ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
+ auth_fail(Username, Msg, Args, C1, State),
+ rabbit_log:warning(Msg, Args),
+ {C1#stream_connection{connection_step = failure}, <<?RESPONSE_AUTHENTICATION_FAILURE:16>>};
+ {protocol_error, Msg, Args} ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, <<>>, stream),
+ notify_auth_result(none, user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}],
+ C1, State),
+ rabbit_log:warning(Msg, Args),
+ {C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_ERROR:16>>};
+ {challenge, Challenge, AuthState1} ->
+ rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, <<>>, stream),
+ ChallengeSize = byte_size(Challenge),
+ {C1#stream_connection{authentication_state = AuthState1, connection_step = authenticating},
+ <<?RESPONSE_SASL_CHALLENGE:16, ChallengeSize:32, Challenge/binary>>
+ };
+ {ok, User = #user{username = Username}} ->
+ case rabbit_access_control:check_user_loopback(Username, S) of
+ ok ->
+ rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, Username, stream),
+ notify_auth_result(Username, user_authentication_success,
+ [], C1, State),
+ {C1#stream_connection{authentication_state = done, user = User, connection_step = authenticated},
+ <<?RESPONSE_CODE_OK:16>>
+ };
+ not_allowed ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, Username, stream),
+ rabbit_log:warning("User '~s' can only connect via localhost~n", [Username]),
+ {C1#stream_connection{connection_step = failure}, <<?RESPONSE_SASL_AUTHENTICATION_FAILURE_LOOPBACK:16>>}
+ end
+ end,
Frame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, FrameFragment/binary>>,
- frame(Transport, S1, Frame),
- {S1#stream_connection{auth_mechanism = {Mechanism, AuthMechanism}}, Rest};
+ frame(Transport, C1, Frame),
+ {C2, Rest};
{error, _} ->
Frame = <<?COMMAND_SASL_AUTHENTICATE:16, ?VERSION_0:16, CorrelationId:32, ?RESPONSE_SASL_MECHANISM_NOT_SUPPORTED:16>>,
frame(Transport, Connection0, Frame),
@@ -627,6 +660,20 @@ handle_frame_pre_auth(_Transport, Connection, State, Frame, Rest) ->
rabbit_log:warning("unknown frame ~p ~p, closing connection.~n", [Frame, Rest]),
{Connection#stream_connection{connection_step = failure}, State, Rest}.
+auth_fail(Username, Msg, Args, Connection, ConnectionState) ->
+ notify_auth_result(Username, user_authentication_failure,
+ [{error, rabbit_misc:format(Msg, Args)}], Connection, ConnectionState).
+
+notify_auth_result(Username, AuthResult, ExtraProps, Connection, ConnectionState) ->
+ EventProps = [{connection_type, network},
+ {name, case Username of none -> ''; _ -> Username end}] ++
+ [case Item of
+ name -> {connection_name, i(name, Connection, ConnectionState)};
+ _ -> {Item, i(Item, Connection, ConnectionState)}
+ end || Item <- ?AUTH_NOTIFICATION_INFO_KEYS] ++
+ ExtraProps,
+ rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
+
handle_frame_post_auth(Transport, #stream_connection{socket = S, credits = Credits,
virtual_host = VirtualHost, user = User} = Connection, State,
<<?COMMAND_PUBLISH:16, ?VERSION_0:16,
@@ -991,7 +1038,8 @@ notify_connection_closed(#stream_connection{name = Name} = Connection, Connectio
{pid, self()},
{node, node()},
{client_properties, ClientProperties}],
- rabbit_event:notify(connection_closed, EventProperties).
+ rabbit_event:notify(connection_closed,
+ augment_infos_with_user_provided_connection_name(EventProperties, Connection)).
parse_map(<<>>, _Count) ->
{#{}, <<>>};
@@ -1260,6 +1308,7 @@ i(vhost, #stream_connection{virtual_host = VirtualHost}, _) -> Virtu
i(subscriptions, _, #stream_connection_state{consumers = Consumers}) -> maps:size(Consumers);
i(connection_state, _Connection, #stream_connection_state{blocked = true}) -> blocked;
i(connection_state, _Connection, #stream_connection_state{blocked = false}) -> running;
+i(auth_mechanism, #stream_connection{auth_mechanism = none}, _) -> none;
i(auth_mechanism, #stream_connection{auth_mechanism = {Name, _Mod}}, _) -> Name;
i(heartbeat, #stream_connection{heartbeat = Heartbeat}, _) -> Heartbeat;
i(frame_max, #stream_connection{frame_max = FrameMax}, _) -> FrameMax;