summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_consumers.erl
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2014-01-22 15:05:39 +0000
committerMatthias Radestock <matthias@rabbitmq.com>2014-01-22 15:05:39 +0000
commitd7b0e43af6060abf3e477b71b1137dff8889c6cb (patch)
treee598f7e50c6990c5bba853444f3f690fe9ed1362 /src/rabbit_queue_consumers.erl
parentaaec8e3c296797cc0c83a2ab2e408111df6f078e (diff)
downloadrabbitmq-server-d7b0e43af6060abf3e477b71b1137dff8889c6cb.tar.gz
refactor: basic_consume/10 -> /9
cherry-picked from bug24297
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r--src/rabbit_queue_consumers.erl24
1 files changed, 16 insertions, 8 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 908e4783..dae8815e 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -17,7 +17,7 @@
-module(rabbit_queue_consumers).
-export([new/0, max_active_priority/1, inactive/1, all/1, count/0,
- unacknowledged_message_count/0, add/9, remove/3, erase_ch/2,
+ unacknowledged_message_count/0, add/8, remove/3, erase_ch/2,
send_drained/0, deliver/3, record_ack/3, subtract_acks/2,
possibly_unblock/3,
resume_fun/0, notify_sent_fun/1, activate_limit_fun/0,
@@ -57,7 +57,6 @@
-type ch() :: pid().
-type ack() :: non_neg_integer().
-type cr_fun() :: fun ((#cr{}) -> #cr{}).
--type credit_args() :: {non_neg_integer(), boolean()} | 'none'.
-type fetch_result() :: {rabbit_types:basic_message(), boolean(), ack()}.
-spec new() -> state().
@@ -68,8 +67,7 @@
-spec count() -> non_neg_integer().
-spec unacknowledged_message_count() -> non_neg_integer().
-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
- credit_args(), rabbit_framing:amqp_table(), boolean(),
- state()) -> state().
+ rabbit_framing:amqp_table(), boolean(), state()) -> state().
-spec remove(ch(), rabbit_types:ctag(), state()) ->
'not_found' | state().
-spec erase_ch(ch(), state()) ->
@@ -120,8 +118,8 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
-add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs,
- IsEmpty, State = #state{consumers = Consumers}) ->
+add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, Args, IsEmpty,
+ State = #state{consumers = Consumers}) ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
Limiter1 = case LimiterActive of
@@ -129,14 +127,14 @@ add(ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, CreditArgs, OtherArgs,
false -> Limiter
end,
C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
- update_ch_record(case CreditArgs of
+ update_ch_record(case parse_credit_args(Args) of
none -> C1;
{Crd, Drain} -> credit_and_drain(
C1, ConsumerTag, Crd, Drain, IsEmpty)
end),
Consumer = #consumer{tag = ConsumerTag,
ack_required = not NoAck,
- args = OtherArgs},
+ args = Args},
State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}.
remove(ChPid, ConsumerTag, State = #state{consumers = Consumers}) ->
@@ -324,6 +322,16 @@ utilisation(#state{use = {inactive, Since, Active, Avg}}) ->
%%----------------------------------------------------------------------------
+parse_credit_args(Arguments) ->
+ case rabbit_misc:table_lookup(Arguments, <<"x-credit">>) of
+ {table, T} -> case {rabbit_misc:table_lookup(T, <<"credit">>),
+ rabbit_misc:table_lookup(T, <<"drain">>)} of
+ {{long, Credit}, {bool, Drain}} -> {Credit, Drain};
+ _ -> none
+ end;
+ undefined -> none
+ end.
+
lookup_ch(ChPid) ->
case get({ch, ChPid}) of
undefined -> not_found;