summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_consumers.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-13 15:32:44 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-13 15:32:44 +0000
commit312715ffb05e9a85c4c830b1b5fe5f106b8d151c (patch)
tree0058d9a903e770981804acbdfc5bce2a8845501d /src/rabbit_queue_consumers.erl
parent065c6794c4f6279487d5db45ebbf610d57e70335 (diff)
parentebc2aa448dcf2825508e1a8a101844e133b88e5d (diff)
downloadrabbitmq-server-312715ffb05e9a85c4c830b1b5fe5f106b8d151c.tar.gz
Merge in default.
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r--src/rabbit_queue_consumers.erl65
1 files changed, 38 insertions, 27 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 3ae29d30..63de546c 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -20,8 +20,8 @@
unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
send_drained/0, deliver/3, record_ack/3, subtract_acks/4,
possibly_unblock/3,
- resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4,
- utilisation/1]).
+ resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
+ credit/6, utilisation/1]).
%%----------------------------------------------------------------------------
@@ -84,11 +84,11 @@
%% -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'.
-spec possibly_unblock(cr_fun(), ch(), state()) ->
'unchanged' | {'unblocked', state()}.
--spec resume_fun() -> cr_fun().
--spec notify_sent_fun(non_neg_integer()) -> cr_fun().
--spec activate_limit_fun() -> cr_fun().
--spec credit_fun(boolean(), non_neg_integer(), boolean(),
- rabbit_types:ctag()) -> cr_fun().
+-spec resume_fun() -> cr_fun().
+-spec notify_sent_fun(non_neg_integer()) -> cr_fun().
+-spec activate_limit_fun() -> cr_fun().
+-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(),
+ state()) -> 'unchanged' | {'unblocked', state()}.
-spec utilisation(state()) -> ratio().
-endif.
@@ -128,19 +128,15 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, Args,
true -> rabbit_limiter:activate(Limiter);
false -> Limiter
end,
- Limiter2 = case CreditArgs of
- none -> Limiter1;
- {credit, C, D} -> rabbit_limiter:credit(
- Limiter1, ConsumerTag, C, IsEmpty, D);
- {prefetch , P} -> rabbit_limiter:set_consumer_prefetch(
- Limiter1, ConsumerTag, P)
- end,
- C1 = C#cr{consumer_count = Count + 1,
- limiter = Limiter2},
- update_ch_record(case IsEmpty of
- true -> send_drained(C1);
- false -> C1
- end),
+ C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
+ update_ch_record(
+ case CreditArgs of
+ none -> C1;
+ {credit, C, D} -> credit_and_drain(C1, ConsumerTag, C, D, IsEmpty);
+ {prefetch, P} -> Limiter2 = rabbit_limiter:set_consumer_prefetch(
+ Limiter1, ConsumerTag, P),
+ C1#cr{limiter = Limiter2}
+ end),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck,
args = Args},
@@ -314,13 +310,19 @@ activate_limit_fun() ->
C#cr{limiter = rabbit_limiter:activate(Limiter)}
end.
-credit_fun(IsEmpty, Credit, Drain, CTag) ->
- fun (C = #cr{limiter = Limiter}) ->
- C1 = C#cr{limiter = rabbit_limiter:credit(
- Limiter, CTag, Credit, IsEmpty, Drain)},
- case Drain andalso IsEmpty of
- true -> send_drained(C1);
- false -> C1
+credit(IsEmpty, Credit, Drain, ChPid, CTag, State) ->
+ case lookup_ch(ChPid) of
+ not_found ->
+ unchanged;
+ #cr{limiter = Limiter} = C ->
+ C1 = #cr{limiter = Limiter1} =
+ credit_and_drain(C, CTag, Credit, Drain, IsEmpty),
+ case is_ch_blocked(C1) orelse
+ (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse
+ rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of
+ true -> update_ch_record(C1),
+ unchanged;
+ false -> unblock(C1, State)
end
end.
@@ -388,6 +390,15 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) ->
C#cr{limiter = Limiter2}
end.
+credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter},
+ CTag, Credit, Drain, IsEmpty) ->
+ case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, IsEmpty) of
+ {true, Limiter1} -> rabbit_channel:send_drained(ChPid,
+ [{CTag, Credit}]),
+ C#cr{limiter = Limiter1};
+ {false, Limiter1} -> C#cr{limiter = Limiter1}
+ end.
+
tags(CList) -> [CTag || {_P, {_ChPid, #consumer{tag = CTag}}} <- CList].
add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) ->