summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-03 11:12:10 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-03 11:12:10 +0000
commitc6ae0855ab1303e6d07c9528242f92391182277e (patch)
tree6744530496866e679e60e175bd7d198f701df118
parent70edb03114b5ad926154fe2b5c2714c4f048c36b (diff)
downloadrabbitmq-server-c6ae0855ab1303e6d07c9528242f92391182277e.tar.gz
lift buffer vars from state
This improves performance by reducing state update and also makes it clear that these vars are only updated in the two loop functions.
-rw-r--r--src/rabbit_reader.erl68
1 files changed, 32 insertions, 36 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 71c13af2..bbb0b185 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -23,7 +23,7 @@
-export([system_continue/3, system_terminate/4, system_code_change/4]).
--export([init/2, mainloop/2, recvloop/2]).
+-export([init/2, mainloop/4, recvloop/4]).
-export([conserve_resources/3, server_properties/1]).
@@ -37,7 +37,7 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, helper_sup, queue_collector, heartbeater,
- stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}).
+ stats_timer, channel_sup_sup_pid, throttle}).
-record(connection, {name, host, peer_host, port, peer_port,
protocol, user, timeout_sec, frame_max, vhost,
@@ -91,9 +91,10 @@
rabbit_types:ok_or_error2(
rabbit_net:socket(), any()))) -> no_return()).
--spec(mainloop/2 :: (_,#v1{}) -> any()).
+-spec(mainloop/4 :: (_,[binary()], non_neg_integer(), #v1{}) -> any()).
-spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}).
--spec(system_continue/3 :: (_,_,#v1{}) -> any()).
+-spec(system_continue/3 :: (_,_,{[binary()], non_neg_integer(), #v1{}}) ->
+ any()).
-spec(system_terminate/4 :: (_,_,_,_) -> none()).
-endif.
@@ -113,8 +114,8 @@ init(Parent, HelperSup) ->
start_connection(Parent, HelperSup, Deb, Sock, SockTransform)
end.
-system_continue(Parent, Deb, State) ->
- mainloop(Deb, State#v1{parent = Parent}).
+system_continue(Parent, Deb, {Buf, BufLen, State}) ->
+ mainloop(Deb, Buf, BufLen, State#v1{parent = Parent}).
system_terminate(Reason, _Parent, _Deb, _State) ->
exit(Reason).
@@ -238,8 +239,6 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
helper_sup = HelperSup,
heartbeater = none,
channel_sup_sup_pid = none,
- buf = [],
- buf_len = 0,
throttle = #throttle{
alarmed_by = [],
last_blocked_by = none,
@@ -247,9 +246,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) ->
blocked_sent = false}},
try
run({?MODULE, recvloop,
- [Deb, switch_callback(rabbit_event:init_stats_timer(
- State, #v1.stats_timer),
- handshake, 8)]}),
+ [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer(
+ State, #v1.stats_timer),
+ handshake, 8)]}),
log(info, "closing AMQP connection ~p (~s)~n", [self(), Name])
catch
Ex -> log(case Ex of
@@ -276,25 +275,24 @@ run({M, F, A}) ->
catch {become, MFA} -> run(MFA)
end.
-recvloop(Deb, State = #v1{pending_recv = true}) ->
- mainloop(Deb, State);
-recvloop(Deb, State = #v1{connection_state = blocked}) ->
- mainloop(Deb, State);
-recvloop(Deb, State = #v1{connection_state = {become, F}}) ->
- throw({become, F(Deb, State)});
-recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen})
+recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) ->
+ mainloop(Deb, Buf, BufLen, State);
+recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) ->
+ mainloop(Deb, Buf, BufLen, State);
+recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) ->
+ throw({become, F(Deb, Buf, BufLen, State)});
+recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen})
when BufLen < RecvLen ->
ok = rabbit_net:setopts(Sock, [{active, once}]),
- mainloop(Deb, State#v1{pending_recv = true});
-recvloop(Deb, State = #v1{buf = [B]}) ->
+ mainloop(Deb, Buf, BufLen, State#v1{pending_recv = true});
+recvloop(Deb, [B], _BufLen, State) ->
{Rest, State1} = handle_input(State#v1.callback, B, State),
- recvloop(Deb, State1#v1{buf = [Rest], buf_len = size(Rest)});
-recvloop(Deb, State = #v1{recv_len = RecvLen, buf_len = BufLen, buf = Buf}) ->
+ recvloop(Deb, [Rest], size(Rest), State1);
+recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) ->
{DataLRev, RestLRev} = binlist_split(RecvLen, BufLen, Buf, []),
Data = list_to_binary(lists:reverse(DataLRev)),
{<<>>, State1} = handle_input(State#v1.callback, Data, State),
- recvloop(Deb, State1#v1{buf = lists:reverse(RestLRev),
- buf_len = BufLen - RecvLen}).
+ recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1).
binlist_split(N, N, L, Acc) ->
{L, Acc};
@@ -304,12 +302,11 @@ binlist_split(N, Len, L, [Acc0|Acc]) when Len < N ->
binlist_split(N, Len, [H|T], Acc) ->
binlist_split(N, Len - size(H), T, [H|Acc]).
-mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
+mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) ->
case rabbit_net:recv(Sock) of
{data, Data} ->
- recvloop(Deb, State#v1{buf = [Data | Buf],
- buf_len = BufLen + size(Data),
- pending_recv = false});
+ recvloop(Deb, [Data | Buf], BufLen + size(Data),
+ State#v1{pending_recv = false});
closed when State#v1.connection_state =:= closed ->
ok;
closed ->
@@ -320,11 +317,11 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
throw({inet_error, Reason});
{other, {system, From, Request}} ->
sys:handle_system_msg(Request, From, State#v1.parent,
- ?MODULE, Deb, State);
+ ?MODULE, Deb, {Buf, BufLen, State});
{other, Other} ->
case handle_other(Other, State) of
stop -> ok;
- NewState -> recvloop(Deb, NewState)
+ NewState -> recvloop(Deb, Buf, BufLen, NewState)
end
end.
@@ -1120,18 +1117,17 @@ become_1_0(Id, State = #v1{sock = Sock}) ->
Sock, {unsupported_amqp1_0_protocol_id, Id},
{3, 1, 0, 0})
end,
- F = fun (_Deb, S) ->
+ F = fun (_Deb, Buf, BufLen, S) ->
{rabbit_amqp1_0_reader, init,
- [Mode, pack_for_1_0(S)]}
+ [Mode, pack_for_1_0(Buf, BufLen, S)]}
end,
State = #v1{connection_state = {become, F}}
end.
-pack_for_1_0(#v1{parent = Parent,
+pack_for_1_0(Buf, BufLen,
+ #v1{parent = Parent,
sock = Sock,
recv_len = RecvLen,
pending_recv = PendingRecv,
- helper_sup = SupPid,
- buf = Buf,
- buf_len = BufLen}) ->
+ helper_sup = SupPid}) ->
{Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}.