diff options
author | David Ansari <david.ansari@gmx.de> | 2023-01-30 12:12:31 +0000 |
---|---|---|
committer | David Ansari <david.ansari@gmx.de> | 2023-01-30 12:17:19 +0000 |
commit | cbb389bb2aba0e88ec389846e95460657960e830 (patch) | |
tree | 0fd7f27dba0e10d723e047de46921749d39a7fc4 | |
parent | 251be71ebe5aaad17c89a13f23aa911eaffc8f01 (diff) | |
download | rabbitmq-server-git-cbb389bb2aba0e88ec389846e95460657960e830.tar.gz |
Remove MQTT processor field peer_addr
as it seems to always match peer_host.
Commit 7e09b85426959883c2cbe5f409aeaa299427fd8e adds peer address
provided by WebMQTT plugin.
However, this seems unnecessary since function rabbit_net:peername/1 on
the unwrapped socket provides the same address.
The peer address was the address of the proxy if the proxy protocol is
enabled.
This commit simplifies code and reduces memory consumption.
-rw-r--r-- | deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl | 49 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl | 7 | ||||
-rw-r--r-- | deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl | 15 |
3 files changed, 35 insertions, 36 deletions
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl index 58bccaa2f5..952b26e961 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -8,7 +8,7 @@ %% This module contains code that is common to MQTT and Web MQTT connections. -module(rabbit_mqtt_processor). --export([info/2, initial_state/2, initial_state/4, +-export([info/2, initial_state/2, initial_state/3, process_packet/2, serialise/2, terminate/4, handle_pre_hibernate/0, handle_ra_event/2, handle_down/2, handle_queue_event/2, @@ -21,7 +21,8 @@ -export_type([state/0]). -import(rabbit_mqtt_util, [mqtt_to_amqp/1, - amqp_to_mqtt/1]). + amqp_to_mqtt/1, + ip_address_to_binary/1]). -include_lib("kernel/include/logger.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). @@ -57,7 +58,6 @@ prefetch :: non_neg_integer(), client_id :: option(binary()), conn_name :: option(binary()), - peer_addr :: inet:ip_address(), host :: inet:ip_address(), port :: inet:port_number(), peer_host :: inet:ip_address(), @@ -87,21 +87,19 @@ -opaque state() :: #state{}. --spec initial_state(Socket :: any(), ConnectionName :: binary()) -> +-spec initial_state(Socket :: rabbit_net:socket(), + ConnectionName :: binary()) -> state(). initial_state(Socket, ConnectionName) -> - {ok, {PeerAddr, _PeerPort}} = rabbit_net:peername(Socket), initial_state(Socket, ConnectionName, - fun serialise_and_send_to_client/2, - PeerAddr). + fun serialise_and_send_to_client/2). --spec initial_state(Socket :: any(), +-spec initial_state(Socket :: rabbit_net:socket(), ConnectionName :: binary(), - SendFun :: fun((mqtt_packet(), state()) -> any()), - PeerAddr :: inet:ip_address()) -> + SendFun :: fun((mqtt_packet(), state()) -> any())) -> state(). -initial_state(Socket, ConnectionName, SendFun, PeerAddr) -> +initial_state(Socket, ConnectionName, SendFun) -> Flow = case rabbit_misc:get_env(rabbit, mirroring_flow_control, true) of true -> flow; false -> noflow @@ -114,7 +112,6 @@ initial_state(Socket, ConnectionName, SendFun, PeerAddr) -> prefetch = rabbit_mqtt_util:env(prefetch), delivery_flow = Flow, connected_at = os:system_time(milli_seconds), - peer_addr = PeerAddr, peer_host = PeerHost, peer_port = PeerPort, host = Host, @@ -393,8 +390,8 @@ check_client_id(_) -> check_credentials(Packet = #mqtt_packet_connect{username = Username, password = Password}, State = #state{cfg = #cfg{ssl_login_name = SslLoginName, - peer_addr = PeerAddr}}) -> - Ip = list_to_binary(inet:ntoa(PeerAddr)), + peer_host = PeerHost}}) -> + Ip = ip_address_to_binary(PeerHost), case creds(Username, Password, SslLoginName) of nocreds -> rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt), @@ -699,13 +696,15 @@ make_will_msg(#mqtt_packet_connect{will_retain = Retain, payload = Msg}. process_login(_UserBin, _PassBin, ClientId, - #state{cfg = #cfg{peer_addr = Addr}, + #state{cfg = #cfg{peer_host = PeerHost}, auth_state = #auth_state{username = Username, user = User, vhost = VHost }} = State) when Username =/= undefined, User =/= undefined, VHost =/= underfined -> - rabbit_core_metrics:auth_attempt_failed(list_to_binary(inet:ntoa(Addr)), Username, mqtt), + rabbit_core_metrics:auth_attempt_failed(ip_address_to_binary(PeerHost), + Username, + mqtt), ?LOG_ERROR( "MQTT detected duplicate connect attempt for client ID '~ts', user '~ts', vhost '~ts'", [ClientId, Username, VHost]), @@ -714,13 +713,13 @@ process_login(UserBin, PassBin, ClientId, #state{auth_state = undefined, cfg = #cfg{socket = Sock, ssl_login_name = SslLoginName, - peer_addr = Addr + peer_host = PeerHost }} = State0) -> {ok, {_PeerHost, _PeerPort, _Host, Port}} = rabbit_net:socket_ends(Sock, inbound), {VHostPickedUsing, {VHost, UsernameBin}} = get_vhost(UserBin, SslLoginName, Port), ?LOG_DEBUG("MQTT vhost picked using ~s", [human_readable_vhost_lookup_strategy(VHostPickedUsing)]), - RemoteIpAddressBin = list_to_binary(inet:ntoa(Addr)), + Ip = ip_address_to_binary(PeerHost), Input = #{vhost => VHost, username_bin => UsernameBin, pass_bin => PassBin, @@ -736,10 +735,10 @@ process_login(UserBin, PassBin, ClientId, ], Input, State0) of {ok, _Output, State} -> - rabbit_core_metrics:auth_attempt_succeeded(RemoteIpAddressBin, UsernameBin, mqtt), + rabbit_core_metrics:auth_attempt_succeeded(Ip, UsernameBin, mqtt), {ok, State}; {error, _ConnectionRefusedReturnCode, _State} = Err -> - rabbit_core_metrics:auth_attempt_failed(RemoteIpAddressBin, UsernameBin, mqtt), + rabbit_core_metrics:auth_attempt_failed(Ip, UsernameBin, mqtt), Err end. @@ -837,12 +836,12 @@ check_vhost_access(#{vhost := VHost, client_id := ClientId, user := User = #user{username = Username} } = In, - #state{cfg = #cfg{peer_addr = PeerAddr}} = State) -> + #state{cfg = #cfg{peer_host = PeerHost}} = State) -> AuthzCtx = #{<<"client_id">> => ClientId}, try rabbit_access_control:check_vhost_access( User, VHost, - {ip, PeerAddr}, + {ip, PeerHost}, AuthzCtx) of ok -> {ok, maps:put(authz_ctx, AuthzCtx, In), State} @@ -859,8 +858,8 @@ check_user_loopback(#{vhost := VHost, user := User, authz_ctx := AuthzCtx }, - #state{cfg = #cfg{peer_addr = PeerAddr}} = State) -> - case rabbit_access_control:check_user_loopback(UsernameBin, PeerAddr) of + #state{cfg = #cfg{peer_host = PeerHost}} = State) -> + case rabbit_access_control:check_user_loopback(UsernameBin, PeerHost) of ok -> AuthState = #auth_state{user = User, username = UsernameBin, @@ -1964,7 +1963,6 @@ format_status( prefetch = Prefetch, client_id = ClientID, conn_name = ConnName, - peer_addr = PeerAddr, host = Host, port = Port, peer_host = PeerHost, @@ -1986,7 +1984,6 @@ format_status( prefetch => Prefetch, client_id => ClientID, conn_name => ConnName, - peer_addr => PeerAddr, host => Host, port => Port, peer_host => PeerHost, diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl index d098d3ff93..75637dc95d 100644 --- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl @@ -23,7 +23,8 @@ init_sparkplug/0, mqtt_to_amqp/1, amqp_to_mqtt/1, - truncate_binary/2 + truncate_binary/2, + ip_address_to_binary/1 ]). -define(MAX_TOPIC_TRANSLATION_CACHE_SIZE, 12). @@ -209,3 +210,7 @@ truncate_binary(Bin, Size) truncate_binary(Bin, Size) when is_binary(Bin) -> binary:part(Bin, 0, Size). + +-spec ip_address_to_binary(inet:ip_address()) -> binary(). +ip_address_to_binary(IpAddress) -> + list_to_binary(inet:ntoa(IpAddress)). diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl index 1fb022ee44..31e823dae3 100644 --- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl +++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl @@ -57,7 +57,7 @@ upgrade(Req, Env, Handler, HandlerState) -> upgrade(Req, Env, Handler, HandlerState, Opts) -> cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts). -takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, {HandlerState, PeerAddr}}) -> +takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState}) -> Sock = case HandlerState#state.socket of undefined -> Socket; @@ -65,7 +65,7 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, {HandlerState, {rabbit_proxy_socket, Socket, ProxyInfo} end, cowboy_websocket:takeover(Parent, Ref, Socket, Transport, Opts, Buffer, - {Handler, {HandlerState#state{socket = Sock}, PeerAddr}}). + {Handler, HandlerState#state{socket = Sock}}). %% cowboy_websocket init(Req, Opts) -> @@ -73,7 +73,6 @@ init(Req, Opts) -> undefined -> no_supported_sub_protocol(undefined, Req); Protocol -> - {PeerAddr, _PeerPort} = maps:get(peer, Req), WsOpts0 = proplists:get_value(ws_opts, Opts, #{}), WsOpts = maps:merge(#{compress => true}, WsOpts0), case lists:member(<<"mqtt">>, Protocol) of @@ -82,16 +81,15 @@ init(Req, Opts) -> true -> {?MODULE, cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt">>, Req), - {#state{socket = maps:get(proxy_header, Req, undefined)}, - PeerAddr}, + #state{socket = maps:get(proxy_header, Req, undefined)}, WsOpts} end end. --spec websocket_init({state(), PeerAddr :: binary()}) -> +-spec websocket_init(state()) -> {cowboy_websocket:commands(), state()} | {cowboy_websocket:commands(), state(), hibernate}. -websocket_init({State0 = #state{socket = Sock}, PeerAddr}) -> +websocket_init(State0 = #state{socket = Sock}) -> logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN ++ [web_mqtt]}), ok = file_handle_cache:obtain(), case rabbit_net:connection_string(Sock, inbound) of @@ -102,8 +100,7 @@ websocket_init({State0 = #state{socket = Sock}, PeerAddr}) -> PState = rabbit_mqtt_processor:initial_state( rabbit_net:unwrap_socket(Sock), ConnName, - fun send_reply/2, - PeerAddr), + fun send_reply/2), State1 = State0#state{conn_name = ConnName, proc_state = PState}, State = rabbit_event:init_stats_timer(State1, #state.stats_timer), |