diff options
author | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-22 15:05:39 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@rabbitmq.com> | 2014-01-22 15:05:39 +0000 |
commit | d7b0e43af6060abf3e477b71b1137dff8889c6cb (patch) | |
tree | e598f7e50c6990c5bba853444f3f690fe9ed1362 /src/rabbit_queue_consumers.erl | |
parent | aaec8e3c296797cc0c83a2ab2e408111df6f078e (diff) | |
download | rabbitmq-server-d7b0e43af6060abf3e477b71b1137dff8889c6cb.tar.gz |
refactor: basic_consume/10 -> /9
cherry-picked from bug24297
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r-- | src/rabbit_queue_consumers.erl | 24 |
1 files changed, 16 insertions, 8 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 908e4783..dae8815e 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -17,7 +17,7 @@ -module(rabbit_queue_consumers). -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, - unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, + unacknowledged_message_count/0, add/8, remove/3, erase_ch/2, send_drained/0, deliver/3, record_ack/3, subtract_acks/2, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, @@ -57,7 +57,6 @@ -type ch() :: pid(). -type ack() :: non_neg_integer(). -type cr_fun() :: fun ((#cr{}) -> #cr{}). --type credit_args() :: {non_neg_integer(), boolean()} | 'none'. -type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}. -spec new() -> state(). @@ -68,8 +67,7 @@ -spec count() -> non_neg_integer(). -spec unacknowledged_message_count() -> non_neg_integer(). -spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), - credit_args(), rabbit_framing:amqp_table(), boolean(), - state()) -> state(). + rabbit_framing:amqp_table(), boolean(), state()) -> state(). -spec remove(ch(), rabbit_types:ctag(), state()) -> 'not_found' | state(). -spec erase_ch(ch(), state()) -> @@ -120,8 +118,8 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). -add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs, - IsEmpty, State = #state{consumers = Consumers}) -> +add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, + State = #state{consumers = Consumers}) -> C = #cr{consumer_count = Count, limiter = Limiter} = ch_record(ChPid, LimiterPid), Limiter1 = case LimiterActive of @@ -129,14 +127,14 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs, false -> Limiter end, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, - update_ch_record(case CreditArgs of + update_ch_record(case parse_credit_args(Args) of none -> C1; {Crd, Drain} -> credit_and_drain( C1, ConsumerTag, Crd, Drain, IsEmpty) end), Consumer = #consumer{tag = ConsumerTag, ack_required = not NoAck, - args = OtherArgs}, + args = Args}, State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}. remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) -> @@ -324,6 +322,16 @@ utilisation(#state{use = {inactive, Since, Active, Avg}}) -> %%---------------------------------------------------------------------------- +parse_credit_args(Arguments) -> + case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of + {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), + rabbit_misc:table_lookup(T, <<"drain">>)} of + {{long, Credit}, {bool, Drain}} -> {Credit, Drain}; + _ -> none + end; + undefined -> none + end. + lookup_ch(ChPid) -> case get({ch, ChPid}) of undefined -> not_found; |