summaryrefslogtreecommitdiff
path: root/src/rabbit_queue_consumers.erl
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-03-04 14:29:23 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-03-04 14:29:23 +0000
commitb2e24166a4b22c0794a2e058f1cfca98829bc811 (patch)
treeba5336889639f7f5252c46c6f048af2a760a9443 /src/rabbit_queue_consumers.erl
parent75715c5369f5ec5a71423cb2f701e5ec9ad6bf24 (diff)
downloadrabbitmq-server-b2e24166a4b22c0794a2e058f1cfca98829bc811.tar.gz
Management information on per-consumer prefetch
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r--src/rabbit_queue_consumers.erl16
1 files changed, 9 insertions, 7 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 2086c856..47b4ef3a 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -32,7 +32,7 @@
-record(state, {consumers, use}).
--record(consumer, {tag, ack_required, args}).
+-record(consumer, {tag, ack_required, prefetch, args}).
%% These are held in our process dictionary
-record(cr, {ch_pid,
@@ -66,7 +66,7 @@
-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
-spec inactive(state()) -> boolean().
-spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(),
- rabbit_framing:amqp_table()}].
+ non_neg_integer(), rabbit_framing:amqp_table()}].
-spec count() -> non_neg_integer().
-spec unacknowledged_message_count() -> non_neg_integer().
-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
@@ -114,8 +114,9 @@ all(#state{consumers = Consumers}) ->
consumers(Consumers, Acc) ->
priority_queue:fold(
fun ({ChPid, Consumer}, _P, Acc1) ->
- #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer,
- [{ChPid, CTag, Ack, Args} | Acc1]
+ #consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch,
+ args = Args} = Consumer,
+ [{ChPid, CTag, Ack, Prefetch, Args} | Acc1]
end, Acc, Consumers).
count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
@@ -123,8 +124,8 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
-add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, ConsumerPrefetchCount, Args,
- IsEmpty, State = #state{consumers = Consumers}) ->
+add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty,
+ State = #state{consumers = Consumers}) ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
Limiter1 = case LimiterActive of
@@ -133,7 +134,7 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, ConsumerPrefetchCount, Args,
end,
C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
update_ch_record(
- case parse_credit_args(ConsumerPrefetchCount, Args) of
+ case parse_credit_args(Prefetch, Args) of
none -> C1;
{0, auto} -> C1;
{_Credit, auto} when NoAck -> C1;
@@ -142,6 +143,7 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, ConsumerPrefetchCount, Args,
end),
Consumer = #consumer{tag = CTag,
ack_required = not NoAck,
+ prefetch = Prefetch,
args = Args},
State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}.