path: root/deps/rabbit_common/src/rabbit_net.erl
diff options
Diffstat (limited to 'deps/rabbit_common/src/rabbit_net.erl')
1 files changed, 321 insertions, 0 deletions
diff --git a/deps/rabbit_common/src/rabbit_net.erl b/deps/rabbit_common/src/rabbit_net.erl
new file mode 100644
index 0000000000..7685687ff0
--- /dev/null
+++ b/deps/rabbit_common/src/rabbit_net.erl
@@ -0,0 +1,321 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
+ recv/1, sync_recv/2, async_recv/3, port_command/2, getopts/2,
+ setopts/2, send/2, close/1, fast_close/1, sockname/1, peername/1,
+ peercert/1, connection_string/2, socket_ends/2, is_loopback/1,
+ tcp_host/1, unwrap_socket/1, maybe_get_proxy_socket/1,
+ hostname/0, getifaddrs/0]).
+-export_type([socket/0, ip_port/0, hostname/0]).
+-type stat_option() ::
+ 'recv_cnt' | 'recv_max' | 'recv_avg' | 'recv_oct' | 'recv_dvi' |
+ 'send_cnt' | 'send_max' | 'send_avg' | 'send_oct' | 'send_pend'.
+-type ok_val_or_error(A) :: rabbit_types:ok_or_error2(A, any()).
+-type ok_or_any_error() :: rabbit_types:ok_or_error(any()).
+-type socket() :: port() | ssl:sslsocket().
+-type opts() :: [{atom(), any()} |
+ {raw, non_neg_integer(), non_neg_integer(), binary()}].
+-type hostname() :: inet:hostname().
+-type ip_port() :: inet:port_number().
+% -type host_or_ip() :: binary() | inet:ip_address().
+-spec is_ssl(socket()) -> boolean().
+-spec ssl_info(socket()) -> 'nossl' | ok_val_or_error([{atom(), any()}]).
+-spec controlling_process(socket(), pid()) -> ok_or_any_error().
+-spec getstat(socket(), [stat_option()]) ->
+ ok_val_or_error([{stat_option(), integer()}]).
+-spec recv(socket()) ->
+ {'data', [char()] | binary()} |
+ 'closed' |
+ rabbit_types:error(any()) |
+ {'other', any()}.
+-spec sync_recv(socket(), integer()) ->
+ rabbit_types:ok(binary()) |
+ rabbit_types:error(any()).
+-spec async_recv(socket(), integer(), timeout()) ->
+ rabbit_types:ok(any()).
+-spec port_command(socket(), iolist()) -> 'true'.
+-spec getopts
+ (socket(),
+ [atom() |
+ {raw, non_neg_integer(), non_neg_integer(),
+ non_neg_integer() | binary()}]) ->
+ ok_val_or_error(opts()).
+-spec setopts(socket(), opts()) -> ok_or_any_error().
+-spec send(socket(), binary() | iolist()) -> ok_or_any_error().
+-spec close(socket()) -> ok_or_any_error().
+-spec fast_close(socket()) -> ok_or_any_error().
+-spec sockname(socket()) ->
+ ok_val_or_error({inet:ip_address(), ip_port()}).
+-spec peername(socket()) ->
+ ok_val_or_error({inet:ip_address(), ip_port()}).
+-spec peercert(socket()) ->
+ 'nossl' | ok_val_or_error(rabbit_ssl:certificate()).
+-spec connection_string(socket(), 'inbound' | 'outbound') ->
+ ok_val_or_error(string()).
+% -spec socket_ends(socket() | ranch_proxy:proxy_socket() | ranch_proxy_ssl:ssl_socket(),
+% 'inbound' | 'outbound') ->
+% ok_val_or_error({host_or_ip(), ip_port(),
+% host_or_ip(), ip_port()}).
+-spec is_loopback(socket() | inet:ip_address()) -> boolean().
+% -spec unwrap_socket(socket() | ranch_proxy:proxy_socket() | ranch_proxy_ssl:ssl_socket()) -> socket().
+-dialyzer({nowarn_function, [socket_ends/2, unwrap_socket/1]}).
+-define(SSL_CLOSE_TIMEOUT, 5000).
+-define(IS_SSL(Sock), is_tuple(Sock)
+ andalso (tuple_size(Sock) =:= 3)
+ andalso (element(1, Sock) =:= sslsocket)).
+is_ssl(Sock) -> ?IS_SSL(Sock).
+%% Seems hackish. Is hackish. But the structure is stable and
+%% kept this way for backward compatibility reasons. We need
+%% it for two reasons: there are no ssl:getstat(Sock) function,
+%% and no ssl:close(Timeout) function. Both of them are being
+%% worked on as we speak.
+ssl_get_socket(Sock) ->
+ element(2, element(2, Sock)).
+ssl_info(Sock) when ?IS_SSL(Sock) ->
+ ssl:connection_information(Sock);
+ssl_info(_Sock) ->
+ nossl.
+controlling_process(Sock, Pid) when ?IS_SSL(Sock) ->
+ ssl:controlling_process(Sock, Pid);
+controlling_process(Sock, Pid) when is_port(Sock) ->
+ gen_tcp:controlling_process(Sock, Pid).
+getstat(Sock, Stats) when ?IS_SSL(Sock) ->
+ inet:getstat(ssl_get_socket(Sock), Stats);
+getstat(Sock, Stats) when is_port(Sock) ->
+ inet:getstat(Sock, Stats);
+%% Used by Proxy protocol support in plugins
+getstat({rabbit_proxy_socket, Sock, _}, Stats) when ?IS_SSL(Sock) ->
+ inet:getstat(ssl_get_socket(Sock), Stats);
+getstat({rabbit_proxy_socket, Sock, _}, Stats) when is_port(Sock) ->
+ inet:getstat(Sock, Stats).
+recv(Sock) when ?IS_SSL(Sock) ->
+ recv(Sock, {ssl, ssl_closed, ssl_error});
+recv(Sock) when is_port(Sock) ->
+ recv(Sock, {tcp, tcp_closed, tcp_error}).
+recv(S, {DataTag, ClosedTag, ErrorTag}) ->
+ receive
+ {DataTag, S, Data} -> {data, Data};
+ {ClosedTag, S} -> closed;
+ {ErrorTag, S, Reason} -> {error, Reason};
+ Other -> {other, Other}
+ end.
+sync_recv(Sock, Length) when ?IS_SSL(Sock) ->
+ ssl:recv(Sock, Length);
+sync_recv(Sock, Length) ->
+ gen_tcp:recv(Sock, Length).
+async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) ->
+ Pid = self(),
+ Ref = make_ref(),
+ spawn(fun () -> Pid ! {inet_async, Sock, Ref,
+ ssl:recv(Sock, Length, Timeout)}
+ end),
+ {ok, Ref};
+async_recv(Sock, Length, infinity) when is_port(Sock) ->
+ prim_inet:async_recv(Sock, Length, -1);
+async_recv(Sock, Length, Timeout) when is_port(Sock) ->
+ prim_inet:async_recv(Sock, Length, Timeout).
+port_command(Sock, Data) when ?IS_SSL(Sock) ->
+ case ssl:send(Sock, Data) of
+ ok -> self() ! {inet_reply, Sock, ok},
+ true;
+ {error, Reason} -> erlang:error(Reason)
+ end;
+port_command(Sock, Data) when is_port(Sock) ->
+ erlang:port_command(Sock, Data).
+getopts(Sock, Options) when ?IS_SSL(Sock) ->
+ ssl:getopts(Sock, Options);
+getopts(Sock, Options) when is_port(Sock) ->
+ inet:getopts(Sock, Options).
+setopts(Sock, Options) when ?IS_SSL(Sock) ->
+ ssl:setopts(Sock, Options);
+setopts(Sock, Options) when is_port(Sock) ->
+ inet:setopts(Sock, Options).
+send(Sock, Data) when ?IS_SSL(Sock) -> ssl:send(Sock, Data);
+send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data).
+close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock);
+close(Sock) when is_port(Sock) -> gen_tcp:close(Sock).
+fast_close(Sock) when ?IS_SSL(Sock) ->
+ %% We cannot simply port_close the underlying tcp socket since the
+ %% TLS protocol is quite insistent that a proper closing handshake
+ %% should take place (see RFC 5245 s7.2.1). So we call ssl:close
+ %% instead, but that can block for a very long time, e.g. when
+ %% there is lots of pending output and there is tcp backpressure,
+ %% or the ssl_connection process has entered the the
+ %% workaround_transport_delivery_problems function during
+ %% termination, which, inexplicably, does a gen_tcp:recv(Socket,
+ %% 0), which may never return if the client doesn't send a FIN or
+ %% that gets swallowed by the network. Since there is no timeout
+ %% variant of ssl:close, we construct our own.
+ {Pid, MRef} = spawn_monitor(fun () -> ssl:close(Sock) end),
+ erlang:send_after(?SSL_CLOSE_TIMEOUT, self(), {Pid, ssl_close_timeout}),
+ receive
+ {Pid, ssl_close_timeout} ->
+ erlang:demonitor(MRef, [flush]),
+ exit(Pid, kill);
+ {'DOWN', MRef, process, Pid, _Reason} ->
+ ok
+ end,
+ catch port_close(ssl_get_socket(Sock)),
+ ok;
+fast_close(Sock) when is_port(Sock) ->
+ catch port_close(Sock), ok.
+sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock);
+sockname(Sock) when is_port(Sock) -> inet:sockname(Sock).
+peername(Sock) when ?IS_SSL(Sock) -> ssl:peername(Sock);
+peername(Sock) when is_port(Sock) -> inet:peername(Sock).
+peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock);
+peercert(Sock) when is_port(Sock) -> nossl.
+connection_string(Sock, Direction) ->
+ case socket_ends(Sock, Direction) of
+ {ok, {FromAddress, FromPort, ToAddress, ToPort}} ->
+ {ok, rabbit_misc:format(
+ "~s:~p -> ~s:~p",
+ [maybe_ntoab(FromAddress), FromPort,
+ maybe_ntoab(ToAddress), ToPort])};
+ Error ->
+ Error
+ end.
+socket_ends(Sock, Direction) when ?IS_SSL(Sock);
+ is_port(Sock) ->
+ {From, To} = sock_funs(Direction),
+ case {From(Sock), To(Sock)} of
+ {{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} ->
+ {ok, {rdns(FromAddress), FromPort,
+ rdns(ToAddress), ToPort}};
+ {{error, _Reason} = Error, _} ->
+ Error;
+ {_, {error, _Reason} = Error} ->
+ Error
+ end;
+socket_ends({rabbit_proxy_socket, CSocket, ProxyInfo}, Direction = inbound) ->
+ #{
+ src_address := FromAddress,
+ src_port := FromPort
+ } = ProxyInfo,
+ {_From, To} = sock_funs(Direction),
+ case To(CSocket) of
+ {ok, {ToAddress, ToPort}} ->
+ {ok, {rdns(FromAddress), FromPort,
+ rdns(ToAddress), ToPort}};
+ {error, _Reason} = Error ->
+ Error
+ end.
+maybe_ntoab(Addr) when is_tuple(Addr) -> rabbit_misc:ntoab(Addr);
+maybe_ntoab(Host) -> Host.
+tcp_host({0,0,0,0}) ->
+ hostname();
+tcp_host({0,0,0,0,0,0,0,0}) ->
+ hostname();
+tcp_host(IPAddress) ->
+ case inet:gethostbyaddr(IPAddress) of
+ {ok, #hostent{h_name = Name}} -> Name;
+ {error, _Reason} -> rabbit_misc:ntoa(IPAddress)
+ end.
+hostname() ->
+ {ok, Hostname} = inet:gethostname(),
+ case inet:gethostbyname(Hostname) of
+ {ok, #hostent{h_name = Name}} -> Name;
+ {error, _Reason} -> Hostname
+ end.
+format_nic_attribute({Key, undefined}) ->
+ {Key, undefined};
+format_nic_attribute({Key = flags, List}) when is_list(List) ->
+ Val = string:join(lists:map(fun rabbit_data_coercion:to_list/1, List), ", "),
+ {Key, rabbit_data_coercion:to_binary(Val)};
+format_nic_attribute({Key, Tuple}) when is_tuple(Tuple) and (Key =:= addr orelse
+ Key =:= broadaddr orelse
+ Key =:= netmask orelse
+ Key =:= dstaddr) ->
+ Val = inet_parse:ntoa(Tuple),
+ {Key, rabbit_data_coercion:to_binary(Val)};
+format_nic_attribute({Key = hwaddr, List}) when is_list(List) ->
+ %% [140, 133, 144, 28, 241, 121] => 8C:85:90:1C:F1:79
+ Val = string:join(lists:map(fun(N) -> integer_to_list(N, 16) end, List), ":"),
+ {Key, rabbit_data_coercion:to_binary(Val)}.
+getifaddrs() ->
+ {ok, AddrList} = inet:getifaddrs(),
+ Addrs0 = maps:from_list(AddrList),
+ maps:map(fun (_Key, Proplist) ->
+ lists:map(fun format_nic_attribute/1, Proplist)
+ end, Addrs0).
+rdns(Addr) ->
+ case application:get_env(rabbit, reverse_dns_lookups) of
+ {ok, true} -> list_to_binary(tcp_host(Addr));
+ _ -> Addr
+ end.
+sock_funs(inbound) -> {fun peername/1, fun sockname/1};
+sock_funs(outbound) -> {fun sockname/1, fun peername/1}.
+is_loopback(Sock) when is_port(Sock) ; ?IS_SSL(Sock) ->
+ case sockname(Sock) of
+ {ok, {Addr, _Port}} -> is_loopback(Addr);
+ {error, _} -> false
+ end;
+%% We could parse the results of inet:getifaddrs() instead. But that
+%% would be more complex and less maybe Windows-compatible...
+is_loopback({127,_,_,_}) -> true;
+is_loopback({0,0,0,0,0,0,0,1}) -> true;
+is_loopback({0,0,0,0,0,65535,AB,CD}) -> is_loopback(ipv4(AB, CD));
+is_loopback(_) -> false.
+ipv4(AB, CD) -> {AB bsr 8, AB band 255, CD bsr 8, CD band 255}.
+unwrap_socket({rabbit_proxy_socket, Sock, _}) ->
+ Sock;
+unwrap_socket(Sock) ->
+ Sock.
+maybe_get_proxy_socket(Sock={rabbit_proxy_socket, _, _}) ->
+ Sock;
+maybe_get_proxy_socket(_Sock) ->
+ undefined.