diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-13 15:32:44 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-13 15:32:44 +0000 |
commit | 312715ffb05e9a85c4c830b1b5fe5f106b8d151c (patch) | |
tree | 0058d9a903e770981804acbdfc5bce2a8845501d /src/rabbit_queue_consumers.erl | |
parent | 065c6794c4f6279487d5db45ebbf610d57e70335 (diff) | |
parent | ebc2aa448dcf2825508e1a8a101844e133b88e5d (diff) | |
download | rabbitmq-server-312715ffb05e9a85c4c830b1b5fe5f106b8d151c.tar.gz |
Merge in default.
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r-- | src/rabbit_queue_consumers.erl | 65 |
1 files changed, 38 insertions, 27 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 3ae29d30..63de546c 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -20,8 +20,8 @@ unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, send_drained/0, deliver/3, record_ack/3, subtract_acks/4, possibly_unblock/3, - resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, credit_fun/4, - utilisation/1]). + resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, + credit/6, utilisation/1]). %%---------------------------------------------------------------------------- @@ -84,11 +84,11 @@ %% -spec subtract_acks(ch(), [ack()]) -> 'not_found' | 'ok'. -spec possibly_unblock(cr_fun(), ch(), state()) -> 'unchanged' | {'unblocked', state()}. --spec resume_fun() -> cr_fun(). --spec notify_sent_fun(non_neg_integer()) -> cr_fun(). --spec activate_limit_fun() -> cr_fun(). --spec credit_fun(boolean(), non_neg_integer(), boolean(), - rabbit_types:ctag()) -> cr_fun(). +-spec resume_fun() -> cr_fun(). +-spec notify_sent_fun(non_neg_integer()) -> cr_fun(). +-spec activate_limit_fun() -> cr_fun(). +-spec credit(boolean(), integer(), boolean(), ch(), rabbit_types:ctag(), + state()) -> 'unchanged' | {'unblocked', state()}. -spec utilisation(state()) -> ratio(). -endif. @@ -128,19 +128,15 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, Args, true -> rabbit_limiter:activate(Limiter); false -> Limiter end, - Limiter2 = case CreditArgs of - none -> Limiter1; - {credit, C, D} -> rabbit_limiter:credit( - Limiter1, ConsumerTag, C, IsEmpty, D); - {prefetch , P} -> rabbit_limiter:set_consumer_prefetch( - Limiter1, ConsumerTag, P) - end, - C1 = C#cr{consumer_count = Count + 1, - limiter = Limiter2}, - update_ch_record(case IsEmpty of - true -> send_drained(C1); - false -> C1 - end), + C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, + update_ch_record( + case CreditArgs of + none -> C1; + {credit, C, D} -> credit_and_drain(C1, ConsumerTag, C, D, IsEmpty); + {prefetch, P} -> Limiter2 = rabbit_limiter:set_consumer_prefetch( + Limiter1, ConsumerTag, P), + C1#cr{limiter = Limiter2} + end), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck, args = Args}, @@ -314,13 +310,19 @@ activate_limit_fun() -> C#cr{limiter = rabbit_limiter:activate(Limiter)} end. -credit_fun(IsEmpty, Credit, Drain, CTag) -> - fun (C = #cr{limiter = Limiter}) -> - C1 = C#cr{limiter = rabbit_limiter:credit( - Limiter, CTag, Credit, IsEmpty, Drain)}, - case Drain andalso IsEmpty of - true -> send_drained(C1); - false -> C1 +credit(IsEmpty, Credit, Drain, ChPid, CTag, State) -> + case lookup_ch(ChPid) of + not_found -> + unchanged; + #cr{limiter = Limiter} = C -> + C1 = #cr{limiter = Limiter1} = + credit_and_drain(C, CTag, Credit, Drain, IsEmpty), + case is_ch_blocked(C1) orelse + (not rabbit_limiter:is_consumer_blocked(Limiter, CTag)) orelse + rabbit_limiter:is_consumer_blocked(Limiter1, CTag) of + true -> update_ch_record(C1), + unchanged; + false -> unblock(C1, State) end end. @@ -388,6 +390,15 @@ send_drained(C = #cr{ch_pid = ChPid, limiter = Limiter}) -> C#cr{limiter = Limiter2} end. +credit_and_drain(C = #cr{ch_pid = ChPid, limiter = Limiter}, + CTag, Credit, Drain, IsEmpty) -> + case rabbit_limiter:credit(Limiter, CTag, Credit, Drain, IsEmpty) of + {true, Limiter1} -> rabbit_channel:send_drained(ChPid, + [{CTag, Credit}]), + C#cr{limiter = Limiter1}; + {false, Limiter1} -> C#cr{limiter = Limiter1} + end. + tags(CList) -> [CTag || {_P, {_ChPid, #consumer{tag = CTag}}} <- CList]. add_consumer({ChPid, Consumer = #consumer{args = Args}}, Queue) -> |