summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-04-07 13:01:17 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-04-07 13:01:17 +0100
commit93674ad1716b7c5dd07bcbc255599d4e3fd12828 (patch)
tree12f9ab57a99e15965cf6253267047478a4e1ef86
parent7452bf152ed2030df9071b6ae9a5ffacdc5654fb (diff)
downloadrabbitmq-server-93674ad1716b7c5dd07bcbc255599d4e3fd12828.tar.gz
get heartbeat monitor to pause when it should
On 'default' we kick off another prim_inet:async_recv *before* handling the frame we've just received. This is done for performance reasons - essentially we are reading ahead - and leads to the following sequence of events: 1. receive memory alarm -> change state to 'blocking' 2. receive a 'publish' method frame 3. kick off another prim_inet:async_recv 4. handle frame, detecting that it is a 'publish' frame and thus changing the state to 'blocked' 5. receive the frame header for another frame (e.g. the message header, or could be something on another channel, or a heartbeat) 6. since the state is 'blocked' and we pause the heartbeat monitor and *don't* kick off another prim_inet:async_recv On this branch we don't read ahead since a) that would complicate the logic a fair bit, and b) we could end up draining a fair chunk of data from the socket, rather than just a frame header. As a result we need to make sure the heartbeat monitor gets paused as soon as we transition to the 'blocked' state.
-rw-r--r--src/rabbit_reader.erl4
1 files changed, 2 insertions, 2 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 79210268..e210dba1 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -521,8 +521,8 @@ handle_input({frame_payload, Type, Channel, PayloadSize},
PayloadAndMarker, State) ->
case PayloadAndMarker of
<<Payload:PayloadSize/binary, ?FRAME_END>> ->
- handle_frame(Type, Channel, Payload,
- switch_callback(State, frame_header, 7));
+ switch_callback(handle_frame(Type, Channel, Payload, State),
+ frame_header, 7);
_ ->
throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker})
end;