diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-11-30 12:20:12 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-11-30 12:20:12 +0000 |
commit | 4e9f0dc61d494aded3e6600304215b40a3c8f32d (patch) | |
tree | 637582be639da3bcda98a6d64916bca25b1e632b | |
parent | 963d834477df9a4bedaca26b13430b6ee8ef2858 (diff) | |
download | rabbitmq-server-4e9f0dc61d494aded3e6600304215b40a3c8f32d.tar.gz |
experiment with uing {active,once} instead of prim_inet:async_recv
-rw-r--r-- | src/rabbit_net.erl | 10 | ||||
-rw-r--r-- | src/rabbit_reader.erl | 52 |
2 files changed, 39 insertions, 23 deletions
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl index 0940dce2..c9e3cc47 100644 --- a/src/rabbit_net.erl +++ b/src/rabbit_net.erl @@ -34,7 +34,7 @@ -export([async_recv/3, close/1, controlling_process/2, getstat/2, peername/1, peercert/1, port_command/2, - send/2, sockname/1, is_ssl/1]). + send/2, sockname/1, is_ssl/1, setopts/2]). %%--------------------------------------------------------------------------- @@ -69,6 +69,9 @@ -spec(getstat/2 :: (socket(), [stat_option()]) -> ok_val_or_error([{stat_option(), integer()}])). +-spec(setopts/2 :: (socket(), [{atom(), any()} | + {raw, non_neg_integer(), non_neg_integer(), + binary()}]) -> ok_or_any_error()). -endif. @@ -137,3 +140,8 @@ sockname(Sock) when is_port(Sock) -> is_ssl(Sock) -> ?IS_SSL(Sock). + +setopts(Sock, Options) when ?IS_SSL(Sock) -> + ssl:setopts(Sock#ssl_socket.ssl, Options); +setopts(Sock, Options) when is_port(Sock) -> + inet:setopts(Sock, Options). diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 4dd150a2..1d5b2021 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -54,9 +54,9 @@ %--------------------------------------------------------------------------- --record(v1, {parent, sock, connection, callback, recv_length, recv_ref, +-record(v1, {parent, sock, connection, callback, recv_length, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, - channel_sup_sup_pid, start_heartbeat_fun}). + channel_sup_sup_pid, start_heartbeat_fun, buf}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -275,7 +275,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(), handshake_timeout), try - mainloop(Deb, switch_callback( + recvloop(Deb, switch_callback( #v1{parent = Parent, sock = ClientSock, connection = #connection{ @@ -287,14 +287,15 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, client_properties = none}, callback = uninitialized_callback, recv_length = 0, - recv_ref = none, + pending_recv = false, connection_state = pre_init, queue_collector = Collector, heartbeater = none, stats_timer = rabbit_event:init_stats_timer(), channel_sup_sup_pid = ChannelSupSupPid, - start_heartbeat_fun = StartHeartbeatFun + start_heartbeat_fun = StartHeartbeatFun, + buf = [<<>>] }, handshake, 8)) catch @@ -317,21 +318,33 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, end, done. -mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> +recvloop(Deb, State = #v1{pending_recv = true}) -> + mainloop(Deb, State); +recvloop(Deb, State = #v1{connection_state = blocked}) -> + mainloop(Deb, State); +recvloop(Deb, State = #v1{sock = Sock, recv_length = Length, buf = Buf}) -> + case iolist_size(Buf) < Length of + true -> ok = rabbit_net:setopts(Sock, [{active, once}]), + mainloop(Deb, State#v1{pending_recv = true}); + false -> {Data, Rest} = split_binary( + list_to_binary(lists:reverse(Buf)), Length), + recvloop(Deb, handle_input(State#v1.callback, Data, + State#v1{buf = [Rest]})) + end. + +mainloop(Deb, State = #v1{parent = Parent, sock = Sock}) -> receive - {inet_async, Sock, Ref, {ok, Data}} -> - mainloop(Deb, handle_input(State#v1.callback, Data, - State#v1{recv_ref = none})); - {inet_async, Sock, Ref, {error, closed}} -> + {tcp, Sock, Data} -> + recvloop(Deb, State#v1{buf = [Data | State#v1.buf], + pending_recv = false}); + {tcp_closed, Sock} -> if State#v1.connection_state =:= closed -> State; true -> throw(connection_closed_abruptly) end; - {inet_async, Sock, Ref, {error, Reason}} -> - throw({inet_error, Reason}); {conserve_memory, Conserve} -> - mainloop(Deb, internal_conserve_memory(Conserve, State)); + recvloop(Deb, internal_conserve_memory(Conserve, State)); {'EXIT', Parent, Reason} -> terminate(io_lib:format("broker forced connection closure " "with reason '~w'", [Reason]), State), @@ -391,11 +404,9 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> switch_callback(State = #v1{connection_state = blocked, heartbeater = Heartbeater}, Callback, Length) -> ok = rabbit_heartbeat:pause_monitor(Heartbeater), - State#v1{callback = Callback, recv_length = Length, recv_ref = none}; + State#v1{callback = Callback, recv_length = Length}; switch_callback(State, Callback, Length) -> - Ref = inet_op(fun () -> rabbit_net:async_recv( - State#v1.sock, Length, infinity) end), - State#v1{callback = Callback, recv_length = Length, recv_ref = Ref}. + State#v1{callback = Callback, recv_length = Length}. terminate(Explanation, State) when ?IS_RUNNING(State) -> {normal, send_exception(State, 0, @@ -409,12 +420,9 @@ internal_conserve_memory(true, State = #v1{connection_state = running}) -> internal_conserve_memory(false, State = #v1{connection_state = blocking}) -> State#v1{connection_state = running}; internal_conserve_memory(false, State = #v1{connection_state = blocked, - heartbeater = Heartbeater, - callback = Callback, - recv_length = Length, - recv_ref = none}) -> + heartbeater = Heartbeater}) -> ok = rabbit_heartbeat:resume_monitor(Heartbeater), - switch_callback(State#v1{connection_state = running}, Callback, Length); + State#v1{connection_state = running}; internal_conserve_memory(_Conserve, State) -> State. |