summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2014-03-04 14:29:23 +0000
committerSimon MacMullen <simon@rabbitmq.com>2014-03-04 14:29:23 +0000
commitb2e24166a4b22c0794a2e058f1cfca98829bc811 (patch)
treeba5336889639f7f5252c46c6f048af2a760a9443
parent75715c5369f5ec5a71423cb2f701e5ec9ad6bf24 (diff)
downloadrabbitmq-server-b2e24166a4b22c0794a2e058f1cfca98829bc811.tar.gz
Management information on per-consumer prefetch
-rw-r--r--docs/rabbitmqctl.1.xml5
-rw-r--r--src/rabbit_amqqueue.erl15
-rw-r--r--src/rabbit_amqqueue_process.erl37
-rw-r--r--src/rabbit_queue_consumers.erl16
4 files changed, 42 insertions, 31 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index a7e42503..bd4563fc 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -1604,8 +1604,9 @@
and is managed, the consumer tag which uniquely identifies
the subscription within a channel, a boolean
indicating whether acknowledgements are expected for
- messages delivered to this consumer, and any arguments for this
- consumer.
+ messages delivered to this consumer, an integer indicating
+ the prefetch limit (with 0 meaning 'none'), and any arguments
+ for this consumer.
</para>
</listitem>
</varlistentry>
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 2bf5c840..58a07e81 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -114,11 +114,12 @@
-spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok').
-spec(consumers/1 :: (rabbit_types:amqqueue())
-> [{pid(), rabbit_types:ctag(), boolean(),
- rabbit_framing:amqp_table()}]).
+ non_neg_integer(), rabbit_framing:amqp_table()}]).
-spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()).
-spec(consumers_all/1 ::
(rabbit_types:vhost())
- -> [{name(), pid(), rabbit_types:ctag(), boolean()}]).
+ -> [{name(), pid(), rabbit_types:ctag(), boolean(),
+ non_neg_integer(), rabbit_framing:amqp_table()}]).
-spec(stat/1 ::
(rabbit_types:amqqueue())
-> {'ok', non_neg_integer(), non_neg_integer()}).
@@ -185,7 +186,8 @@
%%----------------------------------------------------------------------------
-define(CONSUMER_INFO_KEYS,
- [queue_name, channel_pid, consumer_tag, ack_required, arguments]).
+ [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count,
+ arguments]).
recover() ->
%% Clear out remnants of old incarnation, in case we restarted
@@ -533,9 +535,10 @@ consumers_all(VHostPath) ->
lists:append(
map(VHostPath,
fun (Q) ->
- [lists:zip(ConsumerInfoKeys,
- [Q#amqqueue.name, ChPid, CTag, AckRequired, Args]) ||
- {ChPid, CTag, AckRequired, Args} <- consumers(Q)]
+ [lists:zip(
+ ConsumerInfoKeys,
+ [Q#amqqueue.name, ChPid, CTag, AckRequired, Prefetch, Args]) ||
+ {ChPid, CTag, AckRequired, Prefetch, Args} <- consumers(Q)]
end)).
stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat).
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 97a9c312..d415b358 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -315,7 +315,7 @@ terminate_shutdown(Fun, State) ->
QName = qname(State),
notify_decorators(shutdown, State),
[emit_consumer_deleted(Ch, CTag, QName) ||
- {Ch, CTag, _, _} <-
+ {Ch, CTag, _, _, _} <-
rabbit_queue_consumers:all(Consumers)],
State1#q{backing_queue_state = Fun(BQS)}
end.
@@ -826,14 +826,16 @@ emit_stats(State, Extra) ->
not lists:member(K, ExtraKs)],
rabbit_event:notify(queue_stats, Extra ++ Infos).
-emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args, Ref) ->
+emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName,
+ PrefetchCount, Args, Ref) ->
rabbit_event:notify(consumer_created,
- [{consumer_tag, CTag},
- {exclusive, Exclusive},
- {ack_required, AckRequired},
- {channel, ChPid},
- {queue, QName},
- {arguments, Args}],
+ [{consumer_tag, CTag},
+ {exclusive, Exclusive},
+ {ack_required, AckRequired},
+ {channel, ChPid},
+ {queue, QName},
+ {prefetch_count, PrefetchCount},
+ {arguments, Args}],
Ref).
emit_consumer_deleted(ChPid, ConsumerTag, QName) ->
@@ -961,7 +963,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From,
end;
handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg},
+ PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg},
_From, State = #q{consumers = Consumers,
exclusive_consumer = Holder}) ->
case check_exclusive_access(Holder, ExclusiveConsume, State) of
@@ -969,8 +971,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
ok -> Consumers1 = rabbit_queue_consumers:add(
ChPid, ConsumerTag, NoAck,
LimiterPid, LimiterActive,
- ConsumerPrefetchCount,
- Args, is_empty(State), Consumers),
+ PrefetchCount, Args, is_empty(State),
+ Consumers),
ExclusiveConsumer =
if ExclusiveConsume -> {ChPid, ConsumerTag};
true -> Holder
@@ -980,7 +982,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, qname(State1), Args, none),
+ not NoAck, qname(State1),
+ PrefetchCount, Args, none),
notify_decorators(State1),
reply(ok, run_message_queue(State1))
end;
@@ -1069,11 +1072,13 @@ handle_call({force_event_refresh, Ref}, _From,
AllConsumers = rabbit_queue_consumers:all(Consumers),
case Exclusive of
none -> [emit_consumer_created(
- Ch, CTag, false, AckRequired, QName, Args, Ref) ||
- {Ch, CTag, AckRequired, Args} <- AllConsumers];
- {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = AllConsumers,
+ Ch, CTag, false, AckRequired, QName, Prefetch,
+ Args, Ref) ||
+ {Ch, CTag, AckRequired, Prefetch, Args}
+ <- AllConsumers];
+ {Ch, CTag} -> [{Ch, CTag, AckRequired, Prefetch, Args}] = AllConsumers,
emit_consumer_created(
- Ch, CTag, true, AckRequired, QName, Args, Ref)
+ Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref)
end,
reply(ok, State).
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 2086c856..47b4ef3a 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -32,7 +32,7 @@
-record(state, {consumers, use}).
--record(consumer, {tag, ack_required, args}).
+-record(consumer, {tag, ack_required, prefetch, args}).
%% These are held in our process dictionary
-record(cr, {ch_pid,
@@ -66,7 +66,7 @@
-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
-spec inactive(state()) -> boolean().
-spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(),
- rabbit_framing:amqp_table()}].
+ non_neg_integer(), rabbit_framing:amqp_table()}].
-spec count() -> non_neg_integer().
-spec unacknowledged_message_count() -> non_neg_integer().
-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
@@ -114,8 +114,9 @@ all(#state{consumers = Consumers}) ->
consumers(Consumers, Acc) ->
priority_queue:fold(
fun ({ChPid, Consumer}, _P, Acc1) ->
- #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer,
- [{ChPid, CTag, Ack, Args} | Acc1]
+ #consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch,
+ args = Args} = Consumer,
+ [{ChPid, CTag, Ack, Prefetch, Args} | Acc1]
end, Acc, Consumers).
count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
@@ -123,8 +124,8 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]).
unacknowledged_message_count() ->
lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]).
-add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, ConsumerPrefetchCount, Args,
- IsEmpty, State = #state{consumers = Consumers}) ->
+add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty,
+ State = #state{consumers = Consumers}) ->
C = #cr{consumer_count = Count,
limiter = Limiter} = ch_record(ChPid, LimiterPid),
Limiter1 = case LimiterActive of
@@ -133,7 +134,7 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, ConsumerPrefetchCount, Args,
end,
C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1},
update_ch_record(
- case parse_credit_args(ConsumerPrefetchCount, Args) of
+ case parse_credit_args(Prefetch, Args) of
none -> C1;
{0, auto} -> C1;
{_Credit, auto} when NoAck -> C1;
@@ -142,6 +143,7 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, ConsumerPrefetchCount, Args,
end),
Consumer = #consumer{tag = CTag,
ack_required = not NoAck,
+ prefetch = Prefetch,
args = Args},
State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}.