diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/rabbit_reader.erl | 16 |
1 files changed, 9 insertions, 7 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 2efda4fb..2a1ae34e 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -57,8 +57,8 @@ %--------------------------------------------------------------------------- --record(v1, {sock, connection, callback, recv_ref, connection_state, - queue_collector, heartbeater}). +-record(v1, {sock, connection, callback, recv_length, recv_ref, + connection_state, queue_collector, heartbeater}). -define(INFO_KEYS, [pid, address, port, peer_address, peer_port, @@ -262,6 +262,7 @@ start_connection(Parent, Deb, Sock, SockTransform) -> client_properties = none, protocol = none}, callback = uninitialized_callback, + recv_length = 0, recv_ref = none, connection_state = pre_init, queue_collector = Collector, @@ -364,11 +365,11 @@ mainloop(Parent, Deb, State = #v1{sock= Sock, recv_ref = Ref}) -> switch_callback(State = #v1{connection_state = blocked, heartbeater = Heartbeater}, Callback, Length) -> ok = rabbit_heartbeat:pause_monitor(Heartbeater), - State#v1{callback = {Callback, Length}, recv_ref = none}; + State#v1{callback = Callback, recv_length = Length, recv_ref = none}; switch_callback(State, Callback, Length) -> Ref = inet_op(fun () -> rabbit_net:async_recv( State#v1.sock, Length, infinity) end), - State#v1{callback = Callback, recv_ref = Ref}. + State#v1{callback = Callback, recv_length = Length, recv_ref = Ref}. terminate(Explanation, State) when ?IS_RUNNING(State) -> {normal, send_exception(State, 0, @@ -382,9 +383,10 @@ internal_conserve_memory(true, State = #v1{connection_state = running}) -> internal_conserve_memory(false, State = #v1{connection_state = blocking}) -> State#v1{connection_state = running}; internal_conserve_memory(false, State = #v1{connection_state = blocked, - heartbeater = Heartbeater, - callback = {Callback, Length}, - recv_ref = none}) -> + heartbeater = Heartbeater, + callback = Callback, + recv_length = Length, + recv_ref = none}) -> ok = rabbit_heartbeat:resume_monitor(Heartbeater), switch_callback(State#v1{connection_state = running}, Callback, Length); internal_conserve_memory(_Conserve, State) -> |