diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2011-04-07 13:01:17 +0100 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2011-04-07 13:01:17 +0100 |
commit | 93674ad1716b7c5dd07bcbc255599d4e3fd12828 (patch) | |
tree | 12f9ab57a99e15965cf6253267047478a4e1ef86 | |
parent | 7452bf152ed2030df9071b6ae9a5ffacdc5654fb (diff) | |
download | rabbitmq-server-93674ad1716b7c5dd07bcbc255599d4e3fd12828.tar.gz |
get heartbeat monitor to pause when it should
On 'default' we kick off another prim_inet:async_recv *before*
handling the frame we've just received. This is done for performance
reasons - essentially we are reading ahead - and leads to the following
sequence of events:
1. receive memory alarm -> change state to 'blocking'
2. receive a 'publish' method frame
3. kick off another prim_inet:async_recv
4. handle frame, detecting that it is a 'publish' frame and thus
changing the state to 'blocked'
5. receive the frame header for another frame (e.g. the message
header, or could be something on another channel, or a heartbeat)
6. since the state is 'blocked' and we pause the heartbeat
monitor and *don't* kick off another prim_inet:async_recv
On this branch we don't read ahead since a) that would complicate the
logic a fair bit, and b) we could end up draining a fair chunk of data
from the socket, rather than just a frame header. As a result we need
to make sure the heartbeat monitor gets paused as soon as we
transition to the 'blocked' state.
-rw-r--r-- | src/rabbit_reader.erl | 4 |
1 files changed, 2 insertions, 2 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 79210268..e210dba1 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -521,8 +521,8 @@ handle_input({frame_payload, Type, Channel, PayloadSize}, PayloadAndMarker, State) -> case PayloadAndMarker of <<Payload:PayloadSize/binary, ?FRAME_END>> -> - handle_frame(Type, Channel, Payload, - switch_callback(State, frame_header, 7)); + switch_callback(handle_frame(Type, Channel, Payload, State), + frame_header, 7); _ -> throw({bad_payload, Type, Channel, PayloadSize, PayloadAndMarker}) end; |