diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-02-21 11:50:19 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-02-21 11:50:19 +0000 |
commit | 97757bbfc78bac5f02b5d7d9b1a4630cb41852f7 (patch) | |
tree | aaa9ac48628e9292dc7f5f97a74126c58711e292 /src/rabbit_limiter.erl | |
parent | 82cb8cff4c2a73b62ca014d21769729b6e14e2a5 (diff) | |
download | rabbitmq-server-97757bbfc78bac5f02b5d7d9b1a4630cb41852f7.tar.gz |
Remove blocked_ctags, and a few knock-on simplifications.
Diffstat (limited to 'src/rabbit_limiter.erl')
-rw-r--r-- | src/rabbit_limiter.erl | 57 |
1 files changed, 21 insertions, 36 deletions
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index e76fc217..ece3d1a4 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -24,14 +24,13 @@ disable/1]). -export([limit/2, can_send/4, ack/2, register/2, unregister/2]). -export([get_limit/1, block/1, unblock/1, is_consumer_blocked/2, is_blocked/1]). --export([initial_credit/4, credit/4, drained/1, forget_consumer/2, - copy_queue_state/2]). +-export([credit/4, drained/1, forget_consumer/2, copy_queue_state/2]). -import(rabbit_misc, [serial_add/2, serial_diff/2]). %%---------------------------------------------------------------------------- --record(token, {pid, enabled, credits, blocked_ctags}). +-record(token, {pid, enabled, credits}). -ifdef(use_specs). @@ -56,8 +55,6 @@ -spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}). -spec(is_blocked/1 :: (token()) -> boolean()). -spec(is_consumer_blocked/2 :: (token(), rabbit_types:ctag()) -> boolean()). --spec(initial_credit/4 :: (token(), rabbit_types:ctag(), - non_neg_integer(), boolean()) -> token()). -spec(credit/4 :: (token(), rabbit_types:ctag(), non_neg_integer(), boolean()) -> token()). -spec(drained/1 :: (token()) @@ -87,10 +84,9 @@ start_link() -> gen_server2:start_link(?MODULE, [], []). make_token() -> make_token(undefined). -make_token(Pid) -> #token{pid = Pid, - enabled = false, - credits = dict:new(), - blocked_ctags = []}. +make_token(Pid) -> #token{pid = Pid, + enabled = false, + credits = dict:new()}. is_enabled(#token{enabled = Enabled}) -> Enabled. @@ -107,21 +103,15 @@ limit(Limiter, PrefetchCount) -> %% breaching a limit. Note that we don't use maybe_call here in order %% to avoid always going through with_exit_handler/2, even when the %% limiter is disabled. -can_send(Token = #token{pid = Pid, enabled = Enabled, credits = Credits, - blocked_ctags = BCTags}, +can_send(Token = #token{pid = Pid, enabled = Enabled, credits = Credits}, QPid, AckReq, CTag) -> - ConsAllows = case dict:find(CTag, Credits) of - {ok, #credit{credit = C}} when C > 0 -> true; - {ok, #credit{}} -> false; - error -> true - end, - case ConsAllows of - true -> case not Enabled orelse call_can_send(Pid, QPid, AckReq) of + case is_consumer_blocked(Token, CTag) of + false -> case not Enabled orelse call_can_send(Pid, QPid, AckReq) of true -> Credits2 = record_send_q(CTag, Credits), Token#token{credits = Credits2}; false -> channel_blocked end; - false -> {consumer_blocked, Token#token{blocked_ctags = [CTag|BCTags]}} + true -> consumer_blocked end. call_can_send(Pid, QPid, AckRequired) -> @@ -150,21 +140,18 @@ block(Limiter) -> unblock(Limiter) -> maybe_call(Limiter, {unblock, Limiter}, ok). -is_consumer_blocked(#token{blocked_ctags = BCTags}, CTag) -> - lists:member(CTag, BCTags). +is_consumer_blocked(#token{credits = Credits}, CTag) -> + case dict:find(CTag, Credits) of + {ok, #credit{credit = C}} when C > 0 -> false; + {ok, #credit{}} -> true; + error -> false + end. is_blocked(Limiter) -> maybe_call(Limiter, is_blocked, false). -initial_credit(Limiter = #token{credits = Credits}, CTag, Credit, Drain) -> - {[], Credits2} = update_credit(CTag, Credit, Drain, Credits), - Limiter#token{credits = Credits2}. - -credit(Limiter = #token{credits = Credits, blocked_ctags = BCTags}, - CTag, Credit, Drain) -> - {Unblock, Credits2} = update_credit(CTag, Credit, Drain, Credits), - Limiter#token{credits = Credits2, - blocked_ctags = BCTags -- Unblock}. +credit(Limiter = #token{credits = Credits}, CTag, Credit, Drain) -> + Limiter#token{credits = update_credit(CTag, Credit, Drain, Credits)}. drained(Limiter = #token{credits = Credits}) -> {CTagCredits, Credits2} = @@ -176,13 +163,11 @@ drained(Limiter = #token{credits = Credits}) -> end, {[], Credits}, Credits), {CTagCredits, Limiter#token{credits = Credits2}}. -forget_consumer(Limiter = #token{credits = Credits, - blocked_ctags = BCTags}, CTag) -> - Limiter#token{credits = dict:erase(CTag, Credits), - blocked_ctags = BCTags -- [CTag]}. +forget_consumer(Limiter = #token{credits = Credits}, CTag) -> + Limiter#token{credits = dict:erase(CTag, Credits)}. -copy_queue_state(#token{credits = Credits, blocked_ctags = BCTags}, Token) -> - Token#token{credits = Credits, blocked_ctags = BCTags}. +copy_queue_state(#token{credits = Credits}, Token) -> + Token#token{credits = Credits}. %%---------------------------------------------------------------------------- %% Queue-local code |