diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-03-04 14:29:23 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-03-04 14:29:23 +0000 |
commit | b2e24166a4b22c0794a2e058f1cfca98829bc811 (patch) | |
tree | ba5336889639f7f5252c46c6f048af2a760a9443 /src/rabbit_queue_consumers.erl | |
parent | 75715c5369f5ec5a71423cb2f701e5ec9ad6bf24 (diff) | |
download | rabbitmq-server-b2e24166a4b22c0794a2e058f1cfca98829bc811.tar.gz |
Management information on per-consumer prefetch
Diffstat (limited to 'src/rabbit_queue_consumers.erl')
-rw-r--r-- | src/rabbit_queue_consumers.erl | 16 |
1 files changed, 9 insertions, 7 deletions
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 2086c856..47b4ef3a 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -32,7 +32,7 @@ -record(state, {consumers, use}). --record(consumer, {tag, ack_required, args}). +-record(consumer, {tag, ack_required, prefetch, args}). %% These are held in our process dictionary -record(cr, {ch_pid, @@ -66,7 +66,7 @@ -spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'. -spec inactive(state()) -> boolean(). -spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(), - rabbit_framing:amqp_table()}]. + non_neg_integer(), rabbit_framing:amqp_table()}]. -spec count() -> non_neg_integer(). -spec unacknowledged_message_count() -> non_neg_integer(). -spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), @@ -114,8 +114,9 @@ all(#state{consumers = Consumers}) -> consumers(Consumers, Acc) -> priority_queue:fold( fun ({ChPid, Consumer}, _P, Acc1) -> - #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer, - [{ChPid, CTag, Ack, Args} | Acc1] + #consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch, + args = Args} = Consumer, + [{ChPid, CTag, Ack, Prefetch, Args} | Acc1] end, Acc, Consumers). count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). @@ -123,8 +124,8 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). -add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, ConsumerPrefetchCount, Args, - IsEmpty, State = #state{consumers = Consumers}) -> +add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty, + State = #state{consumers = Consumers}) -> C = #cr{consumer_count = Count, limiter = Limiter} = ch_record(ChPid, LimiterPid), Limiter1 = case LimiterActive of @@ -133,7 +134,7 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, ConsumerPrefetchCount, Args, end, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, update_ch_record( - case parse_credit_args(ConsumerPrefetchCount, Args) of + case parse_credit_args(Prefetch, Args) of none -> C1; {0, auto} -> C1; {_Credit, auto} when NoAck -> C1; @@ -142,6 +143,7 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, ConsumerPrefetchCount, Args, end), Consumer = #consumer{tag = CTag, ack_required = not NoAck, + prefetch = Prefetch, args = Args}, State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}. |