summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDavid Ansari <david.ansari@gmx.de>2023-01-30 12:12:31 +0000
committerDavid Ansari <david.ansari@gmx.de>2023-01-30 12:17:19 +0000
commitcbb389bb2aba0e88ec389846e95460657960e830 (patch)
tree0fd7f27dba0e10d723e047de46921749d39a7fc4
parent251be71ebe5aaad17c89a13f23aa911eaffc8f01 (diff)
downloadrabbitmq-server-git-cbb389bb2aba0e88ec389846e95460657960e830.tar.gz
Remove MQTT processor field peer_addr
as it seems to always match peer_host. Commit 7e09b85426959883c2cbe5f409aeaa299427fd8e adds peer address provided by WebMQTT plugin. However, this seems unnecessary since function rabbit_net:peername/1 on the unwrapped socket provides the same address. The peer address was the address of the proxy if the proxy protocol is enabled. This commit simplifies code and reduces memory consumption.
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl49
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl7
-rw-r--r--deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl15
3 files changed, 35 insertions, 36 deletions
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
index 58bccaa2f5..952b26e961 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
@@ -8,7 +8,7 @@
%% This module contains code that is common to MQTT and Web MQTT connections.
-module(rabbit_mqtt_processor).
--export([info/2, initial_state/2, initial_state/4,
+-export([info/2, initial_state/2, initial_state/3,
process_packet/2, serialise/2,
terminate/4, handle_pre_hibernate/0,
handle_ra_event/2, handle_down/2, handle_queue_event/2,
@@ -21,7 +21,8 @@
-export_type([state/0]).
-import(rabbit_mqtt_util, [mqtt_to_amqp/1,
- amqp_to_mqtt/1]).
+ amqp_to_mqtt/1,
+ ip_address_to_binary/1]).
-include_lib("kernel/include/logger.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
@@ -57,7 +58,6 @@
prefetch :: non_neg_integer(),
client_id :: option(binary()),
conn_name :: option(binary()),
- peer_addr :: inet:ip_address(),
host :: inet:ip_address(),
port :: inet:port_number(),
peer_host :: inet:ip_address(),
@@ -87,21 +87,19 @@
-opaque state() :: #state{}.
--spec initial_state(Socket :: any(), ConnectionName :: binary()) ->
+-spec initial_state(Socket :: rabbit_net:socket(),
+ ConnectionName :: binary()) ->
state().
initial_state(Socket, ConnectionName) ->
- {ok, {PeerAddr, _PeerPort}} = rabbit_net:peername(Socket),
initial_state(Socket,
ConnectionName,
- fun serialise_and_send_to_client/2,
- PeerAddr).
+ fun serialise_and_send_to_client/2).
--spec initial_state(Socket :: any(),
+-spec initial_state(Socket :: rabbit_net:socket(),
ConnectionName :: binary(),
- SendFun :: fun((mqtt_packet(), state()) -> any()),
- PeerAddr :: inet:ip_address()) ->
+ SendFun :: fun((mqtt_packet(), state()) -> any())) ->
state().
-initial_state(Socket, ConnectionName, SendFun, PeerAddr) ->
+initial_state(Socket, ConnectionName, SendFun) ->
Flow = case rabbit_misc:get_env(rabbit, mirroring_flow_control, true) of
true -> flow;
false -> noflow
@@ -114,7 +112,6 @@ initial_state(Socket, ConnectionName, SendFun, PeerAddr) ->
prefetch = rabbit_mqtt_util:env(prefetch),
delivery_flow = Flow,
connected_at = os:system_time(milli_seconds),
- peer_addr = PeerAddr,
peer_host = PeerHost,
peer_port = PeerPort,
host = Host,
@@ -393,8 +390,8 @@ check_client_id(_) ->
check_credentials(Packet = #mqtt_packet_connect{username = Username,
password = Password},
State = #state{cfg = #cfg{ssl_login_name = SslLoginName,
- peer_addr = PeerAddr}}) ->
- Ip = list_to_binary(inet:ntoa(PeerAddr)),
+ peer_host = PeerHost}}) ->
+ Ip = ip_address_to_binary(PeerHost),
case creds(Username, Password, SslLoginName) of
nocreds ->
rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt),
@@ -699,13 +696,15 @@ make_will_msg(#mqtt_packet_connect{will_retain = Retain,
payload = Msg}.
process_login(_UserBin, _PassBin, ClientId,
- #state{cfg = #cfg{peer_addr = Addr},
+ #state{cfg = #cfg{peer_host = PeerHost},
auth_state = #auth_state{username = Username,
user = User,
vhost = VHost
}} = State)
when Username =/= undefined, User =/= undefined, VHost =/= underfined ->
- rabbit_core_metrics:auth_attempt_failed(list_to_binary(inet:ntoa(Addr)), Username, mqtt),
+ rabbit_core_metrics:auth_attempt_failed(ip_address_to_binary(PeerHost),
+ Username,
+ mqtt),
?LOG_ERROR(
"MQTT detected duplicate connect attempt for client ID '~ts', user '~ts', vhost '~ts'",
[ClientId, Username, VHost]),
@@ -714,13 +713,13 @@ process_login(UserBin, PassBin, ClientId,
#state{auth_state = undefined,
cfg = #cfg{socket = Sock,
ssl_login_name = SslLoginName,
- peer_addr = Addr
+ peer_host = PeerHost
}} = State0) ->
{ok, {_PeerHost, _PeerPort, _Host, Port}} = rabbit_net:socket_ends(Sock, inbound),
{VHostPickedUsing, {VHost, UsernameBin}} = get_vhost(UserBin, SslLoginName, Port),
?LOG_DEBUG("MQTT vhost picked using ~s",
[human_readable_vhost_lookup_strategy(VHostPickedUsing)]),
- RemoteIpAddressBin = list_to_binary(inet:ntoa(Addr)),
+ Ip = ip_address_to_binary(PeerHost),
Input = #{vhost => VHost,
username_bin => UsernameBin,
pass_bin => PassBin,
@@ -736,10 +735,10 @@ process_login(UserBin, PassBin, ClientId,
],
Input, State0) of
{ok, _Output, State} ->
- rabbit_core_metrics:auth_attempt_succeeded(RemoteIpAddressBin, UsernameBin, mqtt),
+ rabbit_core_metrics:auth_attempt_succeeded(Ip, UsernameBin, mqtt),
{ok, State};
{error, _ConnectionRefusedReturnCode, _State} = Err ->
- rabbit_core_metrics:auth_attempt_failed(RemoteIpAddressBin, UsernameBin, mqtt),
+ rabbit_core_metrics:auth_attempt_failed(Ip, UsernameBin, mqtt),
Err
end.
@@ -837,12 +836,12 @@ check_vhost_access(#{vhost := VHost,
client_id := ClientId,
user := User = #user{username = Username}
} = In,
- #state{cfg = #cfg{peer_addr = PeerAddr}} = State) ->
+ #state{cfg = #cfg{peer_host = PeerHost}} = State) ->
AuthzCtx = #{<<"client_id">> => ClientId},
try rabbit_access_control:check_vhost_access(
User,
VHost,
- {ip, PeerAddr},
+ {ip, PeerHost},
AuthzCtx) of
ok ->
{ok, maps:put(authz_ctx, AuthzCtx, In), State}
@@ -859,8 +858,8 @@ check_user_loopback(#{vhost := VHost,
user := User,
authz_ctx := AuthzCtx
},
- #state{cfg = #cfg{peer_addr = PeerAddr}} = State) ->
- case rabbit_access_control:check_user_loopback(UsernameBin, PeerAddr) of
+ #state{cfg = #cfg{peer_host = PeerHost}} = State) ->
+ case rabbit_access_control:check_user_loopback(UsernameBin, PeerHost) of
ok ->
AuthState = #auth_state{user = User,
username = UsernameBin,
@@ -1964,7 +1963,6 @@ format_status(
prefetch = Prefetch,
client_id = ClientID,
conn_name = ConnName,
- peer_addr = PeerAddr,
host = Host,
port = Port,
peer_host = PeerHost,
@@ -1986,7 +1984,6 @@ format_status(
prefetch => Prefetch,
client_id => ClientID,
conn_name => ConnName,
- peer_addr => PeerAddr,
host => Host,
port => Port,
peer_host => PeerHost,
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl
index d098d3ff93..75637dc95d 100644
--- a/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl
@@ -23,7 +23,8 @@
init_sparkplug/0,
mqtt_to_amqp/1,
amqp_to_mqtt/1,
- truncate_binary/2
+ truncate_binary/2,
+ ip_address_to_binary/1
]).
-define(MAX_TOPIC_TRANSLATION_CACHE_SIZE, 12).
@@ -209,3 +210,7 @@ truncate_binary(Bin, Size)
truncate_binary(Bin, Size)
when is_binary(Bin) ->
binary:part(Bin, 0, Size).
+
+-spec ip_address_to_binary(inet:ip_address()) -> binary().
+ip_address_to_binary(IpAddress) ->
+ list_to_binary(inet:ntoa(IpAddress)).
diff --git a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
index 1fb022ee44..31e823dae3 100644
--- a/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
+++ b/deps/rabbitmq_web_mqtt/src/rabbit_web_mqtt_handler.erl
@@ -57,7 +57,7 @@ upgrade(Req, Env, Handler, HandlerState) ->
upgrade(Req, Env, Handler, HandlerState, Opts) ->
cowboy_websocket:upgrade(Req, Env, Handler, HandlerState, Opts).
-takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, {HandlerState, PeerAddr}}) ->
+takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, HandlerState}) ->
Sock = case HandlerState#state.socket of
undefined ->
Socket;
@@ -65,7 +65,7 @@ takeover(Parent, Ref, Socket, Transport, Opts, Buffer, {Handler, {HandlerState,
{rabbit_proxy_socket, Socket, ProxyInfo}
end,
cowboy_websocket:takeover(Parent, Ref, Socket, Transport, Opts, Buffer,
- {Handler, {HandlerState#state{socket = Sock}, PeerAddr}}).
+ {Handler, HandlerState#state{socket = Sock}}).
%% cowboy_websocket
init(Req, Opts) ->
@@ -73,7 +73,6 @@ init(Req, Opts) ->
undefined ->
no_supported_sub_protocol(undefined, Req);
Protocol ->
- {PeerAddr, _PeerPort} = maps:get(peer, Req),
WsOpts0 = proplists:get_value(ws_opts, Opts, #{}),
WsOpts = maps:merge(#{compress => true}, WsOpts0),
case lists:member(<<"mqtt">>, Protocol) of
@@ -82,16 +81,15 @@ init(Req, Opts) ->
true ->
{?MODULE,
cowboy_req:set_resp_header(<<"sec-websocket-protocol">>, <<"mqtt">>, Req),
- {#state{socket = maps:get(proxy_header, Req, undefined)},
- PeerAddr},
+ #state{socket = maps:get(proxy_header, Req, undefined)},
WsOpts}
end
end.
--spec websocket_init({state(), PeerAddr :: binary()}) ->
+-spec websocket_init(state()) ->
{cowboy_websocket:commands(), state()} |
{cowboy_websocket:commands(), state(), hibernate}.
-websocket_init({State0 = #state{socket = Sock}, PeerAddr}) ->
+websocket_init(State0 = #state{socket = Sock}) ->
logger:set_process_metadata(#{domain => ?RMQLOG_DOMAIN_CONN ++ [web_mqtt]}),
ok = file_handle_cache:obtain(),
case rabbit_net:connection_string(Sock, inbound) of
@@ -102,8 +100,7 @@ websocket_init({State0 = #state{socket = Sock}, PeerAddr}) ->
PState = rabbit_mqtt_processor:initial_state(
rabbit_net:unwrap_socket(Sock),
ConnName,
- fun send_reply/2,
- PeerAddr),
+ fun send_reply/2),
State1 = State0#state{conn_name = ConnName,
proc_state = PState},
State = rabbit_event:init_stats_timer(State1, #state.stats_timer),