summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2010-11-17 23:17:24 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2010-11-17 23:17:24 +0000
commit4b82f5cf92c92a585eb820b2f432ca38cb414ce9 (patch)
tree5270d95beb6e370920d5dcee6222857ca99c9fa3
parent908e2ec1c57d6af9c0db409048e8cde96f0037be (diff)
downloadrabbitmq-server-4b82f5cf92c92a585eb820b2f432ca38cb414ce9.tar.gz
minor optimisation: ask for next piece of data before processing
This reduces the likelihood of the reader process stalling, i.e. ending up in 'receive' when there is nothing in the mailbox.
-rw-r--r--src/rabbit_reader.erl24
1 files changed, 12 insertions, 12 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index e68fbca8..d93a23fe 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -320,10 +320,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
mainloop(Deb, State = #v1{parent = Parent, sock= Sock, recv_ref = Ref}) ->
receive
{inet_async, Sock, Ref, {ok, Data}} ->
- {State1, Callback1, Length1} =
- handle_input(State#v1.callback, Data,
- State#v1{recv_ref = none}),
- mainloop(Deb, switch_callback(State1, Callback1, Length1));
+ mainloop(Deb, handle_input(State#v1.callback, Data,
+ State#v1{recv_ref = none}));
{inet_async, Sock, Ref, {error, closed}} ->
if State#v1.connection_state =:= closed ->
State;
@@ -626,14 +624,16 @@ analyze_frame(_Type, _Body, _Protocol) ->
error.
handle_input(frame_header, <<Type:8,Channel:16,PayloadSize:32>>, State) ->
- {ensure_stats_timer(State), {frame_payload, Type, Channel, PayloadSize},
- PayloadSize + 1};
+ ensure_stats_timer(
+ switch_callback(State, {frame_payload, Type, Channel, PayloadSize},
+ PayloadSize + 1};
handle_input({frame_payload, Type, Channel, PayloadSize},
PayloadAndMarker, State) ->
case PayloadAndMarker of
<<Payload:PayloadSize/binary, ?FRAME_END>> ->
- {handle_frame(Type, Channel, Payload, State), frame_header, 7};
+ handle_frame(Type, Channel, Payload,
+ switch_callback(State, frame_header, 7));
_ ->
throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker})
end;
@@ -686,11 +686,11 @@ start_connection({ProtocolMajor, ProtocolMinor, _ProtocolRevision},
mechanisms = <<"PLAIN AMQPLAIN">>,
locales = <<"en_US">> },
ok = send_on_channel0(Sock, Start, Protocol),
- {State#v1{connection = Connection#connection{
- timeout_sec = ?NORMAL_TIMEOUT,
- protocol = Protocol},
- connection_state = starting},
- frame_header, 7}.
+ switch_callback(State#v1{connection = Connection#connection{
+ timeout_sec = ?NORMAL_TIMEOUT,
+ protocol = Protocol},
+ connection_state = starting},
+ frame_header, 7).
refuse_connection(Sock, Exception) ->
ok = inet_op(fun () -> rabbit_net:send(Sock, <<"AMQP",0,0,9,1>>) end),