diff options
author | Matthew Sackman <matthew@rabbitmq.com> | 2011-05-10 17:46:15 +0100 |
---|---|---|
committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-05-10 17:46:15 +0100 |
commit | c7379ae4d821d9dbf391ec2f24cad9c2dacd5599 (patch) | |
tree | b4aa4bedc1d2966c2bf6c588bd32eada04cd6b2d | |
parent | 7e3713e13764ccba3e9edc76066f178981c771bf (diff) | |
parent | 8dbe0cd6721794063e1f043afdaabb0beb1afcb3 (diff) | |
download | rabbitmq-server-c7379ae4d821d9dbf391ec2f24cad9c2dacd5599.tar.gz |
Merging bug23559 to default
-rw-r--r-- | src/rabbit_net.erl | 26 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 197 |
2 files changed, 129 insertions, 94 deletions
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index cbdadd16..b944ec81 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). -export([is_ssl/1, ssl_info/1, controlling_process/2, getstat/2, - async_recv/3, port_command/2, send/2, close/1, + recv/1, async_recv/3, port_command/2, setopts/2, send/2, close/1, sockname/1, peername/1, peercert/1]). %%--------------------------------------------------------------------------- @@ -42,9 +42,15 @@ -spec(getstat/2 :: (socket(), [stat_option()]) -> ok_val_or_error([{stat_option(), integer()}])). +-spec(recv/1 :: (socket()) -> + {'data', [char()] | binary()} | 'closed' | + rabbit_types:error(any()) | {'other', any()}). -spec(async_recv/3 :: (socket(), integer(), timeout()) -> rabbit_types:ok(any())). -spec(port_command/2 :: (socket(), iolist()) -> 'true'). +-spec(setopts/2 :: (socket(), [{atom(), any()} | + {raw, non_neg_integer(), non_neg_integer(), + binary()}]) -> ok_or_any_error()). -spec(send/2 :: (socket(), binary() | iolist()) -> ok_or_any_error()). -spec(close/1 :: (socket()) -> ok_or_any_error()). -spec(sockname/1 :: @@ -80,6 +86,19 @@ getstat(Sock, Stats) when ?IS_SSL(Sock) -> getstat(Sock, Stats) when is_port(Sock) -> inet:getstat(Sock, Stats). +recv(Sock) when ?IS_SSL(Sock) -> + recv(Sock#ssl_socket.ssl, {ssl, ssl_closed, ssl_error}); +recv(Sock) when is_port(Sock) -> + recv(Sock, {tcp, tcp_closed, tcp_error}). + +recv(S, {DataTag, ClosedTag, ErrorTag}) -> + receive + {DataTag, S, Data} -> {data, Data}; + {ClosedTag, S} -> closed; + {ErrorTag, S, Reason} -> {error, Reason}; + Other -> {other, Other} + end. + async_recv(Sock, Length, Timeout) when ?IS_SSL(Sock) -> Pid = self(), Ref = make_ref(), @@ -103,6 +122,11 @@ port_command(Sock, Data) when ?IS_SSL(Sock) -> port_command(Sock, Data) when is_port(Sock) -> erlang:port_command(Sock, Data). +setopts(Sock, Options) when ?IS_SSL(Sock) -> + ssl:setopts(Sock#ssl_socket.ssl, Options); +setopts(Sock, Options) when is_port(Sock) -> + inet:setopts(Sock, Options). + send(Sock, Data) when ?IS_SSL(Sock) -> ssl:send(Sock#ssl_socket.ssl, Data); send(Sock, Data) when is_port(Sock) -> gen_tcp:send(Sock, Data). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 42af91a8..f5214a77 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -38,10 +38,10 @@ %%-------------------------------------------------------------------------- --record(v1, {parent, sock, connection, callback, recv_length, recv_ref, +-record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, - channel_sup_sup_pid, start_heartbeat_fun, auth_mechanism, - auth_state}). + channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, + auth_mechanism, auth_state}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -192,7 +192,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), try - mainloop(Deb, switch_callback( + recvloop(Deb, switch_callback( #v1{parent = Parent, sock = ClientSock, connection = #connection{ @@ -204,8 +204,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, client_properties = none, capabilities = []}, callback = uninitialized_callback, - recv_length = 0, - recv_ref = none, + recv_len = 0, + pending_recv = false, connection_state = pre_init, queue_collector = Collector, heartbeater = none, @@ -213,6 +213,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, rabbit_event:init_stats_timer(), channel_sup_sup_pid = ChannelSupSupPid, start_heartbeat_fun = StartHeartbeatFun, + buf = [], + buf_len = 0, auth_mechanism = none, auth_state = none }, @@ -237,92 +239,104 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, end, done. -mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> - receive - {inet_async, Sock, Ref, {ok, Data}} -> - mainloop(Deb, handle_input(State#v1.callback, Data, - State#v1{recv_ref = none})); - {inet_async, Sock, Ref, {error, closed}} -> - if State#v1.connection_state =:= closed -> - State; - true -> - throw(connection_closed_abruptly) - end; - {inet_async, Sock, Ref, {error, Reason}} -> - throw({inet_error, Reason}); - {conserve_memory, Conserve} -> - mainloop(Deb, internal_conserve_memory(Conserve, State)); - {channel_closing, ChPid} -> - ok = rabbit_channel:ready_for_close(ChPid), - channel_cleanup(ChPid), - mainloop(Deb, State); - {'EXIT', Parent, Reason} -> - terminate(io_lib:format("broker forced connection closure " - "with reason '~w'", [Reason]), State), - %% this is what we are expected to do according to - %% http://www.erlang.org/doc/man/sys.html - %% - %% If we wanted to be *really* nice we should wait for a - %% while for clients to close the socket at their end, - %% just as we do in the ordinary error case. However, - %% since this termination is initiated by our parent it is - %% probably more important to exit quickly. - exit(Reason); - {channel_exit, _Channel, E = {writer, send_failed, _Error}} -> - throw(E); - {channel_exit, Channel, Reason} -> - mainloop(Deb, handle_exception(State, Channel, Reason)); - {'DOWN', _MRef, process, ChPid, Reason} -> - mainloop(Deb, handle_dependent_exit(ChPid, Reason, State)); - terminate_connection -> - State; - handshake_timeout -> - if ?IS_RUNNING(State) orelse - State#v1.connection_state =:= closing orelse - State#v1.connection_state =:= closed -> - mainloop(Deb, State); - true -> - throw({handshake_timeout, State#v1.callback}) - end; - timeout -> - case State#v1.connection_state of - closed -> mainloop(Deb, State); - S -> throw({timeout, S}) - end; - {'$gen_call', From, {shutdown, Explanation}} -> - {ForceTermination, NewState} = terminate(Explanation, State), - gen_server:reply(From, ok), - case ForceTermination of - force -> ok; - normal -> mainloop(Deb, NewState) - end; - {'$gen_call', From, info} -> - gen_server:reply(From, infos(?INFO_KEYS, State)), - mainloop(Deb, State); - {'$gen_call', From, {info, Items}} -> - gen_server:reply(From, try {ok, infos(Items, State)} - catch Error -> {error, Error} - end), - mainloop(Deb, State); - {'$gen_cast', emit_stats} -> - State1 = internal_emit_stats(State), - mainloop(Deb, State1); - {system, From, Request} -> - sys:handle_system_msg(Request, From, - Parent, ?MODULE, Deb, State); - Other -> - %% internal error -> something worth dying for - exit({unexpected_message, Other}) +recvloop(Deb, State = #v1{pending_recv = true}) -> + mainloop(Deb, State); +recvloop(Deb, State = #v1{connection_state = blocked}) -> + mainloop(Deb, State); +recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen}) + when BufLen < RecvLen -> + ok = rabbit_net:setopts(Sock, [{active, once}]), + mainloop(Deb, State#v1{pending_recv = true}); +recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) -> + {Data, Rest} = split_binary(case Buf of + [B] -> B; + _ -> list_to_binary(lists:reverse(Buf)) + end, RecvLen), + recvloop(Deb, handle_input(State#v1.callback, Data, + State#v1{buf = [Rest], + buf_len = BufLen - RecvLen})). + +mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> + case rabbit_net:recv(Sock) of + {data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf], + buf_len = BufLen + size(Data), + pending_recv = false}); + closed -> if State#v1.connection_state =:= closed -> + State; + true -> + throw(connection_closed_abruptly) + end; + {error, Reason} -> throw({inet_error, Reason}); + {other, Other} -> handle_other(Other, Deb, State) end. +handle_other({conserve_memory, Conserve}, Deb, State) -> + recvloop(Deb, internal_conserve_memory(Conserve, State)); +handle_other({channel_closing, ChPid}, Deb, State) -> + ok = rabbit_channel:ready_for_close(ChPid), + channel_cleanup(ChPid), + mainloop(Deb, State); +handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> + terminate(io_lib:format("broker forced connection closure " + "with reason '~w'", [Reason]), State), + %% this is what we are expected to do according to + %% http://www.erlang.org/doc/man/sys.html + %% + %% If we wanted to be *really* nice we should wait for a while for + %% clients to close the socket at their end, just as we do in the + %% ordinary error case. However, since this termination is + %% initiated by our parent it is probably more important to exit + %% quickly. + exit(Reason); +handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}}, + _Deb, _State) -> + throw(E); +handle_other({channel_exit, Channel, Reason}, Deb, State) -> + mainloop(Deb, handle_exception(State, Channel, Reason)); +handle_other({'DOWN', _MRef, process, ChPid, Reason}, Deb, State) -> + mainloop(Deb, handle_dependent_exit(ChPid, Reason, State)); +handle_other(terminate_connection, _Deb, State) -> + State; +handle_other(handshake_timeout, Deb, State) + when ?IS_RUNNING(State) orelse + State#v1.connection_state =:= closing orelse + State#v1.connection_state =:= closed -> + mainloop(Deb, State); +handle_other(handshake_timeout, _Deb, State) -> + throw({handshake_timeout, State#v1.callback}); +handle_other(timeout, Deb, State = #v1{connection_state = closed}) -> + mainloop(Deb, State); +handle_other(timeout, _Deb, #v1{connection_state = S}) -> + throw({timeout, S}); +handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) -> + {ForceTermination, NewState} = terminate(Explanation, State), + gen_server:reply(From, ok), + case ForceTermination of + force -> ok; + normal -> mainloop(Deb, NewState) + end; +handle_other({'$gen_call', From, info}, Deb, State) -> + gen_server:reply(From, infos(?INFO_KEYS, State)), + mainloop(Deb, State); +handle_other({'$gen_call', From, {info, Items}}, Deb, State) -> + gen_server:reply(From, try {ok, infos(Items, State)} + catch Error -> {error, Error} + end), + mainloop(Deb, State); +handle_other({'$gen_cast', emit_stats}, Deb, State) -> + mainloop(Deb, internal_emit_stats(State)); +handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> + sys:handle_system_msg(Request, From, Parent, ?MODULE, Deb, State); +handle_other(Other, _Deb, _State) -> + %% internal error -> something worth dying for + exit({unexpected_message, Other}). + switch_callback(State = #v1{connection_state = blocked, heartbeater = Heartbeater}, Callback, Length) -> ok = rabbit_heartbeat:pause_monitor(Heartbeater), - State#v1{callback = Callback, recv_length = Length, recv_ref = none}; + State#v1{callback = Callback, recv_len = Length}; switch_callback(State, Callback, Length) -> - Ref = inet_op(fun () -> rabbit_net:async_recv( - State#v1.sock, Length, infinity) end), - State#v1{callback = Callback, recv_length = Length, recv_ref = Ref}. + State#v1{callback = Callback, recv_len = Length}. terminate(Explanation, State) when ?IS_RUNNING(State) -> {normal, send_exception(State, 0, @@ -336,12 +350,9 @@ internal_conserve_memory(true, State = #v1{connection_state = running}) -> internal_conserve_memory(false, State = #v1{connection_state = blocking}) -> State#v1{connection_state = running}; internal_conserve_memory(false, State = #v1{connection_state = blocked, - heartbeater = Heartbeater, - callback = Callback, - recv_length = Length, - recv_ref = none}) -> + heartbeater = Heartbeater}) -> ok = rabbit_heartbeat:resume_monitor(Heartbeater), - switch_callback(State#v1{connection_state = running}, Callback, Length); + State#v1{connection_state = running}; internal_conserve_memory(_Conserve, State) -> State. @@ -513,8 +524,8 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) -> case PayloadAndMarker of <<Payload:PayloadSize/binary, ?FRAME_END>> -> - handle_frame(Type, Channel, Payload, - switch_callback(State, frame_header, 7)); + switch_callback(handle_frame(Type, Channel, Payload, State), + frame_header, 7); _ -> throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker}) end; |