summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-06 16:36:05 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-06 16:36:05 +0000
commit74af36bb78adc729e06d09f8e03bf27065c212fb (patch)
tree421fbb1ca426f7b9a02f34d030cc56eccf18a493
parent81b96105117379db9030472c1cb2789eed22cf71 (diff)
parent3ccc2f8eccf1add19bcc2de4917bbb879d4e1c28 (diff)
downloadrabbitmq-server-74af36bb78adc729e06d09f8e03bf27065c212fb.tar.gz
Merge bug25939
-rw-r--r--src/rabbit_reader.erl64
1 files changed, 41 insertions, 23 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index f07ce419..b0960c0f 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -282,18 +282,29 @@ recvloop(Deb, State = #v1{pending_recv = true}) ->
mainloop(Deb, State);
recvloop(Deb, State = #v1{connection_state = blocked}) ->
mainloop(Deb, State);
+recvloop(Deb, State = #v1{connection_state = {become, F}}) ->
+ throw({become, F(Deb, State)});
recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen})
when BufLen < RecvLen ->
ok = rabbit_net:setopts(Sock, [{active, once}]),
mainloop(Deb, State#v1{pending_recv = true});
-recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) ->
- {Data, Rest} = split_binary(case Buf of
- [B] -> B;
- _ -> list_to_binary(lists:reverse(Buf))
- end, RecvLen),
- recvloop(Deb, handle_input(State#v1.callback, Data,
- State#v1{buf = [Rest],
- buf_len = BufLen - RecvLen})).
+recvloop(Deb, State = #v1{buf = [B]}) ->
+ {Rest, State1} = handle_input(State#v1.callback, B, State),
+ recvloop(Deb, State1#v1{buf = [Rest], buf_len = size(Rest)});
+recvloop(Deb, State = #v1{recv_len = RecvLen, buf_len = BufLen, buf = Buf}) ->
+ {DataLRev, RestLRev} = binlist_split(BufLen - RecvLen, Buf, []),
+ Data = list_to_binary(lists:reverse(DataLRev)),
+ {<<>>, State1} = handle_input(State#v1.callback, Data, State),
+ recvloop(Deb, State1#v1{buf = lists:reverse(RestLRev),
+ buf_len = BufLen - RecvLen}).
+
+binlist_split(0, L, Acc) ->
+ {L, Acc};
+binlist_split(Len, L, [Acc0|Acc]) when Len < 0 ->
+ {H, T} = split_binary(Acc0, -Len),
+ {[H|L], [T|Acc]};
+binlist_split(Len, [H|T], Acc) ->
+ binlist_split(Len - size(H), T, [H|Acc]).
mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
case rabbit_net:recv(Sock) of
@@ -715,32 +726,36 @@ post_process_frame(_Frame, _ChPid, State) ->
%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical.
-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE).
-handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>,
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, _/binary>>,
State = #v1{connection = #connection{frame_max = FrameMax}})
when FrameMax /= 0 andalso
PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE ->
fatal_frame_error(
{frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE},
Type, Channel, <<>>, State);
-handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
- ensure_stats_timer(
- switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
- PayloadSize + 1));
-
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32,
+ Payload:PayloadSize/binary, ?FRAME_END,
+ Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(handle_frame(Type, Channel, Payload, State))};
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(
+ switch_callback(State,
+ {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1))};
handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
- <<Payload:PayloadSize/binary, EndMarker>> = Data,
+ <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data,
case EndMarker of
?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
- switch_callback(State1, frame_header, 7);
+ {Rest, switch_callback(State1, frame_header, 7)};
_ -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
Type, Channel, Payload, State)
end;
-
-handle_input(handshake, <<"AMQP", A, B, C, D>>, State) ->
- handshake({A, B, C, D}, State);
-handle_input(handshake, Other, #v1{sock = Sock}) ->
+handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) ->
+ {Rest, handshake({A, B, C, D}, State)};
+handle_input(handshake, <<Other:8/binary, _/binary>>, #v1{sock = Sock}) ->
refuse_connection(Sock, {bad_header, Other});
-
handle_input(Callback, Data, _State) ->
throw({bad_input, Callback, Data}).
@@ -1128,8 +1143,11 @@ become_1_0(Id, State = #v1{sock = Sock}) ->
Sock, {unsupported_amqp1_0_protocol_id, Id},
{3, 1, 0, 0})
end,
- throw({become, {rabbit_amqp1_0_reader, init,
- [Mode, pack_for_1_0(State)]}})
+ F = fun (_Deb, S) ->
+ {rabbit_amqp1_0_reader, init,
+ [Mode, pack_for_1_0(S)]}
+ end,
+ State = #v1{connection_state = {become, F}}
end.
pack_for_1_0(#v1{parent = Parent,