diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-03 11:12:10 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-03 11:12:10 +0000 |
commit | c6ae0855ab1303e6d07c9528242f92391182277e (patch) | |
tree | 6744530496866e679e60e175bd7d198f701df118 | |
parent | 70edb03114b5ad926154fe2b5c2714c4f048c36b (diff) | |
download | rabbitmq-server-c6ae0855ab1303e6d07c9528242f92391182277e.tar.gz |
lift buffer vars from state
This improves performance by reducing state update and also makes it
clear that these vars are only updated in the two loop functions.
-rw-r--r-- | src/rabbit_reader.erl | 68 |
1 files changed, 32 insertions, 36 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 71c13af2..bbb0b185 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -23,7 +23,7 @@ -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/2, mainloop/2, recvloop/2]). +-export([init/2, mainloop/4, recvloop/4]). -export([conserve_resources/3, server_properties/1]). @@ -37,7 +37,7 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, helper_sup, queue_collector, heartbeater, - stats_timer, channel_sup_sup_pid, buf, buf_len, throttle}). + stats_timer, channel_sup_sup_pid, throttle}). -record(connection, {name, host, peer_host, port, peer_port, protocol, user, timeout_sec, frame_max, vhost, @@ -91,9 +91,10 @@ rabbit_types:ok_or_error2( rabbit_net:socket(), any()))) -> no_return()). --spec(mainloop/2 :: (_,#v1{}) -> any()). +-spec(mainloop/4 :: (_,[binary()], non_neg_integer(), #v1{}) -> any()). -spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}). --spec(system_continue/3 :: (_,_,#v1{}) -> any()). +-spec(system_continue/3 :: (_,_,{[binary()], non_neg_integer(), #v1{}}) -> + any()). -spec(system_terminate/4 :: (_,_,_,_) -> none()). -endif. @@ -113,8 +114,8 @@ init(Parent, HelperSup) -> start_connection(Parent, HelperSup, Deb, Sock, SockTransform) end. -system_continue(Parent, Deb, State) -> - mainloop(Deb, State#v1{parent = Parent}). +system_continue(Parent, Deb, {Buf, BufLen, State}) -> + mainloop(Deb, Buf, BufLen, State#v1{parent = Parent}). system_terminate(Reason, _Parent, _Deb, _State) -> exit(Reason). @@ -238,8 +239,6 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> helper_sup = HelperSup, heartbeater = none, channel_sup_sup_pid = none, - buf = [], - buf_len = 0, throttle = #throttle{ alarmed_by = [], last_blocked_by = none, @@ -247,9 +246,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> blocked_sent = false}}, try run({?MODULE, recvloop, - [Deb, switch_callback(rabbit_event:init_stats_timer( - State, #v1.stats_timer), - handshake, 8)]}), + [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer( + State, #v1.stats_timer), + handshake, 8)]}), log(info, "closing AMQP connection ~p (~s)~n", [self(), Name]) catch Ex -> log(case Ex of @@ -276,25 +275,24 @@ run({M, F, A}) -> catch {become, MFA} -> run(MFA) end. -recvloop(Deb, State = #v1{pending_recv = true}) -> - mainloop(Deb, State); -recvloop(Deb, State = #v1{connection_state = blocked}) -> - mainloop(Deb, State); -recvloop(Deb, State = #v1{connection_state = {become, F}}) -> - throw({become, F(Deb, State)}); -recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen}) +recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) -> + mainloop(Deb, Buf, BufLen, State); +recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) -> + mainloop(Deb, Buf, BufLen, State); +recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) -> + throw({become, F(Deb, Buf, BufLen, State)}); +recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen}) when BufLen < RecvLen -> ok = rabbit_net:setopts(Sock, [{active, once}]), - mainloop(Deb, State#v1{pending_recv = true}); -recvloop(Deb, State = #v1{buf = [B]}) -> + mainloop(Deb, Buf, BufLen, State#v1{pending_recv = true}); +recvloop(Deb, [B], _BufLen, State) -> {Rest, State1} = handle_input(State#v1.callback, B, State), - recvloop(Deb, State1#v1{buf = [Rest], buf_len = size(Rest)}); -recvloop(Deb, State = #v1{recv_len = RecvLen, buf_len = BufLen, buf = Buf}) -> + recvloop(Deb, [Rest], size(Rest), State1); +recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) -> {DataLRev, RestLRev} = binlist_split(RecvLen, BufLen, Buf, []), Data = list_to_binary(lists:reverse(DataLRev)), {<<>>, State1} = handle_input(State#v1.callback, Data, State), - recvloop(Deb, State1#v1{buf = lists:reverse(RestLRev), - buf_len = BufLen - RecvLen}). + recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1). binlist_split(N, N, L, Acc) -> {L, Acc}; @@ -304,12 +302,11 @@ binlist_split(N, Len, L, [Acc0|Acc]) when Len < N -> binlist_split(N, Len, [H|T], Acc) -> binlist_split(N, Len - size(H), T, [H|Acc]). -mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> +mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) -> case rabbit_net:recv(Sock) of {data, Data} -> - recvloop(Deb, State#v1{buf = [Data | Buf], - buf_len = BufLen + size(Data), - pending_recv = false}); + recvloop(Deb, [Data | Buf], BufLen + size(Data), + State#v1{pending_recv = false}); closed when State#v1.connection_state =:= closed -> ok; closed -> @@ -320,11 +317,11 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> throw({inet_error, Reason}); {other, {system, From, Request}} -> sys:handle_system_msg(Request, From, State#v1.parent, - ?MODULE, Deb, State); + ?MODULE, Deb, {Buf, BufLen, State}); {other, Other} -> case handle_other(Other, State) of stop -> ok; - NewState -> recvloop(Deb, NewState) + NewState -> recvloop(Deb, Buf, BufLen, NewState) end end. @@ -1120,18 +1117,17 @@ become_1_0(Id, State = #v1{sock = Sock}) -> Sock, {unsupported_amqp1_0_protocol_id, Id}, {3, 1, 0, 0}) end, - F = fun (_Deb, S) -> + F = fun (_Deb, Buf, BufLen, S) -> {rabbit_amqp1_0_reader, init, - [Mode, pack_for_1_0(S)]} + [Mode, pack_for_1_0(Buf, BufLen, S)]} end, State = #v1{connection_state = {become, F}} end. -pack_for_1_0(#v1{parent = Parent, +pack_for_1_0(Buf, BufLen, + #v1{parent = Parent, sock = Sock, recv_len = RecvLen, pending_recv = PendingRecv, - helper_sup = SupPid, - buf = Buf, - buf_len = BufLen}) -> + helper_sup = SupPid}) -> {Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}. |