diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-07 10:23:41 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-07 10:23:41 +0000 |
commit | cf02ec9cc1dd98d619f4dfd31cf233a4b42cb564 (patch) | |
tree | bb013e7e52353e44b1e0a3df26d742c33531530e | |
parent | 74af36bb78adc729e06d09f8e03bf27065c212fb (diff) | |
parent | fa4fa757075a7ca533f700f5ab6157423706de44 (diff) | |
download | rabbitmq-server-cf02ec9cc1dd98d619f4dfd31cf233a4b42cb564.tar.gz |
Merge bug 25943
-rw-r--r-- | src/rabbit_reader.erl | 74 |
1 files changed, 34 insertions, 40 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index b0960c0f..d9879f1b 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -23,7 +23,7 @@ -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/2, mainloop/2, recvloop/2]). +-export([init/2, mainloop/4, recvloop/4]). -export([conserve_resources/3, server_properties/1]). @@ -38,8 +38,7 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, helper_sup, queue_collector, heartbeater, - stats_timer, channel_sup_sup_pid, buf, buf_len, channel_count, - throttle}). + stats_timer, channel_sup_sup_pid, channel_count, throttle}). -record(connection, {name, host, peer_host, port, peer_port, protocol, user, timeout_sec, frame_max, channel_max, vhost, @@ -92,9 +91,10 @@ rabbit_types:ok_or_error2( rabbit_net:socket(), any()))) -> no_return()). --spec(mainloop/2 :: (_,#v1{}) -> any()). +-spec(mainloop/4 :: (_,[binary()], non_neg_integer(), #v1{}) -> any()). -spec(system_code_change/4 :: (_,_,_,_) -> {'ok',_}). --spec(system_continue/3 :: (_,_,#v1{}) -> any()). +-spec(system_continue/3 :: (_,_,{[binary()], non_neg_integer(), #v1{}}) -> + any()). -spec(system_terminate/4 :: (_,_,_,_) -> none()). -endif. @@ -114,8 +114,8 @@ init(Parent, HelperSup) -> start_connection(Parent, HelperSup, Deb, Sock, SockTransform) end. -system_continue(Parent, Deb, State) -> - mainloop(Deb, State#v1{parent = Parent}). +system_continue(Parent, Deb, {Buf, BufLen, State}) -> + mainloop(Deb, Buf, BufLen, State#v1{parent = Parent}). system_terminate(Reason, _Parent, _Deb, _State) -> exit(Reason). @@ -239,8 +239,6 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> helper_sup = HelperSup, heartbeater = none, channel_sup_sup_pid = none, - buf = [], - buf_len = 0, channel_count = 0, throttle = #throttle{ alarmed_by = [], @@ -249,9 +247,9 @@ start_connection(Parent, HelperSup, Deb, Sock, SockTransform) -> blocked_sent = false}}, try run({?MODULE, recvloop, - [Deb, switch_callback(rabbit_event:init_stats_timer( - State, #v1.stats_timer), - handshake, 8)]}), + [Deb, [], 0, switch_callback(rabbit_event:init_stats_timer( + State, #v1.stats_timer), + handshake, 8)]}), log(info, "closing AMQP connection ~p (~s)~n", [self(), Name]) catch Ex -> log(case Ex of @@ -278,25 +276,24 @@ run({M, F, A}) -> catch {become, MFA} -> run(MFA) end. -recvloop(Deb, State = #v1{pending_recv = true}) -> - mainloop(Deb, State); -recvloop(Deb, State = #v1{connection_state = blocked}) -> - mainloop(Deb, State); -recvloop(Deb, State = #v1{connection_state = {become, F}}) -> - throw({become, F(Deb, State)}); -recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen}) +recvloop(Deb, Buf, BufLen, State = #v1{pending_recv = true}) -> + mainloop(Deb, Buf, BufLen, State); +recvloop(Deb, Buf, BufLen, State = #v1{connection_state = blocked}) -> + mainloop(Deb, Buf, BufLen, State); +recvloop(Deb, Buf, BufLen, State = #v1{connection_state = {become, F}}) -> + throw({become, F(Deb, Buf, BufLen, State)}); +recvloop(Deb, Buf, BufLen, State = #v1{sock = Sock, recv_len = RecvLen}) when BufLen < RecvLen -> ok = rabbit_net:setopts(Sock, [{active, once}]), - mainloop(Deb, State#v1{pending_recv = true}); -recvloop(Deb, State = #v1{buf = [B]}) -> + mainloop(Deb, Buf, BufLen, State#v1{pending_recv = true}); +recvloop(Deb, [B], _BufLen, State) -> {Rest, State1} = handle_input(State#v1.callback, B, State), - recvloop(Deb, State1#v1{buf = [Rest], buf_len = size(Rest)}); -recvloop(Deb, State = #v1{recv_len = RecvLen, buf_len = BufLen, buf = Buf}) -> + recvloop(Deb, [Rest], size(Rest), State1); +recvloop(Deb, Buf, BufLen, State = #v1{recv_len = RecvLen}) -> {DataLRev, RestLRev} = binlist_split(BufLen - RecvLen, Buf, []), Data = list_to_binary(lists:reverse(DataLRev)), {<<>>, State1} = handle_input(State#v1.callback, Data, State), - recvloop(Deb, State1#v1{buf = lists:reverse(RestLRev), - buf_len = BufLen - RecvLen}). + recvloop(Deb, lists:reverse(RestLRev), BufLen - RecvLen, State1). binlist_split(0, L, Acc) -> {L, Acc}; @@ -306,12 +303,11 @@ binlist_split(Len, L, [Acc0|Acc]) when Len < 0 -> binlist_split(Len, [H|T], Acc) -> binlist_split(Len - size(H), T, [H|Acc]). -mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> +mainloop(Deb, Buf, BufLen, State = #v1{sock = Sock}) -> case rabbit_net:recv(Sock) of {data, Data} -> - recvloop(Deb, State#v1{buf = [Data | Buf], - buf_len = BufLen + size(Data), - pending_recv = false}); + recvloop(Deb, [Data | Buf], BufLen + size(Data), + State#v1{pending_recv = false}); closed when State#v1.connection_state =:= closed -> ok; closed -> @@ -322,11 +318,11 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> throw({inet_error, Reason}); {other, {system, From, Request}} -> sys:handle_system_msg(Request, From, State#v1.parent, - ?MODULE, Deb, State); + ?MODULE, Deb, {Buf, BufLen, State}); {other, Other} -> case handle_other(Other, State) of stop -> ok; - NewState -> recvloop(Deb, NewState) + NewState -> recvloop(Deb, Buf, BufLen, NewState) end end. @@ -1143,18 +1139,16 @@ become_1_0(Id, State = #v1{sock = Sock}) -> Sock, {unsupported_amqp1_0_protocol_id, Id}, {3, 1, 0, 0}) end, - F = fun (_Deb, S) -> + F = fun (_Deb, Buf, BufLen, S) -> {rabbit_amqp1_0_reader, init, - [Mode, pack_for_1_0(S)]} + [Mode, pack_for_1_0(Buf, BufLen, S)]} end, State = #v1{connection_state = {become, F}} end. -pack_for_1_0(#v1{parent = Parent, - sock = Sock, - recv_len = RecvLen, - pending_recv = PendingRecv, - helper_sup = SupPid, - buf = Buf, - buf_len = BufLen}) -> +pack_for_1_0(Buf, BufLen, #v1{parent = Parent, + sock = Sock, + recv_len = RecvLen, + pending_recv = PendingRecv, + helper_sup = SupPid}) -> {Parent, Sock, RecvLen, PendingRecv, SupPid, Buf, BufLen}. |