diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-03 11:02:41 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-03 11:02:41 +0000 |
commit | 70edb03114b5ad926154fe2b5c2714c4f048c36b (patch) | |
tree | 06f28ab40d8e602415e6ea885f89d1c75e82573f | |
parent | 58bba5b4d071ca56e896754180f5f18957c58ad0 (diff) | |
parent | dc5b87dfa71e9f2cace9c2dd1c120d51f35518d2 (diff) | |
download | rabbitmq-server-70edb03114b5ad926154fe2b5c2714c4f048c36b.tar.gz |
merge bug25939 into bug25943
-rw-r--r-- | src/rabbit_reader.erl | 64 |
1 files changed, 41 insertions, 23 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 32b52e6e..71c13af2 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -280,18 +280,29 @@ recvloop(Deb, State = #v1{pending_recv = true}) -> mainloop(Deb, State); recvloop(Deb, State = #v1{connection_state = blocked}) -> mainloop(Deb, State); +recvloop(Deb, State = #v1{connection_state = {become, F}}) -> + throw({become, F(Deb, State)}); recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen}) when BufLen < RecvLen -> ok = rabbit_net:setopts(Sock, [{active, once}]), mainloop(Deb, State#v1{pending_recv = true}); -recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) -> - {Data, Rest} = split_binary(case Buf of - [B] -> B; - _ -> list_to_binary(lists:reverse(Buf)) - end, RecvLen), - recvloop(Deb, handle_input(State#v1.callback, Data, - State#v1{buf = [Rest], - buf_len = BufLen - RecvLen})). +recvloop(Deb, State = #v1{buf = [B]}) -> + {Rest, State1} = handle_input(State#v1.callback, B, State), + recvloop(Deb, State1#v1{buf = [Rest], buf_len = size(Rest)}); +recvloop(Deb, State = #v1{recv_len = RecvLen, buf_len = BufLen, buf = Buf}) -> + {DataLRev, RestLRev} = binlist_split(RecvLen, BufLen, Buf, []), + Data = list_to_binary(lists:reverse(DataLRev)), + {<<>>, State1} = handle_input(State#v1.callback, Data, State), + recvloop(Deb, State1#v1{buf = lists:reverse(RestLRev), + buf_len = BufLen - RecvLen}). + +binlist_split(N, N, L, Acc) -> + {L, Acc}; +binlist_split(N, Len, L, [Acc0|Acc]) when Len < N -> + {H, T} = split_binary(Acc0, N - Len), + {[H|L], [T|Acc]}; +binlist_split(N, Len, [H|T], Acc) -> + binlist_split(N, Len - size(H), T, [H|Acc]). mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> case rabbit_net:recv(Sock) of @@ -703,32 +714,36 @@ post_process_frame(_Frame, _ChPid, State) -> %% a few get it wrong - off-by 1 or 8 (empty frame size) are typical. -define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE). -handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, _/binary>>, State = #v1{connection = #connection{frame_max = FrameMax}}) when FrameMax /= 0 andalso PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE -> fatal_frame_error( {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE}, Type, Channel, <<>>, State); -handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> - ensure_stats_timer( - switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, - PayloadSize + 1)); - +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, + Payload:PayloadSize/binary, ?FRAME_END, + Rest/binary>>, + State) -> + {Rest, ensure_stats_timer(handle_frame(Type, Channel, Payload, State))}; +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>, + State) -> + {Rest, ensure_stats_timer( + switch_callback(State, + {frame_payload, Type, Channel, PayloadSize}, + PayloadSize + 1))}; handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) -> - <<Payload:PayloadSize/binary, EndMarker>> = Data, + <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data, case EndMarker of ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State), - switch_callback(State1, frame_header, 7); + {Rest, switch_callback(State1, frame_header, 7)}; _ -> fatal_frame_error({invalid_frame_end_marker, EndMarker}, Type, Channel, Payload, State) end; - -handle_input(handshake, <<"AMQP", A, B, C, D>>, State) -> - handshake({A, B, C, D}, State); -handle_input(handshake, Other, #v1{sock = Sock}) -> +handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) -> + {Rest, handshake({A, B, C, D}, State)}; +handle_input(handshake, <<Other:8/binary, _/binary>>, #v1{sock = Sock}) -> refuse_connection(Sock, {bad_header, Other}); - handle_input(Callback, Data, _State) -> throw({bad_input, Callback, Data}). @@ -1105,8 +1120,11 @@ become_1_0(Id, State = #v1{sock = Sock}) -> Sock, {unsupported_amqp1_0_protocol_id, Id}, {3, 1, 0, 0}) end, - throw({become, {rabbit_amqp1_0_reader, init, - [Mode, pack_for_1_0(State)]}}) + F = fun (_Deb, S) -> + {rabbit_amqp1_0_reader, init, + [Mode, pack_for_1_0(S)]} + end, + State = #v1{connection_state = {become, F}} end. pack_for_1_0(#v1{parent = Parent, |