summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-11-30 12:20:12 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2010-11-30 12:20:12 +0000
commit4e9f0dc61d494aded3e6600304215b40a3c8f32d (patch)
tree637582be639da3bcda98a6d64916bca25b1e632b
parent963d834477df9a4bedaca26b13430b6ee8ef2858 (diff)
downloadrabbitmq-server-4e9f0dc61d494aded3e6600304215b40a3c8f32d.tar.gz
experiment with uing {active,once} instead of prim_inet:async_recv
-rw-r--r--src/rabbit_net.erl10
-rw-r--r--src/rabbit_reader.erl52
2 files changed, 39 insertions, 23 deletions
diff --git a/src/rabbit_net.erl b/src/rabbit_net.erl
index 0940dce2..c9e3cc47 100644
--- a/src/rabbit_net.erl
+++ b/src/rabbit_net.erl
@@ -34,7 +34,7 @@
-export([async_recv/3, close/1, controlling_process/2,
getstat/2, peername/1, peercert/1, port_command/2,
- send/2, sockname/1, is_ssl/1]).
+ send/2, sockname/1, is_ssl/1, setopts/2]).
%%---------------------------------------------------------------------------
@@ -69,6 +69,9 @@
-spec(getstat/2 ::
(socket(), [stat_option()])
-> ok_val_or_error([{stat_option(), integer()}])).
+-spec(setopts/2 :: (socket(), [{atom(), any()} |
+ {raw, non_neg_integer(), non_neg_integer(),
+ binary()}]) -> ok_or_any_error()).
-endif.
@@ -137,3 +140,8 @@ sockname(Sock) when is_port(Sock) ->
is_ssl(Sock) ->
?IS_SSL(Sock).
+
+setopts(Sock, Options) when ?IS_SSL(Sock) ->
+ ssl:setopts(Sock#ssl_socket.ssl, Options);
+setopts(Sock, Options) when is_port(Sock) ->
+ inet:setopts(Sock, Options).
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 4dd150a2..1d5b2021 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -54,9 +54,9 @@
%---------------------------------------------------------------------------
--record(v1, {parent, sock, connection, callback, recv_length, recv_ref,
+-record(v1, {parent, sock, connection, callback, recv_length, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
- channel_sup_sup_pid, start_heartbeat_fun}).
+ channel_sup_sup_pid, start_heartbeat_fun, buf}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
@@ -275,7 +275,7 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
erlang:send_after(?HANDSHAKE_TIMEOUT * 1000, self(),
handshake_timeout),
try
- mainloop(Deb, switch_callback(
+ recvloop(Deb, switch_callback(
#v1{parent = Parent,
sock = ClientSock,
connection = #connection{
@@ -287,14 +287,15 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
client_properties = none},
callback = uninitialized_callback,
recv_length = 0,
- recv_ref = none,
+ pending_recv = false,
connection_state = pre_init,
queue_collector = Collector,
heartbeater = none,
stats_timer =
rabbit_event:init_stats_timer(),
channel_sup_sup_pid = ChannelSupSupPid,
- start_heartbeat_fun = StartHeartbeatFun
+ start_heartbeat_fun = StartHeartbeatFun,
+ buf = [<<>>]
},
handshake, 8))
catch
@@ -317,21 +318,33 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
end,
done.
-mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
+recvloop(Deb, State = #v1{pending_recv = true}) ->
+ mainloop(Deb, State);
+recvloop(Deb, State = #v1{connection_state = blocked}) ->
+ mainloop(Deb, State);
+recvloop(Deb, State = #v1{sock = Sock, recv_length = Length, buf = Buf}) ->
+ case iolist_size(Buf) < Length of
+ true -> ok = rabbit_net:setopts(Sock, [{active, once}]),
+ mainloop(Deb, State#v1{pending_recv = true});
+ false -> {Data, Rest} = split_binary(
+ list_to_binary(lists:reverse(Buf)), Length),
+ recvloop(Deb, handle_input(State#v1.callback, Data,
+ State#v1{buf = [Rest]}))
+ end.
+
+mainloop(Deb, State = #v1{parent = Parent, sock = Sock}) ->
receive
- {inet_async, Sock, Ref, {ok, Data}} ->
- mainloop(Deb, handle_input(State#v1.callback, Data,
- State#v1{recv_ref = none}));
- {inet_async, Sock, Ref, {error, closed}} ->
+ {tcp, Sock, Data} ->
+ recvloop(Deb, State#v1{buf = [Data | State#v1.buf],
+ pending_recv = false});
+ {tcp_closed, Sock} ->
if State#v1.connection_state =:= closed ->
State;
true ->
throw(connection_closed_abruptly)
end;
- {inet_async, Sock, Ref, {error, Reason}} ->
- throw({inet_error, Reason});
{conserve_memory, Conserve} ->
- mainloop(Deb, internal_conserve_memory(Conserve, State));
+ recvloop(Deb, internal_conserve_memory(Conserve, State));
{'EXIT', Parent, Reason} ->
terminate(io_lib:format("broker forced connection closure "
"with reason '~w'", [Reason]), State),
@@ -391,11 +404,9 @@ mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
switch_callback(State = #v1{connection_state = blocked,
heartbeater = Heartbeater}, Callback, Length) ->
ok = rabbit_heartbeat:pause_monitor(Heartbeater),
- State#v1{callback = Callback, recv_length = Length, recv_ref = none};
+ State#v1{callback = Callback, recv_length = Length};
switch_callback(State, Callback, Length) ->
- Ref = inet_op(fun () -> rabbit_net:async_recv(
- State#v1.sock, Length, infinity) end),
- State#v1{callback = Callback, recv_length = Length, recv_ref = Ref}.
+ State#v1{callback = Callback, recv_length = Length}.
terminate(Explanation, State) when ?IS_RUNNING(State) ->
{normal, send_exception(State, 0,
@@ -409,12 +420,9 @@ internal_conserve_memory(true, State = #v1{connection_state = running}) ->
internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->
State#v1{connection_state = running};
internal_conserve_memory(false, State = #v1{connection_state = blocked,
- heartbeater = Heartbeater,
- callback = Callback,
- recv_length = Length,
- recv_ref = none}) ->
+ heartbeater = Heartbeater}) ->
ok = rabbit_heartbeat:resume_monitor(Heartbeater),
- switch_callback(State#v1{connection_state = running}, Callback, Length);
+ State#v1{connection_state = running};
internal_conserve_memory(_Conserve, State) ->
State.