summaryrefslogtreecommitdiff
path: root/src/rabbit_reader.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2012-01-10 17:11:19 +0000
committerSimon MacMullen <simon@rabbitmq.com>2012-01-10 17:11:19 +0000
commit9301c8fa6d9fad3aaab5870873797d342a0a97b6 (patch)
tree5d97bb868d70e6fcbbee5403c5b0c007a267aee9 /src/rabbit_reader.erl
parent616ffc3385107785f89589f2cd9759424c264c3d (diff)
downloadrabbitmq-server-9301c8fa6d9fad3aaab5870873797d342a0a97b6.tar.gz
Flow control channel -> queue.
Diffstat (limited to 'src/rabbit_reader.erl')
-rw-r--r--src/rabbit_reader.erl66
1 files changed, 46 insertions, 20 deletions
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 4ac387c5..9e3b58aa 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -25,7 +25,7 @@
-export([init/4, mainloop/2]).
--export([conserve_memory/2, server_properties/1]).
+-export([conserve_memory/3, server_properties/1]).
-export([process_channel_frame/5]). %% used by erlang-client
@@ -40,7 +40,7 @@
-record(v1, {parent, sock, connection, callback, recv_len, pending_recv,
connection_state, queue_collector, heartbeater, stats_timer,
channel_sup_sup_pid, start_heartbeat_fun, buf, buf_len,
- auth_mechanism, auth_state}).
+ auth_mechanism, auth_state, blockers}).
-define(STATISTICS_KEYS, [pid, recv_oct, recv_cnt, send_oct, send_cnt,
send_pend, state, channels]).
@@ -71,7 +71,7 @@
-spec(info/2 :: (pid(), rabbit_types:info_keys()) -> rabbit_types:infos()).
-spec(force_event_refresh/1 :: (pid()) -> 'ok').
-spec(shutdown/2 :: (pid(), string()) -> 'ok').
--spec(conserve_memory/2 :: (pid(), boolean()) -> 'ok').
+-spec(conserve_memory/3 :: (pid() | atom(), pid(), boolean()) -> 'ok').
-spec(server_properties/1 :: (rabbit_types:protocol()) ->
rabbit_framing:amqp_table()).
@@ -137,8 +137,8 @@ info(Pid, Items) ->
force_event_refresh(Pid) ->
gen_server:cast(Pid, force_event_refresh).
-conserve_memory(Pid, Conserve) ->
- Pid ! {conserve_memory, Conserve},
+conserve_memory(Blocker, Pid, Conserve) ->
+ Pid ! {conserve_memory, Blocker, Conserve},
ok.
server_properties(Protocol) ->
@@ -220,7 +220,8 @@ start_connection(Parent, ChannelSupSupPid, Collector, StartHeartbeatFun, Deb,
buf = [],
buf_len = 0,
auth_mechanism = none,
- auth_state = none},
+ auth_state = none,
+ blockers = sets:new()},
try
recvloop(Deb, switch_callback(rabbit_event:init_stats_timer(
State, #v1.stats_timer),
@@ -276,8 +277,8 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
{other, Other} -> handle_other(Other, Deb, State)
end.
-handle_other({conserve_memory, Conserve}, Deb, State) ->
- recvloop(Deb, internal_conserve_memory(Conserve, State));
+handle_other({conserve_memory, Blocker, Conserve}, Deb, State) ->
+ recvloop(Deb, internal_conserve_memory(Conserve, Blocker, State));
handle_other({channel_closing, ChPid}, Deb, State) ->
ok = rabbit_channel:ready_for_close(ChPid),
channel_cleanup(ChPid),
@@ -361,24 +362,47 @@ terminate(Explanation, State) when ?IS_RUNNING(State) ->
terminate(_Explanation, State) ->
{force, State}.
-internal_conserve_memory(true, State = #v1{connection_state = running}) ->
- State#v1{connection_state = blocking};
-internal_conserve_memory(false, State = #v1{connection_state = blocking}) ->
- State#v1{connection_state = running};
-internal_conserve_memory(false, State = #v1{connection_state = blocked,
- heartbeater = Heartbeater}) ->
- ok = rabbit_heartbeat:resume_monitor(Heartbeater),
- State#v1{connection_state = running};
-internal_conserve_memory(_Conserve, State) ->
+internal_conserve_memory(true, Blocker,
+ State = #v1{connection_state = running,
+ blockers = Blockers}) ->
+ 0 = sets:size(Blockers), %% ASSERT
+ State#v1{connection_state = blocking,
+ blockers = sets:add_element(Blocker, Blockers)};
+internal_conserve_memory(true, Blocker,
+ State = #v1{blockers = Blockers}) ->
+ State#v1{blockers = sets:add_element(Blocker, Blockers)};
+internal_conserve_memory(false, Blocker,
+ State = #v1{connection_state = blocking,
+ blockers = Blockers}) ->
+ NewBlockers = sets:del_element(Blocker, Blockers),
+ case sets:size(NewBlockers) of
+ 0 -> State#v1{connection_state = running,
+ blockers = NewBlockers};
+
+ _ -> State#v1{blockers = NewBlockers}
+ end;
+internal_conserve_memory(false, Blocker,
+ State = #v1{connection_state = blocked,
+ blockers = Blockers,
+ heartbeater = Heartbeater}) ->
+ NewBlockers = sets:del_element(Blocker, Blockers),
+ case sets:size(NewBlockers) of
+ 0 -> ok = rabbit_heartbeat:resume_monitor(Heartbeater),
+ State#v1{connection_state = running,
+ blockers = NewBlockers};
+
+ _ -> State#v1{blockers = NewBlockers}
+ end;
+internal_conserve_memory(_Conserve, _Blocker, State) ->
State.
internal_bump_credit(Msg, State) ->
rabbit_flow:bump(Msg),
- internal_conserve_memory(false, State).
+ internal_conserve_memory(false, self(), State).
internal_check_credit(State) when ?IS_RUNNING(State) ->
case rabbit_flow:blocked() of
- true -> internal_conserve_memory(true, State);
+ true -> internal_conserve_memory(true, self(), State);
false -> State
end.
@@ -713,7 +737,9 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
NewConnection = Connection#connection{vhost = VHostPath},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
State1 = internal_conserve_memory(
- rabbit_alarm:register(self(), {?MODULE, conserve_memory, []}),
+ rabbit_alarm:register(self(), {?MODULE, conserve_memory,
+ [memory_alarm]}),
+ memory_alarm,
State#v1{connection_state = running,
connection = NewConnection}),
rabbit_event:notify(connection_created,