summaryrefslogtreecommitdiff
path: root/src/rabbit_limiter.erl
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-03-02 18:45:17 +0000
committerMatthew Sackman <matthew@lshift.net>2010-03-02 18:45:17 +0000
commitb7841bd1ecd2134e809dfba2c9feedb0b7957ea1 (patch)
tree99753fc5a2ca2489e4e3e9ff299ad1baaef50071 /src/rabbit_limiter.erl
parenta5bddb9ebdc68ea26ad44c71009b084dbf6e648e (diff)
downloadrabbitmq-server-b7841bd1ecd2134e809dfba2c9feedb0b7957ea1.tar.gz
Incomplete implementation
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r--src/rabbit_limiter.erl35
1 files changed, 33 insertions, 2 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index d998499d..43f31511 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -37,7 +37,7 @@
handle_info/2]).
-export([start_link/2, shutdown/1]).
-export([limit/2, can_send/3, ack/2, register/2, unregister/2]).
--export([get_limit/1]).
+-export([get_limit/1, block/1, unblock/1]).
%%----------------------------------------------------------------------------
@@ -53,6 +53,8 @@
-spec(register/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok').
-spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()).
+-spec(block/1 :: (maybe_pid()) -> 'ok').
+-spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped').
-endif.
@@ -60,6 +62,7 @@
-record(lim, {prefetch_count = 0,
ch_pid,
+ blocked = false,
queues = dict:new(), % QPid -> {MonitorRef, Notify}
volume = 0}).
%% 'Notify' is a boolean that indicates whether a queue should be
@@ -112,6 +115,16 @@ get_limit(Pid) ->
fun () -> 0 end,
fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end).
+block(undefined) ->
+ ok;
+block(LimiterPid) ->
+ gen_server2:call(LimiterPid, block, infinity).
+
+unblock(undefined) ->
+ ok;
+unblock(LimiterPid) ->
+ gen_server2:call(LimiterPid, unblock, infinity).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -119,6 +132,9 @@ get_limit(Pid) ->
init([ChPid, UnackedMsgCount]) ->
{ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}.
+handle_call({can_send, _QPid, _AckRequired}, _From,
+ State = #lim{blocked = true}) ->
+ {reply, false, State};
handle_call({can_send, QPid, AckRequired}, _From,
State = #lim{volume = Volume}) ->
case limit_reached(State) of
@@ -131,11 +147,23 @@ handle_call({can_send, QPid, AckRequired}, _From,
handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) ->
{reply, PrefetchCount, State};
+handle_call({limit, PrefetchCount}, _From, State = #lim{blocked = true}) ->
+ {reply, ok, State#lim{prefetch_count = PrefetchCount}};
handle_call({limit, PrefetchCount}, _From, State) ->
State1 = maybe_notify(State, State#lim{prefetch_count = PrefetchCount}),
case PrefetchCount == 0 of
true -> {stop, normal, stopped, State1};
false -> {reply, ok, State1}
+ end;
+
+handle_call(block, _From, State) ->
+ {reply, ok, State#lim{blocked = true}};
+
+handle_call(unblock, _From, State = #lim{prefetch_count = PrefetchCount}) ->
+ State1 = maybe_notify(State, State#lim{blocked = false}),
+ case PrefetchCount == 0 of
+ true -> {stop, normal, stopped, State1};
+ false -> {reply, ok, State1}
end.
handle_cast(shutdown, State) ->
@@ -167,7 +195,8 @@ code_change(_, State, _) ->
%%----------------------------------------------------------------------------
maybe_notify(OldState, NewState) ->
- case limit_reached(OldState) andalso not(limit_reached(NewState)) of
+ case (limit_reached(OldState) andalso not limit_reached(NewState)) orelse
+ (is_blocked(OldState) andalso not is_blocked(NewState)) of
true -> notify_queues(NewState);
false -> NewState
end.
@@ -175,6 +204,8 @@ maybe_notify(OldState, NewState) ->
limit_reached(#lim{prefetch_count = Limit, volume = Volume}) ->
Limit =/= 0 andalso Volume >= Limit.
+is_blocked(#lim{blocked = Blocked}) -> Blocked.
+
remember_queue(QPid, State = #lim{queues = Queues}) ->
case dict:is_key(QPid, Queues) of
false -> MRef = erlang:monitor(process, QPid),