authorMatthias Radestock <>2011-04-07 14:30:47 +0100
committerMatthias Radestock <>2011-04-07 14:30:47 +0100
commita5fccc82365e8bd9b39dbc0202d14772d5b2d7aa (patch)
parent93674ad1716b7c5dd07bcbc255599d4e3fd12828 (diff)
make ssl work
...and handle socket errors It turns out that for active sockets the messages sent by tcp and ssl sockets to the controlling process differ gratuitously.
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -18,7 +18,7 @@
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
- async_recv/3, port_command/2, setopts/2, send/2, close/1,
+ recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1,
sockname/1, peername/1, peercert/1]).
@@ -42,6 +42,9 @@
-spec(getstat/2 ::
(socket(), [stat_option()])
-> ok_val_or_error([{stat_option(), integer()}])).
+-spec(recv/1 :: (socket()) ->
+ {'data', [char()] | binary()} | 'closed' |
+ rabbit_types:error(any()) | {'other', any()}).
-spec(async_recv/3 ::
(socket(), integer(), timeout()) -> rabbit_types:ok(any())).
-spec(port_command/2 :: (socket(), iolist()) -> 'true').
@@ -83,6 +86,23 @@ getstat(Sock, Stats) when ?IS_SSL(Sock) ->
getstat(Sock, Stats) when is_port(Sock) ->
inet:getstat(Sock, Stats).
+recv(Sock) when ?IS_SSL(Sock) ->
+ S = Sock#ssl_socket.ssl,
+ receive
+ {ssl, S, Data} -> {data, Data};
+ {ssl_closed, S} -> closed;
+ {ssl_error, S, Reason} -> {error, Reason};
+ Other -> {other, Other}
+ end;
+recv(Sock) ->
+ S = Sock,
+ receive
+ {tcp, S, Data} -> {data, Data};
+ {tcp_closed, S} -> closed;
+ {tcp_error, S, Reason} -> {error, Reason};
+ Other -> {other, Other}
+ end.
async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) ->
Pid = self(),
Ref = make_ref(),
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e210dba1..4dcb7446 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -252,82 +252,80 @@ recvloop(Deb, State = #v1{sock = Sock, recv_length = Length, buf = Buf}) ->
State#v1{buf = [Rest]}))
-mainloop(Deb, State = #v1{parent = Parent, sock = Sock}) ->
- receive
- {tcp, Sock, Data} ->
- recvloop(Deb, State#v1{buf = [Data | State#v1.buf],
- pending_recv = false});
- {tcp_closed, Sock} ->
- if State#v1.connection_state =:= closed ->
- State;
- true ->
- throw(connection_closed_abruptly)
- end;
- {conserve_memory, Conserve} ->
- recvloop(Deb, internal_conserve_memory(Conserve, State));
- {channel_closing, ChPid} ->
- ok = rabbit_channel:ready_for_close(ChPid),
- channel_cleanup(ChPid),
- mainloop(Deb, State);
- {'EXIT', Parent, Reason} ->
- terminate(io_lib:format("broker forced connection closure "
- "with reason '~w'", [Reason]), State),
- %% this is what we are expected to do according to
- %%
- %%
- %% If we wanted to be *really* nice we should wait for a
- %% while for clients to close the socket at their end,
- %% just as we do in the ordinary error case. However,
- %% since this termination is initiated by our parent it is
- %% probably more important to exit quickly.
- exit(Reason);
- {channel_exit, _Channel, E = {writer, send_failed, _Error}} ->
- throw(E);
- {channel_exit, Channel, Reason} ->
- mainloop(Deb, handle_exception(State, Channel, Reason));
- {'DOWN', _MRef, process, ChPid, Reason} ->
- mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
- terminate_connection ->
- State;
- handshake_timeout ->
- if ?IS_RUNNING(State) orelse
- State#v1.connection_state =:= closing orelse
- State#v1.connection_state =:= closed ->
- mainloop(Deb, State);
- true ->
- throw({handshake_timeout, State#v1.callback})
- end;
- timeout ->
- case State#v1.connection_state of
- closed -> mainloop(Deb, State);
- S -> throw({timeout, S})
- end;
- {'$gen_call', From, {shutdown, Explanation}} ->
- {ForceTermination, NewState} = terminate(Explanation, State),
- gen_server:reply(From, ok),
- case ForceTermination of
- force -> ok;
- normal -> mainloop(Deb, NewState)
- end;
- {'$gen_call', From, info} ->
- gen_server:reply(From, infos(?INFO_KEYS, State)),
- mainloop(Deb, State);
- {'$gen_call', From, {info, Items}} ->
- gen_server:reply(From, try {ok, infos(Items, State)}
- catch Error -> {error, Error}
- end),
- mainloop(Deb, State);
- {'$gen_cast', emit_stats} ->
- State1 = internal_emit_stats(State),
- mainloop(Deb, State1);
- {system, From, Request} ->
- sys:handle_system_msg(Request, From,
- Parent, ?MODULE, Deb, State);
- Other ->
- %% internal error -> something worth dying for
- exit({unexpected_message, Other})
+mainloop(Deb, State = #v1{sock = Sock}) ->
+ case rabbit_net:recv(Sock) of
+ {data, Data} -> recvloop(Deb, State#v1{buf = [Data | State#v1.buf],
+ pending_recv = false});
+ closed -> if State#v1.connection_state =:= closed ->
+ State;
+ true ->
+ throw(connection_closed_abruptly)
+ end;
+ {error, Reason} -> throw({inet_error, Reason});
+ {other, Other} -> handle_other(Other, Deb, State)
+handle_other({conserve_memory, Conserve}, Deb, State) ->
+ recvloop(Deb, internal_conserve_memory(Conserve, State));
+handle_other({channel_closing, ChPid}, Deb, State) ->
+ ok = rabbit_channel:ready_for_close(ChPid),
+ channel_cleanup(ChPid),
+ mainloop(Deb, State);
+handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
+ terminate(io_lib:format("broker forced connection closure "
+ "with reason '~w'", [Reason]), State),
+ %% this is what we are expected to do according to
+ %%
+ %%
+ %% If we wanted to be *really* nice we should wait for a while for
+ %% clients to close the socket at their end, just as we do in the
+ %% ordinary error case. However, since this termination is
+ %% initiated by our parent it is probably more important to exit
+ %% quickly.
+ exit(Reason);
+handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}},
+ _Deb, _State) ->
+ throw(E);
+handle_other({channel_exit, Channel, Reason}, Deb, State) ->
+ mainloop(Deb, handle_exception(State, Channel, Reason));
+handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) ->
+ mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
+handle_other(terminate_connection, _Deb, State) ->
+ State;
+handle_other(handshake_timeout, Deb, State)
+ when ?IS_RUNNING(State) orelse
+ State#v1.connection_state =:= closing orelse
+ State#v1.connection_state =:= closed ->
+ mainloop(Deb, State);
+handle_other(handshake_timeout, _Deb, State) ->
+ throw({handshake_timeout, State#v1.callback});
+handle_other(timeout, Deb, State = #v1{connection_state = closed}) ->
+ mainloop(Deb, State);
+handle_other(timeout, _Deb, #v1{connection_state = S}) ->
+ throw({timeout, S});
+handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
+ {ForceTermination, NewState} = terminate(Explanation, State),
+ gen_server:reply(From, ok),
+ case ForceTermination of
+ force -> ok;
+ normal -> mainloop(Deb, NewState)
+ end;
+handle_other({'$gen_call', From, info}, Deb, State) ->
+ gen_server:reply(From, infos(?INFO_KEYS, State)),
+ mainloop(Deb, State);
+handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
+ gen_server:reply(From, try {ok, infos(Items, State)}
+ catch Error -> {error, Error}
+ end),
+ mainloop(Deb, State);
+handle_other({'$gen_cast', emit_stats}, Deb, State) ->
+ mainloop(Deb, internal_emit_stats(State));
+handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
+ sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
+handle_other(Other, _Deb, _State) ->
+ %% internal error -> something worth dying for
+ exit({unexpected_message, Other}).
switch_callback(State = #v1{connection_state = blocked,
heartbeater = Heartbeater}, Callback, Length) ->
ok = rabbit_heartbeat:pause_monitor(Heartbeater),