summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-05-10 17:46:15 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-05-10 17:46:15 +0100
commitc7379ae4d821d9dbf391ec2f24cad9c2dacd5599 (patch)
treeb4aa4bedc1d2966c2bf6c588bd32eada04cd6b2d
parent7e3713e13764ccba3e9edc76066f178981c771bf (diff)
parent8dbe0cd6721794063e1f043afdaabb0beb1afcb3 (diff)
downloadrabbitmq-server-c7379ae4d821d9dbf391ec2f24cad9c2dacd5599.tar.gz
Merging bug23559 to default
-rw-r--r--src/rabbit_net.erl26
-rw-r--r--src/rabbit_reader.erl197
2 files changed, 129 insertions, 94 deletions
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index cbdadd16..b944ec81 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
-export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2,
- async_recv/3, port_command/2, send/2, close/1,
+ recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1,
sockname/1, peername/1, peercert/1]).
%%---------------------------------------------------------------------------
@@ -42,9 +42,15 @@
-spec(getstat/2 ::
(socket(), [stat_option()])
-> ok_val_or_error([{stat_option(), integer()}])).
+-spec(recv/1 :: (socket()) ->
+ {'data', [char()] | binary()} | 'closed' |
+ rabbit_types:error(any()) | {'other', any()}).
-spec(async_recv/3 ::
(socket(), integer(), timeout()) -> rabbit_types:ok(any())).
-spec(port_command/2 :: (socket(), iolist()) -> 'true').
+-spec(setopts/2 :: (socket(), [{atom(), any()} |
+ {raw, non_neg_integer(), non_neg_integer(),
+ binary()}]) -> ok_or_any_error()).
-spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()).
-spec(close/1 :: (socket()) -> ok_or_any_error()).
-spec(sockname/1 ::
@@ -80,6 +86,19 @@ getstat(Sock, Stats) when ?IS_SSL(Sock) ->
getstat(Sock, Stats) when is_port(Sock) ->
inet:getstat(Sock, Stats).
+recv(Sock) when ?IS_SSL(Sock) ->
+ recv(Sock#ssl_socket.ssl, {ssl, ssl_closed, ssl_error});
+recv(Sock) when is_port(Sock) ->
+ recv(Sock, {tcp, tcp_closed, tcp_error}).
+
+recv(S, {DataTag, ClosedTag, ErrorTag}) ->
+ receive
+ {DataTag, S, Data} -> {data, Data};
+ {ClosedTag, S} -> closed;
+ {ErrorTag, S, Reason} -> {error, Reason};
+ Other -> {other, Other}
+ end.
+
async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) ->
Pid = self(),
Ref = make_ref(),
@@ -103,6 +122,11 @@ port_command(Sock, Data) when ?IS_SSL(Sock) ->
port_command(Sock, Data) when is_port(Sock) ->
erlang:port_command(Sock, Data).
+setopts(Sock, Options) when ?IS_SSL(Sock) ->
+ ssl:setopts(Sock#ssl_socket.ssl, Options);
+setopts(Sock, Options) when is_port(Sock) ->
+ inet:setopts(Sock, Options).
+
send(Sock, Data) when ?IS_SSL(Sock) -> ssl:send(Sock#ssl_socket.ssl, Data);
send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 42af91a8..f5214a77 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -38,10 +38,10 @@
%%--------------------------------------------------------------------------
--record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
+-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun, auth_mechanism,
- auth_state}).
+ channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
+ auth_mechanism, auth_state}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
@@ -192,7 +192,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
try
- mainloop(Deb, switch_callback(
+ recvloop(Deb, switch_callback(
#v1{parent = Parent,
sock = ClientSock,
connection = #connection{
@@ -204,8 +204,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
client_properties = none,
capabilities = []},
callback = uninitialized_callback,
- recv_length = 0,
- recv_ref = none,
+ recv_len = 0,
+ pending_recv = false,
connection_state = pre_init,
queue_collector = Collector,
heartbeater = none,
@@ -213,6 +213,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
rabbit_event:init_stats_timer(),
channel_sup_sup_pid = ChannelSupSupPid,
start_heartbeat_fun = StartHeartbeatFun,
+ buf = [],
+ buf_len = 0,
auth_mechanism = none,
auth_state = none
},
@@ -237,92 +239,104 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
end,
done.
-mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
- receive
- {inet_async, Sock, Ref, {ok, Data}} ->
- mainloop(Deb, handle_input(State#v1.callback, Data,
- State#v1{recv_ref = none}));
- {inet_async, Sock, Ref, {error, closed}} ->
- if State#v1.connection_state =:= closed ->
- State;
- true ->
- throw(connection_closed_abruptly)
- end;
- {inet_async, Sock, Ref, {error, Reason}} ->
- throw({inet_error, Reason});
- {conserve_memory, Conserve} ->
- mainloop(Deb, internal_conserve_memory(Conserve, State));
- {channel_closing, ChPid} ->
- ok = rabbit_channel:ready_for_close(ChPid),
- channel_cleanup(ChPid),
- mainloop(Deb, State);
- {'EXIT', Parent, Reason} ->
- terminate(io_lib:format("broker forced connection closure "
- "with reason '~w'", [Reason]), State),
- %% this is what we are expected to do according to
- %% http://www.erlang.org/doc/man/sys.html
- %%
- %% If we wanted to be *really* nice we should wait for a
- %% while for clients to close the socket at their end,
- %% just as we do in the ordinary error case. However,
- %% since this termination is initiated by our parent it is
- %% probably more important to exit quickly.
- exit(Reason);
- {channel_exit, _Channel, E = {writer, send_failed, _Error}} ->
- throw(E);
- {channel_exit, Channel, Reason} ->
- mainloop(Deb, handle_exception(State, Channel, Reason));
- {'DOWN', _MRef, process, ChPid, Reason} ->
- mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
- terminate_connection ->
- State;
- handshake_timeout ->
- if ?IS_RUNNING(State) orelse
- State#v1.connection_state =:= closing orelse
- State#v1.connection_state =:= closed ->
- mainloop(Deb, State);
- true ->
- throw({handshake_timeout, State#v1.callback})
- end;
- timeout ->
- case State#v1.connection_state of
- closed -> mainloop(Deb, State);
- S -> throw({timeout, S})
- end;
- {'$gen_call', From, {shutdown, Explanation}} ->
- {ForceTermination, NewState} = terminate(Explanation, State),
- gen_server:reply(From, ok),
- case ForceTermination of
- force -> ok;
- normal -> mainloop(Deb, NewState)
- end;
- {'$gen_call', From, info} ->
- gen_server:reply(From, infos(?INFO_KEYS, State)),
- mainloop(Deb, State);
- {'$gen_call', From, {info, Items}} ->
- gen_server:reply(From, try {ok, infos(Items, State)}
- catch Error -> {error, Error}
- end),
- mainloop(Deb, State);
- {'$gen_cast', emit_stats} ->
- State1 = internal_emit_stats(State),
- mainloop(Deb, State1);
- {system, From, Request} ->
- sys:handle_system_msg(Request, From,
- Parent, ?MODULE, Deb, State);
- Other ->
- %% internal error -> something worth dying for
- exit({unexpected_message, Other})
+recvloop(Deb, State = #v1{pending_recv = true}) ->
+ mainloop(Deb, State);
+recvloop(Deb, State = #v1{connection_state = blocked}) ->
+ mainloop(Deb, State);
+recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen})
+ when BufLen < RecvLen ->
+ ok = rabbit_net:setopts(Sock, [{active, once}]),
+ mainloop(Deb, State#v1{pending_recv = true});
+recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) ->
+ {Data, Rest} = split_binary(case Buf of
+ [B] -> B;
+ _ -> list_to_binary(lists:reverse(Buf))
+ end, RecvLen),
+ recvloop(Deb, handle_input(State#v1.callback, Data,
+ State#v1{buf = [Rest],
+ buf_len = BufLen - RecvLen})).
+
+mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
+ case rabbit_net:recv(Sock) of
+ {data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf],
+ buf_len = BufLen + size(Data),
+ pending_recv = false});
+ closed -> if State#v1.connection_state =:= closed ->
+ State;
+ true ->
+ throw(connection_closed_abruptly)
+ end;
+ {error, Reason} -> throw({inet_error, Reason});
+ {other, Other} -> handle_other(Other, Deb, State)
end.
+handle_other({conserve_memory, Conserve}, Deb, State) ->
+ recvloop(Deb, internal_conserve_memory(Conserve, State));
+handle_other({channel_closing, ChPid}, Deb, State) ->
+ ok = rabbit_channel:ready_for_close(ChPid),
+ channel_cleanup(ChPid),
+ mainloop(Deb, State);
+handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
+ terminate(io_lib:format("broker forced connection closure "
+ "with reason '~w'", [Reason]), State),
+ %% this is what we are expected to do according to
+ %% http://www.erlang.org/doc/man/sys.html
+ %%
+ %% If we wanted to be *really* nice we should wait for a while for
+ %% clients to close the socket at their end, just as we do in the
+ %% ordinary error case. However, since this termination is
+ %% initiated by our parent it is probably more important to exit
+ %% quickly.
+ exit(Reason);
+handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}},
+ _Deb, _State) ->
+ throw(E);
+handle_other({channel_exit, Channel, Reason}, Deb, State) ->
+ mainloop(Deb, handle_exception(State, Channel, Reason));
+handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) ->
+ mainloop(Deb, handle_dependent_exit(ChPid, Reason, State));
+handle_other(terminate_connection, _Deb, State) ->
+ State;
+handle_other(handshake_timeout, Deb, State)
+ when ?IS_RUNNING(State) orelse
+ State#v1.connection_state =:= closing orelse
+ State#v1.connection_state =:= closed ->
+ mainloop(Deb, State);
+handle_other(handshake_timeout, _Deb, State) ->
+ throw({handshake_timeout, State#v1.callback});
+handle_other(timeout, Deb, State = #v1{connection_state = closed}) ->
+ mainloop(Deb, State);
+handle_other(timeout, _Deb, #v1{connection_state = S}) ->
+ throw({timeout, S});
+handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
+ {ForceTermination, NewState} = terminate(Explanation, State),
+ gen_server:reply(From, ok),
+ case ForceTermination of
+ force -> ok;
+ normal -> mainloop(Deb, NewState)
+ end;
+handle_other({'$gen_call', From, info}, Deb, State) ->
+ gen_server:reply(From, infos(?INFO_KEYS, State)),
+ mainloop(Deb, State);
+handle_other({'$gen_call', From, {info, Items}}, Deb, State) ->
+ gen_server:reply(From, try {ok, infos(Items, State)}
+ catch Error -> {error, Error}
+ end),
+ mainloop(Deb, State);
+handle_other({'$gen_cast', emit_stats}, Deb, State) ->
+ mainloop(Deb, internal_emit_stats(State));
+handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
+ sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State);
+handle_other(Other, _Deb, _State) ->
+ %% internal error -> something worth dying for
+ exit({unexpected_message, Other}).
+
switch_callback(State = #v1{connection_state = blocked,
heartbeater = Heartbeater}, Callback, Length) ->
ok = rabbit_heartbeat:pause_monitor(Heartbeater),
- State#v1{callback = Callback, recv_length = Length, recv_ref = none};
+ State#v1{callback = Callback, recv_len = Length};
switch_callback(State, Callback, Length) ->
- Ref = inet_op(fun () -> rabbit_net:async_recv(
- State#v1.sock, Length, infinity) end),
- State#v1{callback = Callback, recv_length = Length, recv_ref = Ref}.
+ State#v1{callback = Callback, recv_len = Length}.
terminate(Explanation, State) when ?IS_RUNNING(State) ->
{normal, send_exception(State, 0,
@@ -336,12 +350,9 @@ internal_conserve_memory(true, State = #v1{connection_state = running}) ->
internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->
State#v1{connection_state = running};
internal_conserve_memory(false, State = #v1{connection_state = blocked,
- heartbeater = Heartbeater,
- callback = Callback,
- recv_length = Length,
- recv_ref = none}) ->
+ heartbeater = Heartbeater}) ->
ok = rabbit_heartbeat:resume_monitor(Heartbeater),
- switch_callback(State#v1{connection_state = running}, Callback, Length);
+ State#v1{connection_state = running};
internal_conserve_memory(_Conserve, State) ->
State.
@@ -513,8 +524,8 @@ handle_input({frame_payload, Type, Channel, PayloadSize},
PayloadAndMarker, State) ->
case PayloadAndMarker of
<<Payload:PayloadSize/binary, ?FRAME_END>> ->
- handle_frame(Type, Channel, Payload,
- switch_callback(State, frame_header, 7));
+ switch_callback(handle_frame(Type, Channel, Payload, State),
+ frame_header, 7);
_ ->
throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker})
end;