summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2013-02-21 11:50:19 +0000
committerSimon MacMullen <simon@rabbitmq.com>2013-02-21 11:50:19 +0000
commit97757bbfc78bac5f02b5d7d9b1a4630cb41852f7 (patch)
treeaaa9ac48628e9292dc7f5f97a74126c58711e292
parent82cb8cff4c2a73b62ca014d21769729b6e14e2a5 (diff)
downloadrabbitmq-server-97757bbfc78bac5f02b5d7d9b1a4630cb41852f7.tar.gz
Remove blocked_ctags, and a few knock-on simplifications.
-rw-r--r--src/rabbit_amqqueue_process.erl6
-rw-r--r--src/rabbit_limiter.erl57
2 files changed, 24 insertions, 39 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c02fd6b5..79a98208 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -457,8 +457,8 @@ deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, State) ->
false -> case rabbit_limiter:can_send(C#cr.limiter, self(),
Consumer#consumer.ack_required,
Consumer#consumer.tag) of
- {consumer_blocked, Limiter2} ->
- block_consumer(C#cr{limiter = Limiter2}, E),
+ consumer_blocked ->
+ block_consumer(C, E),
{false, State};
channel_blocked ->
block_consumer(C#cr{is_limit_active = true}, E),
@@ -1138,7 +1138,7 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
none ->
Limiter;
{Credit, Drain} ->
- rabbit_limiter:initial_credit(
+ rabbit_limiter:credit(
Limiter, ConsumerTag, Credit, Drain)
end,
C1 = update_consumer_count(C#cr{limiter = Limiter2}, +1),
diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl
index e76fc217..ece3d1a4 100644
--- a/src/rabbit_limiter.erl
+++ b/src/rabbit_limiter.erl
@@ -24,14 +24,13 @@
disable/1]).
-export([limit/2, can_send/4, ack/2, register/2, unregister/2]).
-export([get_limit/1, block/1, unblock/1, is_consumer_blocked/2, is_blocked/1]).
--export([initial_credit/4, credit/4, drained/1, forget_consumer/2,
- copy_queue_state/2]).
+-export([credit/4, drained/1, forget_consumer/2, copy_queue_state/2]).
-import(rabbit_misc, [serial_add/2, serial_diff/2]).
%%----------------------------------------------------------------------------
--record(token, {pid, enabled, credits, blocked_ctags}).
+-record(token, {pid, enabled, credits}).
-ifdef(use_specs).
@@ -56,8 +55,6 @@
-spec(unblock/1 :: (token()) -> 'ok' | {'disabled', token()}).
-spec(is_blocked/1 :: (token()) -> boolean()).
-spec(is_consumer_blocked/2 :: (token(), rabbit_types:ctag()) -> boolean()).
--spec(initial_credit/4 :: (token(), rabbit_types:ctag(),
- non_neg_integer(), boolean()) -> token()).
-spec(credit/4 :: (token(), rabbit_types:ctag(), non_neg_integer(), boolean())
-> token()).
-spec(drained/1 :: (token())
@@ -87,10 +84,9 @@
start_link() -> gen_server2:start_link(?MODULE, [], []).
make_token() -> make_token(undefined).
-make_token(Pid) -> #token{pid = Pid,
- enabled = false,
- credits = dict:new(),
- blocked_ctags = []}.
+make_token(Pid) -> #token{pid = Pid,
+ enabled = false,
+ credits = dict:new()}.
is_enabled(#token{enabled = Enabled}) -> Enabled.
@@ -107,21 +103,15 @@ limit(Limiter, PrefetchCount) ->
%% breaching a limit. Note that we don't use maybe_call here in order
%% to avoid always going through with_exit_handler/2, even when the
%% limiter is disabled.
-can_send(Token = #token{pid = Pid, enabled = Enabled, credits = Credits,
- blocked_ctags = BCTags},
+can_send(Token = #token{pid = Pid, enabled = Enabled, credits = Credits},
QPid, AckReq, CTag) ->
- ConsAllows = case dict:find(CTag, Credits) of
- {ok, #credit{credit = C}} when C > 0 -> true;
- {ok, #credit{}} -> false;
- error -> true
- end,
- case ConsAllows of
- true -> case not Enabled orelse call_can_send(Pid, QPid, AckReq) of
+ case is_consumer_blocked(Token, CTag) of
+ false -> case not Enabled orelse call_can_send(Pid, QPid, AckReq) of
true -> Credits2 = record_send_q(CTag, Credits),
Token#token{credits = Credits2};
false -> channel_blocked
end;
- false -> {consumer_blocked, Token#token{blocked_ctags = [CTag|BCTags]}}
+ true -> consumer_blocked
end.
call_can_send(Pid, QPid, AckRequired) ->
@@ -150,21 +140,18 @@ block(Limiter) ->
unblock(Limiter) ->
maybe_call(Limiter, {unblock, Limiter}, ok).
-is_consumer_blocked(#token{blocked_ctags = BCTags}, CTag) ->
- lists:member(CTag, BCTags).
+is_consumer_blocked(#token{credits = Credits}, CTag) ->
+ case dict:find(CTag, Credits) of
+ {ok, #credit{credit = C}} when C > 0 -> false;
+ {ok, #credit{}} -> true;
+ error -> false
+ end.
is_blocked(Limiter) ->
maybe_call(Limiter, is_blocked, false).
-initial_credit(Limiter = #token{credits = Credits}, CTag, Credit, Drain) ->
- {[], Credits2} = update_credit(CTag, Credit, Drain, Credits),
- Limiter#token{credits = Credits2}.
-
-credit(Limiter = #token{credits = Credits, blocked_ctags = BCTags},
- CTag, Credit, Drain) ->
- {Unblock, Credits2} = update_credit(CTag, Credit, Drain, Credits),
- Limiter#token{credits = Credits2,
- blocked_ctags = BCTags -- Unblock}.
+credit(Limiter = #token{credits = Credits}, CTag, Credit, Drain) ->
+ Limiter#token{credits = update_credit(CTag, Credit, Drain, Credits)}.
drained(Limiter = #token{credits = Credits}) ->
{CTagCredits, Credits2} =
@@ -176,13 +163,11 @@ drained(Limiter = #token{credits = Credits}) ->
end, {[], Credits}, Credits),
{CTagCredits, Limiter#token{credits = Credits2}}.
-forget_consumer(Limiter = #token{credits = Credits,
- blocked_ctags = BCTags}, CTag) ->
- Limiter#token{credits = dict:erase(CTag, Credits),
- blocked_ctags = BCTags -- [CTag]}.
+forget_consumer(Limiter = #token{credits = Credits}, CTag) ->
+ Limiter#token{credits = dict:erase(CTag, Credits)}.
-copy_queue_state(#token{credits = Credits, blocked_ctags = BCTags}, Token) ->
- Token#token{credits = Credits, blocked_ctags = BCTags}.
+copy_queue_state(#token{credits = Credits}, Token) ->
+ Token#token{credits = Credits}.
%%----------------------------------------------------------------------------
%% Queue-local code