summaryrefslogtreecommitdiff
path: root/src/rabbit_reader.erl
diff options
context:
space:
mode:
Diffstat (limited to 'src/rabbit_reader.erl')
-rw-r--r--src/rabbit_reader.erl31
1 files changed, 24 insertions, 7 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 7e68b3ed..ce26c11a 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -166,14 +166,27 @@ teardown_profiling(Value) ->
fprof:analyse([{dest, []}, {cols, 100}])
end.
+inet_op(F) -> rabbit_misc:throw_on_error(inet_error, F).
+
+peername(Sock) ->
+ try
+ {Address, Port} = inet_op(fun () -> inet:peername(Sock) end),
+ AddressS = inet_parse:ntoa(Address),
+ {AddressS, Port}
+ catch
+ Ex -> rabbit_log:error("error on TCP connection ~p:~p~n",
+ [self(), Ex]),
+ rabbit_log:info("closing TCP connection ~p", [self()]),
+ exit(normal)
+ end.
+
start_connection(Parent, Deb, ClientSock) ->
- ProfilingValue = setup_profiling(),
process_flag(trap_exit, true),
- {ok, {PeerAddress, PeerPort}} = inet:peername(ClientSock),
- PeerAddressS = inet_parse:ntoa(PeerAddress),
- rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
- [self(), PeerAddressS, PeerPort]),
+ {PeerAddressS, PeerPort} = peername(ClientSock),
+ ProfilingValue = setup_profiling(),
try
+ rabbit_log:info("starting TCP connection ~p from ~s:~p~n",
+ [self(), PeerAddressS, PeerPort]),
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
mainloop(Parent, Deb, switch_callback(
@@ -266,7 +279,8 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) ->
end.
switch_callback(OldState, NewCallback, Length) ->
- {ok, Ref} = prim_inet:async_recv(OldState#v1.sock, Length, -1),
+ Ref = inet_op(fun () -> prim_inet:async_recv(
+ OldState#v1.sock, Length, -1) end),
OldState#v1{callback = NewCallback,
recv_ref = Ref}.
@@ -472,7 +486,10 @@ handle_input(handshake, <<"AMQP",1,1,ProtocolMajor,ProtocolMinor>>,
end;
handle_input(handshake, Other, #v1{sock = Sock}) ->
- ok = gen_tcp:send(Sock, <<"AMQP",1,1,?PROTOCOL_VERSION_MAJOR,?PROTOCOL_VERSION_MINOR>>),
+ ok = inet_op(fun () -> gen_tcp:send(
+ Sock, <<"AMQP",1,1,
+ ?PROTOCOL_VERSION_MAJOR,
+ ?PROTOCOL_VERSION_MINOR>>) end),
throw({bad_header, Other});
handle_input(Callback, Data, _State) ->