diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-01-15 13:37:42 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-01-15 13:37:42 +0000 |
commit | c7a01e7829f4dcb72610c69de4be2e78e3480a99 (patch) | |
tree | e142744cddbd6a125312af763b1dabb8e48a843e /src/rabbit_queue_consumers.erl | |
parent | 47d59e307b523862aa1aaa3990c93d150d87726c (diff) | |
download | rabbitmq-server-c7a01e7829f4dcb72610c69de4be2e78e3480a99.tar.gz |
Move credit args parsing into rabbit_queue_consumers, to reduce some nasty arities and increase locality.
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r-- | src/rabbit_queue_consumers.erl | 24 |
1 files changed, 18 insertions, 6 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index c410a099..f864a5ba 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -17,7 +17,7 @@ -module(rabbit_queue_consumers). -export([new/0, max_active_priority/1, inactive/1, all/1, count/0, - unacknowledged_message_count/0, add/9, remove/3, erase_ch/2, + unacknowledged_message_count/0, add/8, remove/3, erase_ch/2, send_drained/0, deliver/3, record_ack/3, subtract_acks/4, possibly_unblock/3, resume_fun/0, notify_sent_fun/1, activate_limit_fun/0, @@ -57,7 +57,6 @@ -type ch() :: pid(). -type ack() :: non_neg_integer(). -type cr_fun() :: fun ((#cr{}) -> #cr{}). --type credit_args() :: {non_neg_integer(), boolean()} | 'none'. -type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}. -spec new() -> state(). @@ -68,7 +67,7 @@ -spec count() -> non_neg_integer(). -spec unacknowledged_message_count() -> non_neg_integer(). -spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), - credit_args(), rabbit_framing:amqp_table(), boolean(), + rabbit_framing:amqp_table(), boolean(), state()) -> state(). -spec remove(ch(), rabbit_types:ctag(), state()) -> 'not_found' | state(). @@ -121,8 +120,8 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). -add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, Args, - IsEmpty, State = #state{consumers = Consumers}) -> +add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty, + State = #state{consumers = Consumers}) -> C = #cr{consumer_count = Count, limiter = Limiter} = ch_record(ChPid, LimiterPid), Limiter1 = case LimiterActive of @@ -131,7 +130,7 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, Args, end, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, update_ch_record( - case CreditArgs of + case parse_credit_args(Args) of none -> C1; {credit, Credit, Drain} -> @@ -146,6 +145,19 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, Args, args = Args}, State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}. +parse_credit_args(Args) -> + case rabbit_misc:table_lookup(Args, <<"x-credit">>) of + {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>), + rabbit_misc:table_lookup(T, <<"drain">>)} of + {{long, C}, {bool, D}} -> {credit, C, D}; + _ -> none + end; + undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of + {_, Prefetch} -> {prefetch, Prefetch}; + _ -> none + end + end. + remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) -> case lookup_ch(ChPid) of not_found -> |