From 9301c8fa6d9fad3aaab5870873797d342a0a97b6 Mon Sep 17 00:00:00 2001 From: Simon MacMullen Date: Tue, 10 Jan 2012 17:11:19 +0000 Subject: Flow control channel -> queue. --- src/rabbit_reader.erl | 66 +++++++++++++++++++++++++++++++++++---------------- 1 file changed, 46 insertions(+), 20 deletions(-) (limited to 'src/rabbit_reader.erl') diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 4ac387c5..9e3b58aa 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -25,7 +25,7 @@ -export([init/4, mainloop/2]). --export([conserve_memory/2, server_properties/1]). +-export([conserve_memory/3, server_properties/1]). -export([process_channel_frame/5]). %% used by erlang-client @@ -40,7 +40,7 @@ -record(v1, {parent, sock, connection, callback, recv_len, pending_recv, connection_state, queue_collector, heartbeater, stats_timer, channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len, - auth_mechanism, auth_state}). + auth_mechanism, auth_state, blockers}). -define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt, send_pend, state, channels]). @@ -71,7 +71,7 @@ -spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()). -spec(force_event_refresh/1 :: (pid()) -> 'ok'). -spec(shutdown/2 :: (pid(), string()) -> 'ok'). --spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok'). +-spec(conserve_memory/3 :: (pid() | atom(), pid(), boolean()) -> 'ok'). -spec(server_properties/1 :: (rabbit_types:protocol()) -> rabbit_framing:amqp_table()). @@ -137,8 +137,8 @@ info(Pid, Items) -> force_event_refresh(Pid) -> gen_server:cast(Pid, force_event_refresh). -conserve_memory(Pid, Conserve) -> - Pid ! {conserve_memory, Conserve}, +conserve_memory(Blocker, Pid, Conserve) -> + Pid ! {conserve_memory, Blocker, Conserve}, ok. server_properties(Protocol) -> @@ -220,7 +220,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb, buf = [], buf_len = 0, auth_mechanism = none, - auth_state = none}, + auth_state = none, + blockers = sets:new()}, try recvloop(Deb, switch_callback(rabbit_event:init_stats_timer( State, #v1.stats_timer), @@ -276,8 +277,8 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> {other, Other} -> handle_other(Other, Deb, State) end. -handle_other({conserve_memory, Conserve}, Deb, State) -> - recvloop(Deb, internal_conserve_memory(Conserve, State)); +handle_other({conserve_memory, Blocker, Conserve}, Deb, State) -> + recvloop(Deb, internal_conserve_memory(Conserve, Blocker, State)); handle_other({channel_closing, ChPid}, Deb, State) -> ok = rabbit_channel:ready_for_close(ChPid), channel_cleanup(ChPid), @@ -361,24 +362,47 @@ terminate(Explanation, State) when ?IS_RUNNING(State) -> terminate(_Explanation, State) -> {force, State}. -internal_conserve_memory(true, State = #v1{connection_state = running}) -> - State#v1{connection_state = blocking}; -internal_conserve_memory(false, State = #v1{connection_state = blocking}) -> - State#v1{connection_state = running}; -internal_conserve_memory(false, State = #v1{connection_state = blocked, - heartbeater = Heartbeater}) -> - ok = rabbit_heartbeat:resume_monitor(Heartbeater), - State#v1{connection_state = running}; -internal_conserve_memory(_Conserve, State) -> +internal_conserve_memory(true, Blocker, + State = #v1{connection_state = running, + blockers = Blockers}) -> + 0 = sets:size(Blockers), %% ASSERT + State#v1{connection_state = blocking, + blockers = sets:add_element(Blocker, Blockers)}; +internal_conserve_memory(true, Blocker, + State = #v1{blockers = Blockers}) -> + State#v1{blockers = sets:add_element(Blocker, Blockers)}; +internal_conserve_memory(false, Blocker, + State = #v1{connection_state = blocking, + blockers = Blockers}) -> + NewBlockers = sets:del_element(Blocker, Blockers), + case sets:size(NewBlockers) of + 0 -> State#v1{connection_state = running, + blockers = NewBlockers}; + + _ -> State#v1{blockers = NewBlockers} + end; +internal_conserve_memory(false, Blocker, + State = #v1{connection_state = blocked, + blockers = Blockers, + heartbeater = Heartbeater}) -> + NewBlockers = sets:del_element(Blocker, Blockers), + case sets:size(NewBlockers) of + 0 -> ok = rabbit_heartbeat:resume_monitor(Heartbeater), + State#v1{connection_state = running, + blockers = NewBlockers}; + + _ -> State#v1{blockers = NewBlockers} + end; +internal_conserve_memory(_Conserve, _Blocker, State) -> State. internal_bump_credit(Msg, State) -> rabbit_flow:bump(Msg), - internal_conserve_memory(false, State). + internal_conserve_memory(false, self(), State). internal_check_credit(State) when ?IS_RUNNING(State) -> case rabbit_flow:blocked() of - true -> internal_conserve_memory(true, State); + true -> internal_conserve_memory(true, self(), State); false -> State end. @@ -713,7 +737,9 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, NewConnection = Connection#connection{vhost = VHostPath}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), State1 = internal_conserve_memory( - rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}), + rabbit_alarm:register(self(), {?MODULE, conserve_memory, + [memory_alarm]}), + memory_alarm, State#v1{connection_state = running, connection = NewConnection}), rabbit_event:notify(connection_created, -- cgit v1.2.1