diff options
Diffstat (limited to 'src/rabbit_net.erl')
-rw-r--r-- | src/rabbit_net.erl | 94 |
1 files changed, 71 insertions, 23 deletions
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index bedf5142..e8c96818 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -10,17 +10,17 @@ %% %% The Original Code is RabbitMQ. %% -%% The Initial Developer of the Original Code is VMware, Inc. -%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved. +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved. %% -module(rabbit_net). -include("rabbit.hrl"). -export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, - recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2, - close/1, maybe_fast_close/1, sockname/1, peername/1, peercert/1, - tune_buffer_size/1, connection_string/2]). + recv/1, sync_recv/2, async_recv/3, port_command/2, getopts/2, + setopts/2, send/2, close/1, fast_close/1, sockname/1, peername/1, + peercert/1, connection_string/2, socket_ends/2]). %%--------------------------------------------------------------------------- @@ -36,7 +36,7 @@ -type(socket() :: port() | #ssl_socket{}). -type(opts() :: [{atom(), any()} | {raw, non_neg_integer(), non_neg_integer(), binary()}]). - +-type(host_or_ip() :: binary() | inet:ip_address()). -spec(is_ssl/1 :: (socket()) -> boolean()). -spec(ssl_info/1 :: (socket()) -> 'nossl' | ok_val_or_error( @@ -48,6 +48,8 @@ -spec(recv/1 :: (socket()) -> {'data', [char()] | binary()} | 'closed' | rabbit_types:error(any()) | {'other', any()}). +-spec(sync_recv/2 :: (socket(), integer()) -> rabbit_types:ok(binary()) | + rabbit_types:error(any())). -spec(async_recv/3 :: (socket(), integer(), timeout()) -> rabbit_types:ok(any())). -spec(port_command/2 :: (socket(), iolist()) -> 'true'). @@ -59,7 +61,7 @@ -spec(setopts/2 :: (socket(), opts()) -> ok_or_any_error()). -spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()). -spec(close/1 :: (socket()) -> ok_or_any_error()). --spec(maybe_fast_close/1 :: (socket()) -> ok_or_any_error()). +-spec(fast_close/1 :: (socket()) -> ok_or_any_error()). -spec(sockname/1 :: (socket()) -> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})). @@ -69,14 +71,19 @@ -spec(peercert/1 :: (socket()) -> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())). --spec(tune_buffer_size/1 :: (socket()) -> ok_or_any_error()). -spec(connection_string/2 :: (socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())). +-spec(socket_ends/2 :: + (socket(), 'inbound' | 'outbound') + -> ok_val_or_error({host_or_ip(), rabbit_networking:ip_port(), + host_or_ip(), rabbit_networking:ip_port()})). -endif. %%--------------------------------------------------------------------------- +-define(SSL_CLOSE_TIMEOUT, 5000). + -define(IS_SSL(Sock), is_record(Sock, ssl_socket)). is_ssl(Sock) -> ?IS_SSL(Sock). @@ -109,6 +116,11 @@ recv(S, {DataTag, ClosedTag, ErrorTag}) -> Other -> {other, Other} end. +sync_recv(Sock, Length) when ?IS_SSL(Sock) -> + ssl:recv(Sock#ssl_socket.ssl, Length); +sync_recv(Sock, Length) -> + gen_tcp:recv(Sock, Length). + async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) -> Pid = self(), Ref = make_ref(), @@ -148,8 +160,31 @@ send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data). close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl); close(Sock) when is_port(Sock) -> gen_tcp:close(Sock). -maybe_fast_close(Sock) when ?IS_SSL(Sock) -> ok; -maybe_fast_close(Sock) when is_port(Sock) -> erlang:port_close(Sock), ok. +fast_close(Sock) when ?IS_SSL(Sock) -> + %% We cannot simply port_close the underlying tcp socket since the + %% TLS protocol is quite insistent that a proper closing handshake + %% should take place (see RFC 5245 s7.2.1). So we call ssl:close + %% instead, but that can block for a very long time, e.g. when + %% there is lots of pending output and there is tcp backpressure, + %% or the ssl_connection process has entered the the + %% workaround_transport_delivery_problems function during + %% termination, which, inexplicably, does a gen_tcp:recv(Socket, + %% 0), which may never return if the client doesn't send a FIN or + %% that gets swallowed by the network. Since there is no timeout + %% variant of ssl:close, we construct our own. + {Pid, MRef} = spawn_monitor(fun () -> ssl:close(Sock#ssl_socket.ssl) end), + erlang:send_after(?SSL_CLOSE_TIMEOUT, self(), {Pid, ssl_close_timeout}), + receive + {Pid, ssl_close_timeout} -> + erlang:demonitor(MRef, [flush]), + exit(Pid, kill); + {'DOWN', MRef, process, Pid, _Reason} -> + ok + end, + catch port_close(Sock#ssl_socket.tcp), + ok; +fast_close(Sock) when is_port(Sock) -> + catch port_close(Sock), ok. sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl); sockname(Sock) when is_port(Sock) -> inet:sockname(Sock). @@ -160,25 +195,38 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock). peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl); peercert(Sock) when is_port(Sock) -> nossl. -tune_buffer_size(Sock) -> - case getopts(Sock, [sndbuf, recbuf, buffer]) of - {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]), - setopts(Sock, [{buffer, BufSz}]); - Err -> Err +connection_string(Sock, Direction) -> + case socket_ends(Sock, Direction) of + {ok, {FromAddress, FromPort, ToAddress, ToPort}} -> + {ok, rabbit_misc:format( + "~s:~p -> ~s:~p", + [maybe_ntoab(FromAddress), FromPort, + maybe_ntoab(ToAddress), ToPort])}; + Error -> + Error end. -connection_string(Sock, Direction) -> - {From, To} = case Direction of - inbound -> {fun peername/1, fun sockname/1}; - outbound -> {fun sockname/1, fun peername/1} - end, +socket_ends(Sock, Direction) -> + {From, To} = sock_funs(Direction), case {From(Sock), To(Sock)} of {{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} -> - {ok, rabbit_misc:format("~s:~p -> ~s:~p", - [rabbit_misc:ntoab(FromAddress), FromPort, - rabbit_misc:ntoab(ToAddress), ToPort])}; + {ok, {rdns(FromAddress), FromPort, + rdns(ToAddress), ToPort}}; {{error, _Reason} = Error, _} -> Error; {_, {error, _Reason} = Error} -> Error end. + +maybe_ntoab(Addr) when is_tuple(Addr) -> rabbit_misc:ntoab(Addr); +maybe_ntoab(Host) -> Host. + +rdns(Addr) -> + {ok, Lookup} = application:get_env(rabbit, reverse_dns_lookups), + case Lookup of + true -> list_to_binary(rabbit_networking:tcp_host(Addr)); + _ -> Addr + end. + +sock_funs(inbound) -> {fun peername/1, fun sockname/1}; +sock_funs(outbound) -> {fun sockname/1, fun peername/1}. |