summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-02 00:04:40 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-02 00:04:40 +0000
commit23cf3e5b3dcd8b2b7cb88031da688285cdbf7068 (patch)
treef918edc47fee2957ecc01fa7da691f63c2681d45
parent6a50574330fbf3f0c6377788f4042d398659eaff (diff)
downloadrabbitmq-server-23cf3e5b3dcd8b2b7cb88031da688285cdbf7068.tar.gz
further optimise frame reading
Handle complete frames in one go, if possible, rather than header and payload separately. This essentially halves the amount of binary splitting in the framing code. Note that we only do this when the buffer contains just one binary. Tests have shown that attempting to introduce this optimisation when the buffer comprises multiple binaries hurts performance for large messages. That's presumably because we end up constructing larger intermediate binaries.
-rw-r--r--src/rabbit_reader.erl46
1 files changed, 25 insertions, 21 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 23150040..68602543 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -284,17 +284,15 @@ recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen})
when BufLen < RecvLen ->
ok = rabbit_net:setopts(Sock, [{active, once}]),
mainloop(Deb, State#v1{pending_recv = true});
-recvloop(Deb, State = #v1{recv_len = RecvLen, buf_len = BufLen, buf = [B]}) ->
- {Data, Rest} = split_binary(B, RecvLen),
- recvloop(Deb, handle_input(State#v1.callback, Data,
- State#v1{buf = [Rest],
- buf_len = BufLen - RecvLen}));
+recvloop(Deb, State = #v1{buf = [B]}) ->
+ {Rest, State1} = handle_input(State#v1.callback, B, State),
+ recvloop(Deb, State1#v1{buf = [Rest], buf_len = size(Rest)});
recvloop(Deb, State = #v1{recv_len = RecvLen, buf_len = BufLen, buf = Buf}) ->
{DataLRev, RestLRev} = binlist_split(RecvLen, BufLen, Buf, []),
Data = list_to_binary(lists:reverse(DataLRev)),
- recvloop(Deb, handle_input(State#v1.callback, Data,
- State#v1{buf = lists:reverse(RestLRev),
- buf_len = BufLen - RecvLen})).
+ {<<>>, State1} = handle_input(State#v1.callback, Data, State),
+ recvloop(Deb, State1#v1{buf = lists:reverse(RestLRev),
+ buf_len = BufLen - RecvLen}).
binlist_split(N, N, L, Acc) ->
{L, Acc};
@@ -714,32 +712,38 @@ post_process_frame(_Frame, _ChPid, State) ->
%% a few get it wrong - off-by 1 or 8 (empty frame size) are typical.
-define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE).
-handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>,
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, _/binary>>,
State = #v1{connection = #connection{frame_max = FrameMax}})
when FrameMax /= 0 andalso
PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE ->
fatal_frame_error(
{frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE},
Type, Channel, <<>>, State);
-handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
- ensure_stats_timer(
- switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
- PayloadSize + 1));
-
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32,
+ Payload:PayloadSize/binary, ?FRAME_END,
+ Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(
+ switch_callback(handle_frame(Type, Channel, Payload, State),
+ frame_header, 7))};
+handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>,
+ State) ->
+ {Rest, ensure_stats_timer(
+ switch_callback(State,
+ {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1))};
handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) ->
- <<Payload:PayloadSize/binary, EndMarker>> = Data,
+ <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data,
case EndMarker of
?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State),
- switch_callback(State1, frame_header, 7);
+ {Rest, switch_callback(State1, frame_header, 7)};
_ -> fatal_frame_error({invalid_frame_end_marker, EndMarker},
Type, Channel, Payload, State)
end;
-
-handle_input(handshake, <<"AMQP", A, B, C, D>>, State) ->
- handshake({A, B, C, D}, State);
-handle_input(handshake, Other, #v1{sock = Sock}) ->
+handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) ->
+ {Rest, handshake({A, B, C, D}, State)};
+handle_input(handshake, <<Other:8/binary, _/binary>>, #v1{sock = Sock}) ->
refuse_connection(Sock, {bad_header, Other});
-
handle_input(Callback, Data, _State) ->
throw({bad_input, Callback, Data}).