summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_consumers.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-01-15 13:37:42 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-01-15 13:37:42 +0000
commitc7a01e7829f4dcb72610c69de4be2e78e3480a99 (patch)
treee142744cddbd6a125312af763b1dabb8e48a843e /src/rabbit_queue_consumers.erl
parent47d59e307b523862aa1aaa3990c93d150d87726c (diff)
downloadrabbitmq-server-c7a01e7829f4dcb72610c69de4be2e78e3480a99.tar.gz
Move credit args parsing into rabbit_queue_consumers, to reduce some nasty arities and increase locality.
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r--src/rabbit_queue_consumers.erl24
1 files changed, 18 insertions, 6 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index c410a099..f864a5ba 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -17,7 +17,7 @@
-module(rabbit_queue_consumers).
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
- unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
+ unacknowledged_message_count/0, add/8, remove/3, erase_ch/2,
send_drained/0, deliver/3, record_ack/3, subtract_acks/4,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
@@ -57,7 +57,6 @@
-type ch() :: pid().
-type ack() :: non_neg_integer().
-type cr_fun() :: fun ((#cr{}) -> #cr{}).
--type credit_args() :: {non_neg_integer(), boolean()} | 'none'.
-type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}.
-spec new() -> state().
@@ -68,7 +67,7 @@
-spec count() -> non_neg_integer().
-spec unacknowledged_message_count() -> non_neg_integer().
-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
- credit_args(), rabbit_framing:amqp_table(), boolean(),
+ rabbit_framing:amqp_table(), boolean(),
state()) -> state().
-spec remove(ch(), rabbit_types:ctag(), state()) ->
'not_found' | state().
@@ -121,8 +120,8 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
-add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, Args,
- IsEmpty, State = #state{consumers = Consumers}) ->
+add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
+ State = #state{consumers = Consumers}) ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
Limiter1 = case LimiterActive of
@@ -131,7 +130,7 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, Args,
end,
C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
update_ch_record(
- case CreditArgs of
+ case parse_credit_args(Args) of
none ->
C1;
{credit, Credit, Drain} ->
@@ -146,6 +145,19 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, Args,
args = Args},
State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}.
+parse_credit_args(Args) ->
+ case rabbit_misc:table_lookup(Args, <<"x-credit">>) of
+ {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, C}, {bool, D}} -> {credit, C, D};
+ _ -> none
+ end;
+ undefined -> case rabbit_misc:table_lookup(Args, <<"x-prefetch">>) of
+ {_, Prefetch} -> {prefetch, Prefetch};
+ _ -> none
+ end
+ end.
+
remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) ->
case lookup_ch(ChPid) of
not_found ->