diff options
author | Matthew Sackman <matthew@lshift.net> | 2010-03-02 18:45:17 +0000 |
---|---|---|
committer | Matthew Sackman <matthew@lshift.net> | 2010-03-02 18:45:17 +0000 |
commit | b7841bd1ecd2134e809dfba2c9feedb0b7957ea1 (patch) | |
tree | 99753fc5a2ca2489e4e3e9ff299ad1baaef50071 /src/rabbit_limiter.erl | |
parent | a5bddb9ebdc68ea26ad44c71009b084dbf6e648e (diff) | |
download | rabbitmq-server-b7841bd1ecd2134e809dfba2c9feedb0b7957ea1.tar.gz |
Incomplete implementation
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r-- | src/rabbit_limiter.erl | 35 |
1 files changed, 33 insertions, 2 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index d998499d..43f31511 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -37,7 +37,7 @@ handle_info/2]). -export([start_link/2, shutdown/1]). -export([limit/2, can_send/3, ack/2, register/2, unregister/2]). --export([get_limit/1]). +-export([get_limit/1, block/1, unblock/1]). %%---------------------------------------------------------------------------- @@ -53,6 +53,8 @@ -spec(register/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(unregister/2 :: (maybe_pid(), pid()) -> 'ok'). -spec(get_limit/1 :: (maybe_pid()) -> non_neg_integer()). +-spec(block/1 :: (maybe_pid()) -> 'ok'). +-spec(unblock/1 :: (maybe_pid()) -> 'ok' | 'stopped'). -endif. @@ -60,6 +62,7 @@ -record(lim, {prefetch_count = 0, ch_pid, + blocked = false, queues = dict:new(), % QPid -> {MonitorRef, Notify} volume = 0}). %% 'Notify' is a boolean that indicates whether a queue should be @@ -112,6 +115,16 @@ get_limit(Pid) -> fun () -> 0 end, fun () -> gen_server2:pcall(Pid, 9, get_limit, infinity) end). +block(undefined) -> + ok; +block(LimiterPid) -> + gen_server2:call(LimiterPid, block, infinity). + +unblock(undefined) -> + ok; +unblock(LimiterPid) -> + gen_server2:call(LimiterPid, unblock, infinity). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -119,6 +132,9 @@ get_limit(Pid) -> init([ChPid, UnackedMsgCount]) -> {ok, #lim{ch_pid = ChPid, volume = UnackedMsgCount}}. +handle_call({can_send, _QPid, _AckRequired}, _From, + State = #lim{blocked = true}) -> + {reply, false, State}; handle_call({can_send, QPid, AckRequired}, _From, State = #lim{volume = Volume}) -> case limit_reached(State) of @@ -131,11 +147,23 @@ handle_call({can_send, QPid, AckRequired}, _From, handle_call(get_limit, _From, State = #lim{prefetch_count = PrefetchCount}) -> {reply, PrefetchCount, State}; +handle_call({limit, PrefetchCount}, _From, State = #lim{blocked = true}) -> + {reply, ok, State#lim{prefetch_count = PrefetchCount}}; handle_call({limit, PrefetchCount}, _From, State) -> State1 = maybe_notify(State, State#lim{prefetch_count = PrefetchCount}), case PrefetchCount == 0 of true -> {stop, normal, stopped, State1}; false -> {reply, ok, State1} + end; + +handle_call(block, _From, State) -> + {reply, ok, State#lim{blocked = true}}; + +handle_call(unblock, _From, State = #lim{prefetch_count = PrefetchCount}) -> + State1 = maybe_notify(State, State#lim{blocked = false}), + case PrefetchCount == 0 of + true -> {stop, normal, stopped, State1}; + false -> {reply, ok, State1} end. handle_cast(shutdown, State) -> @@ -167,7 +195,8 @@ code_change(_, State, _) -> %%---------------------------------------------------------------------------- maybe_notify(OldState, NewState) -> - case limit_reached(OldState) andalso not(limit_reached(NewState)) of + case (limit_reached(OldState) andalso not limit_reached(NewState)) orelse + (is_blocked(OldState) andalso not is_blocked(NewState)) of true -> notify_queues(NewState); false -> NewState end. @@ -175,6 +204,8 @@ maybe_notify(OldState, NewState) -> limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. +is_blocked(#lim{blocked = Blocked}) -> Blocked. + remember_queue(QPid, State = #lim{queues = Queues}) -> case dict:is_key(QPid, Queues) of false -> MRef = erlang:monitor(process, QPid), |