diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-02 00:04:40 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-02 00:04:40 +0000 |
commit | 23cf3e5b3dcd8b2b7cb88031da688285cdbf7068 (patch) | |
tree | f918edc47fee2957ecc01fa7da691f63c2681d45 | |
parent | 6a50574330fbf3f0c6377788f4042d398659eaff (diff) | |
download | rabbitmq-server-23cf3e5b3dcd8b2b7cb88031da688285cdbf7068.tar.gz |
further optimise frame reading
Handle complete frames in one go, if possible, rather than header and
payload separately. This essentially halves the amount of binary
splitting in the framing code.
Note that we only do this when the buffer contains just one
binary. Tests have shown that attempting to introduce this
optimisation when the buffer comprises multiple binaries hurts
performance for large messages. That's presumably because we end up
constructing larger intermediate binaries.
-rw-r--r-- | src/rabbit_reader.erl | 46 |
1 files changed, 25 insertions, 21 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 23150040..68602543 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -284,17 +284,15 @@ recvloop(Deb, State = #v1{sock = Sock, recv_len = RecvLen, buf_len = BufLen}) when BufLen < RecvLen -> ok = rabbit_net:setopts(Sock, [{active, once}]), mainloop(Deb, State#v1{pending_recv = true}); -recvloop(Deb, State = #v1{recv_len = RecvLen, buf_len = BufLen, buf = [B]}) -> - {Data, Rest} = split_binary(B, RecvLen), - recvloop(Deb, handle_input(State#v1.callback, Data, - State#v1{buf = [Rest], - buf_len = BufLen - RecvLen})); +recvloop(Deb, State = #v1{buf = [B]}) -> + {Rest, State1} = handle_input(State#v1.callback, B, State), + recvloop(Deb, State1#v1{buf = [Rest], buf_len = size(Rest)}); recvloop(Deb, State = #v1{recv_len = RecvLen, buf_len = BufLen, buf = Buf}) -> {DataLRev, RestLRev} = binlist_split(RecvLen, BufLen, Buf, []), Data = list_to_binary(lists:reverse(DataLRev)), - recvloop(Deb, handle_input(State#v1.callback, Data, - State#v1{buf = lists:reverse(RestLRev), - buf_len = BufLen - RecvLen})). + {<<>>, State1} = handle_input(State#v1.callback, Data, State), + recvloop(Deb, State1#v1{buf = lists:reverse(RestLRev), + buf_len = BufLen - RecvLen}). binlist_split(N, N, L, Acc) -> {L, Acc}; @@ -714,32 +712,38 @@ post_process_frame(_Frame, _ChPid, State) -> %% a few get it wrong - off-by 1 or 8 (empty frame size) are typical. -define(FRAME_SIZE_FUDGE, ?EMPTY_FRAME_SIZE). -handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, _/binary>>, State = #v1{connection = #connection{frame_max = FrameMax}}) when FrameMax /= 0 andalso PayloadSize > FrameMax - ?EMPTY_FRAME_SIZE + ?FRAME_SIZE_FUDGE -> fatal_frame_error( {frame_too_large, PayloadSize, FrameMax - ?EMPTY_FRAME_SIZE}, Type, Channel, <<>>, State); -handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> - ensure_stats_timer( - switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, - PayloadSize + 1)); - +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, + Payload:PayloadSize/binary, ?FRAME_END, + Rest/binary>>, + State) -> + {Rest, ensure_stats_timer( + switch_callback(handle_frame(Type, Channel, Payload, State), + frame_header, 7))}; +handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32, Rest/binary>>, + State) -> + {Rest, ensure_stats_timer( + switch_callback(State, + {frame_payload, Type, Channel, PayloadSize}, + PayloadSize + 1))}; handle_input({frame_payload, Type, Channel, PayloadSize}, Data, State) -> - <<Payload:PayloadSize/binary, EndMarker>> = Data, + <<Payload:PayloadSize/binary, EndMarker, Rest/binary>> = Data, case EndMarker of ?FRAME_END -> State1 = handle_frame(Type, Channel, Payload, State), - switch_callback(State1, frame_header, 7); + {Rest, switch_callback(State1, frame_header, 7)}; _ -> fatal_frame_error({invalid_frame_end_marker, EndMarker}, Type, Channel, Payload, State) end; - -handle_input(handshake, <<"AMQP", A, B, C, D>>, State) -> - handshake({A, B, C, D}, State); -handle_input(handshake, Other, #v1{sock = Sock}) -> +handle_input(handshake, <<"AMQP", A, B, C, D, Rest/binary>>, State) -> + {Rest, handshake({A, B, C, D}, State)}; +handle_input(handshake, <<Other:8/binary, _/binary>>, #v1{sock = Sock}) -> refuse_connection(Sock, {bad_header, Other}); - handle_input(Callback, Data, _State) -> throw({bad_input, Callback, Data}). |