diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2010-11-17 23:17:24 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-11-17 23:17:24 +0000 |
commit | 4b82f5cf92c92a585eb820b2f432ca38cb414ce9 (patch) | |
tree | 5270d95beb6e370920d5dcee6222857ca99c9fa3 | |
parent | 908e2ec1c57d6af9c0db409048e8cde96f0037be (diff) | |
download | rabbitmq-server-4b82f5cf92c92a585eb820b2f432ca38cb414ce9.tar.gz |
minor optimisation: ask for next piece of data before processing
This reduces the likelihood of the reader process stalling,
i.e. ending up in 'receive' when there is nothing in the mailbox.
-rw-r--r-- | src/rabbit_reader.erl | 24 |
1 files changed, 12 insertions, 12 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index e68fbca8..d93a23fe 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -320,10 +320,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) -> receive {inet_async, Sock, Ref, {ok, Data}} -> - {State1, Callback1, Length1} = - handle_input(State#v1.callback, Data, - State#v1{recv_ref = none}), - mainloop(Deb, switch_callback(State1, Callback1, Length1)); + mainloop(Deb, handle_input(State#v1.callback, Data, + State#v1{recv_ref = none})); {inet_async, Sock, Ref, {error, closed}} -> if State#v1.connection_state =:= closed -> State; @@ -626,14 +624,16 @@ analyze_frame(_Type, _Body, _Protocol) -> error. handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) -> - {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize}, - PayloadSize + 1}; + ensure_stats_timer( + switch_callback(State, {frame_payload, Type, Channel, PayloadSize}, + PayloadSize + 1}; handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) -> case PayloadAndMarker of <<Payload:PayloadSize/binary, ?FRAME_END>> -> - {handle_frame(Type, Channel, Payload, State), frame_header, 7}; + handle_frame(Type, Channel, Payload, + switch_callback(State, frame_header, 7)); _ -> throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker}) end; @@ -686,11 +686,11 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision}, mechanisms = <<"PLAIN AMQPLAIN">>, locales = <<"en_US">> }, ok = send_on_channel0(Sock, Start, Protocol), - {State#v1{connection = Connection#connection{ - timeout_sec = ?NORMAL_TIMEOUT, - protocol = Protocol}, - connection_state = starting}, - frame_header, 7}. + switch_callback(State#v1{connection = Connection#connection{ + timeout_sec = ?NORMAL_TIMEOUT, + protocol = Protocol}, + connection_state = starting}, + frame_header, 7). refuse_connection(Sock, Exception) -> ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end), |