summaryrefslogtreecommitdiff
path: root/src/rabbit_net.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_net.erl')
-rw-r--r--src/rabbit_net.erl94
1 files changed, 71 insertions, 23 deletions
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index bedf5142..e8c96818 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -10,17 +10,17 @@
%%
%% The Original Code is RabbitMQ.
%%
-%% The Initial Developer of the Original Code is VMware, Inc.
-%% Copyright (c) 2007-2012 VMware, Inc. All rights reserved.
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2013 GoPivotal, Inc. All rights reserved.
%%
-module(rabbit_net).
-include("rabbit.hrl").
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
- recv/1, async_recv/3, port_command/2, getopts/2, setopts/2, send/2,
- close/1, maybe_fast_close/1, sockname/1, peername/1, peercert/1,
- tune_buffer_size/1, connection_string/2]).
+ recv/1, sync_recv/2, async_recv/3, port_command/2, getopts/2,
+ setopts/2, send/2, close/1, fast_close/1, sockname/1, peername/1,
+ peercert/1, connection_string/2, socket_ends/2]).
%%---------------------------------------------------------------------------
@@ -36,7 +36,7 @@
-type(socket() :: port() | #ssl_socket{}).
-type(opts() :: [{atom(), any()} |
{raw, non_neg_integer(), non_neg_integer(), binary()}]).
-
+-type(host_or_ip() :: binary() | inet:ip_address()).
-spec(is_ssl/1 :: (socket()) -> boolean()).
-spec(ssl_info/1 :: (socket())
-> 'nossl' | ok_val_or_error(
@@ -48,6 +48,8 @@
-spec(recv/1 :: (socket()) ->
{'data', [char()] | binary()} | 'closed' |
rabbit_types:error(any()) | {'other', any()}).
+-spec(sync_recv/2 :: (socket(), integer()) -> rabbit_types:ok(binary()) |
+ rabbit_types:error(any())).
-spec(async_recv/3 ::
(socket(), integer(), timeout()) -> rabbit_types:ok(any())).
-spec(port_command/2 :: (socket(), iolist()) -> 'true').
@@ -59,7 +61,7 @@
-spec(setopts/2 :: (socket(), opts()) -> ok_or_any_error()).
-spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()).
-spec(close/1 :: (socket()) -> ok_or_any_error()).
--spec(maybe_fast_close/1 :: (socket()) -> ok_or_any_error()).
+-spec(fast_close/1 :: (socket()) -> ok_or_any_error()).
-spec(sockname/1 ::
(socket())
-> ok_val_or_error({inet:ip_address(), rabbit_networking:ip_port()})).
@@ -69,14 +71,19 @@
-spec(peercert/1 ::
(socket())
-> 'nossl' | ok_val_or_error(rabbit_ssl:certificate())).
--spec(tune_buffer_size/1 :: (socket()) -> ok_or_any_error()).
-spec(connection_string/2 ::
(socket(), 'inbound' | 'outbound') -> ok_val_or_error(string())).
+-spec(socket_ends/2 ::
+ (socket(), 'inbound' | 'outbound')
+ -> ok_val_or_error({host_or_ip(), rabbit_networking:ip_port(),
+ host_or_ip(), rabbit_networking:ip_port()})).
-endif.
%%---------------------------------------------------------------------------
+-define(SSL_CLOSE_TIMEOUT, 5000).
+
-define(IS_SSL(Sock), is_record(Sock, ssl_socket)).
is_ssl(Sock) -> ?IS_SSL(Sock).
@@ -109,6 +116,11 @@ recv(S, {DataTag, ClosedTag, ErrorTag}) ->
Other -> {other, Other}
end.
+sync_recv(Sock, Length) when ?IS_SSL(Sock) ->
+ ssl:recv(Sock#ssl_socket.ssl, Length);
+sync_recv(Sock, Length) ->
+ gen_tcp:recv(Sock, Length).
+
async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) ->
Pid = self(),
Ref = make_ref(),
@@ -148,8 +160,31 @@ send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data).
close(Sock) when ?IS_SSL(Sock) -> ssl:close(Sock#ssl_socket.ssl);
close(Sock) when is_port(Sock) -> gen_tcp:close(Sock).
-maybe_fast_close(Sock) when ?IS_SSL(Sock) -> ok;
-maybe_fast_close(Sock) when is_port(Sock) -> erlang:port_close(Sock), ok.
+fast_close(Sock) when ?IS_SSL(Sock) ->
+ %% We cannot simply port_close the underlying tcp socket since the
+ %% TLS protocol is quite insistent that a proper closing handshake
+ %% should take place (see RFC 5245 s7.2.1). So we call ssl:close
+ %% instead, but that can block for a very long time, e.g. when
+ %% there is lots of pending output and there is tcp backpressure,
+ %% or the ssl_connection process has entered the the
+ %% workaround_transport_delivery_problems function during
+ %% termination, which, inexplicably, does a gen_tcp:recv(Socket,
+ %% 0), which may never return if the client doesn't send a FIN or
+ %% that gets swallowed by the network. Since there is no timeout
+ %% variant of ssl:close, we construct our own.
+ {Pid, MRef} = spawn_monitor(fun () -> ssl:close(Sock#ssl_socket.ssl) end),
+ erlang:send_after(?SSL_CLOSE_TIMEOUT, self(), {Pid, ssl_close_timeout}),
+ receive
+ {Pid, ssl_close_timeout} ->
+ erlang:demonitor(MRef, [flush]),
+ exit(Pid, kill);
+ {'DOWN', MRef, process, Pid, _Reason} ->
+ ok
+ end,
+ catch port_close(Sock#ssl_socket.tcp),
+ ok;
+fast_close(Sock) when is_port(Sock) ->
+ catch port_close(Sock), ok.
sockname(Sock) when ?IS_SSL(Sock) -> ssl:sockname(Sock#ssl_socket.ssl);
sockname(Sock) when is_port(Sock) -> inet:sockname(Sock).
@@ -160,25 +195,38 @@ peername(Sock) when is_port(Sock) -> inet:peername(Sock).
peercert(Sock) when ?IS_SSL(Sock) -> ssl:peercert(Sock#ssl_socket.ssl);
peercert(Sock) when is_port(Sock) -> nossl.
-tune_buffer_size(Sock) ->
- case getopts(Sock, [sndbuf, recbuf, buffer]) of
- {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]),
- setopts(Sock, [{buffer, BufSz}]);
- Err -> Err
+connection_string(Sock, Direction) ->
+ case socket_ends(Sock, Direction) of
+ {ok, {FromAddress, FromPort, ToAddress, ToPort}} ->
+ {ok, rabbit_misc:format(
+ "~s:~p -> ~s:~p",
+ [maybe_ntoab(FromAddress), FromPort,
+ maybe_ntoab(ToAddress), ToPort])};
+ Error ->
+ Error
end.
-connection_string(Sock, Direction) ->
- {From, To} = case Direction of
- inbound -> {fun peername/1, fun sockname/1};
- outbound -> {fun sockname/1, fun peername/1}
- end,
+socket_ends(Sock, Direction) ->
+ {From, To} = sock_funs(Direction),
case {From(Sock), To(Sock)} of
{{ok, {FromAddress, FromPort}}, {ok, {ToAddress, ToPort}}} ->
- {ok, rabbit_misc:format("~s:~p -> ~s:~p",
- [rabbit_misc:ntoab(FromAddress), FromPort,
- rabbit_misc:ntoab(ToAddress), ToPort])};
+ {ok, {rdns(FromAddress), FromPort,
+ rdns(ToAddress), ToPort}};
{{error, _Reason} = Error, _} ->
Error;
{_, {error, _Reason} = Error} ->
Error
end.
+
+maybe_ntoab(Addr) when is_tuple(Addr) -> rabbit_misc:ntoab(Addr);
+maybe_ntoab(Host) -> Host.
+
+rdns(Addr) ->
+ {ok, Lookup} = application:get_env(rabbit, reverse_dns_lookups),
+ case Lookup of
+ true -> list_to_binary(rabbit_networking:tcp_host(Addr));
+ _ -> Addr
+ end.
+
+sock_funs(inbound) -> {fun peername/1, fun sockname/1};
+sock_funs(outbound) -> {fun sockname/1, fun peername/1}.