summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-03 11:02:41 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-03 11:02:41 +0000
commit70edb03114b5ad926154fe2b5c2714c4f048c36b (patch)
tree06f28ab40d8e602415e6ea885f89d1c75e82573f
parent58bba5b4d071ca56e896754180f5f18957c58ad0 (diff)
parentdc5b87dfa71e9f2cace9c2dd1c120d51f35518d2 (diff)
downloadrabbitmq-server-70edb03114b5ad926154fe2b5c2714c4f048c36b.tar.gz
merge bug25939 into bug25943
-rw-r--r--src/rabbit_reader.erl64
1 files changed, 41 insertions, 23 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 32b52e6e..71c13af2 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -280,18 +280,29 @@ recvloop(Deb, State = #v1{pending_recv = true}) ->
mainloop(Deb, State);
recvloop(Deb, State = #v1{connection_state = blocked}) ->
mainloop(Deb, State);
+recvloop(Deb, State = #v1{connection_state = {become, F}}) ->
+ throw({become, F(Deb, State)});
recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen})
when BufLen < RecvLen ->
ok = rabbit_net:setopts(Sock, [{active, once}]),
mainloop(Deb, State#v1{pending_recv = true});
-recvloop(Deb, State = #v1{recv_len = RecvLen, buf = Buf, buf_len = BufLen}) ->
- {Data, Rest} = split_binary(case Buf of
- [B] -> B;
- _ -> list_to_binary(lists:reverse(Buf))
- end, RecvLen),
- recvloop(Deb, handle_input(State#v1.callback, Data,
- State#v1{buf = [Rest],
- buf_len = BufLen - RecvLen})).
+recvloop(Deb, State = #v1{buf = [B]}) ->
+ {Rest, State1} = handle_input(State#v1.callback, B, State),
+ recvloop(Deb, State1#v1{buf = [Rest], buf_len = size(Rest)});
+recvloop(Deb, State = #v1{recv_len = RecvLen, buf_len = BufLen, buf = Buf}) ->
+ {DataLRev, RestLRev} = binlist_split(RecvLen, BufLen, Buf, []),
+ Data = list_to_binary(lists:reverse(DataLRev)),
+ {<<>>, State1} = handle_input(State#v1.callback, Data, State),
+ recvloop(Deb, State1#v1{buf = lists:reverse(RestLRev),
+ buf_len = BufLen - RecvLen}).
+
+binlist_split(N, N, L, Acc) ->
+ {L, Acc};
+binlist_split(N, Len, L, [Acc0|Acc]) when Len < N ->
+ {H, T} = split_binary(Acc0, N - Len),
+ {[H|L], [T|Acc]};
+binlist_split(N, Len, [H|T], Acc) ->
+ binlist_split(N, Len - size(H), T, [H|Acc]).
mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
case rabbit_net:recv(Sock) of
@@ -703,32 +714,36 @@ post_process_frame(_Frame, _ChPid, State) ->
%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical.
-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE).
-handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>,
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, _/binary>>,
State = #v1{connection = #connection{frame_max = FrameMax}})
when FrameMax /= 0 andalso
PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE ->
fatal_frame_error(
{frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE},
Type, Channel, <<>>, State);
-handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
- ensure_stats_timer(
- switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
- PayloadSize + 1));
-
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32,
+ Payload:PayloadSize/binary, ?FRAME_END,
+ Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(handle_frame(Type, Channel, Payload, State))};
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(
+ switch_callback(State,
+ {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1))};
handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
- <<Payload:PayloadSize/binary, EndMarker>> = Data,
+ <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data,
case EndMarker of
?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
- switch_callback(State1, frame_header, 7);
+ {Rest, switch_callback(State1, frame_header, 7)};
_ -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
Type, Channel, Payload, State)
end;
-
-handle_input(handshake, <<"AMQP", A, B, C, D>>, State) ->
- handshake({A, B, C, D}, State);
-handle_input(handshake, Other, #v1{sock = Sock}) ->
+handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) ->
+ {Rest, handshake({A, B, C, D}, State)};
+handle_input(handshake, <<Other:8/binary, _/binary>>, #v1{sock = Sock}) ->
refuse_connection(Sock, {bad_header, Other});
-
handle_input(Callback, Data, _State) ->
throw({bad_input, Callback, Data}).
@@ -1105,8 +1120,11 @@ become_1_0(Id, State = #v1{sock = Sock}) ->
Sock, {unsupported_amqp1_0_protocol_id, Id},
{3, 1, 0, 0})
end,
- throw({become, {rabbit_amqp1_0_reader, init,
- [Mode, pack_for_1_0(State)]}})
+ F = fun (_Deb, S) ->
+ {rabbit_amqp1_0_reader, init,
+ [Mode, pack_for_1_0(S)]}
+ end,
+ State = #v1{connection_state = {become, F}}
end.
pack_for_1_0(#v1{parent = Parent,