diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2014-03-04 14:29:23 +0000 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2014-03-04 14:29:23 +0000 |
commit | b2e24166a4b22c0794a2e058f1cfca98829bc811 (patch) | |
tree | ba5336889639f7f5252c46c6f048af2a760a9443 | |
parent | 75715c5369f5ec5a71423cb2f701e5ec9ad6bf24 (diff) | |
download | rabbitmq-server-b2e24166a4b22c0794a2e058f1cfca98829bc811.tar.gz |
Management information on per-consumer prefetch
-rw-r--r-- | docs/rabbitmqctl.1.xml | 5 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 15 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 37 | ||||
-rw-r--r-- | src/rabbit_queue_consumers.erl | 16 |
4 files changed, 42 insertions, 31 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index a7e42503..bd4563fc 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1604,8 +1604,9 @@ and is managed, the consumer tag which uniquely identifies the subscription within a channel, a boolean indicating whether acknowledgements are expected for - messages delivered to this consumer, and any arguments for this - consumer. + messages delivered to this consumer, an integer indicating + the prefetch limit (with 0 meaning 'none'), and any arguments + for this consumer. </para> </listitem> </varlistentry> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 2bf5c840..58a07e81 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -114,11 +114,12 @@ -spec(notify_policy_changed/1 :: (rabbit_types:amqqueue()) -> 'ok'). -spec(consumers/1 :: (rabbit_types:amqqueue()) -> [{pid(), rabbit_types:ctag(), boolean(), - rabbit_framing:amqp_table()}]). + non_neg_integer(), rabbit_framing:amqp_table()}]). -spec(consumer_info_keys/0 :: () -> rabbit_types:info_keys()). -spec(consumers_all/1 :: (rabbit_types:vhost()) - -> [{name(), pid(), rabbit_types:ctag(), boolean()}]). + -> [{name(), pid(), rabbit_types:ctag(), boolean(), + non_neg_integer(), rabbit_framing:amqp_table()}]). -spec(stat/1 :: (rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}). @@ -185,7 +186,8 @@ %%---------------------------------------------------------------------------- -define(CONSUMER_INFO_KEYS, - [queue_name, channel_pid, consumer_tag, ack_required, arguments]). + [queue_name, channel_pid, consumer_tag, ack_required, prefetch_count, + arguments]). recover() -> %% Clear out remnants of old incarnation, in case we restarted @@ -533,9 +535,10 @@ consumers_all(VHostPath) -> lists:append( map(VHostPath, fun (Q) -> - [lists:zip(ConsumerInfoKeys, - [Q#amqqueue.name, ChPid, CTag, AckRequired, Args]) || - {ChPid, CTag, AckRequired, Args} <- consumers(Q)] + [lists:zip( + ConsumerInfoKeys, + [Q#amqqueue.name, ChPid, CTag, AckRequired, Prefetch, Args]) || + {ChPid, CTag, AckRequired, Prefetch, Args} <- consumers(Q)] end)). stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 97a9c312..d415b358 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -315,7 +315,7 @@ terminate_shutdown(Fun, State) -> QName = qname(State), notify_decorators(shutdown, State), [emit_consumer_deleted(Ch, CTag, QName) || - {Ch, CTag, _, _} <- + {Ch, CTag, _, _, _} <- rabbit_queue_consumers:all(Consumers)], State1#q{backing_queue_state = Fun(BQS)} end. @@ -826,14 +826,16 @@ emit_stats(State, Extra) -> not lists:member(K, ExtraKs)], rabbit_event:notify(queue_stats, Extra ++ Infos). -emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args, Ref) -> +emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, + PrefetchCount, Args, Ref) -> rabbit_event:notify(consumer_created, - [{consumer_tag, CTag}, - {exclusive, Exclusive}, - {ack_required, AckRequired}, - {channel, ChPid}, - {queue, QName}, - {arguments, Args}], + [{consumer_tag, CTag}, + {exclusive, Exclusive}, + {ack_required, AckRequired}, + {channel, ChPid}, + {queue, QName}, + {prefetch_count, PrefetchCount}, + {arguments, Args}], Ref). emit_consumer_deleted(ChPid, ConsumerTag, QName) -> @@ -961,7 +963,7 @@ handle_call({basic_get, ChPid, NoAck, LimiterPid}, _From, end; handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg}, + PrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg}, _From, State = #q{consumers = Consumers, exclusive_consumer = Holder}) -> case check_exclusive_access(Holder, ExclusiveConsume, State) of @@ -969,8 +971,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, ok -> Consumers1 = rabbit_queue_consumers:add( ChPid, ConsumerTag, NoAck, LimiterPid, LimiterActive, - ConsumerPrefetchCount, - Args, is_empty(State), Consumers), + PrefetchCount, Args, is_empty(State), + Consumers), ExclusiveConsumer = if ExclusiveConsume -> {ChPid, ConsumerTag}; true -> Holder @@ -980,7 +982,8 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State1), Args, none), + not NoAck, qname(State1), + PrefetchCount, Args, none), notify_decorators(State1), reply(ok, run_message_queue(State1)) end; @@ -1069,11 +1072,13 @@ handle_call({force_event_refresh, Ref}, _From, AllConsumers = rabbit_queue_consumers:all(Consumers), case Exclusive of none -> [emit_consumer_created( - Ch, CTag, false, AckRequired, QName, Args, Ref) || - {Ch, CTag, AckRequired, Args} <- AllConsumers]; - {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = AllConsumers, + Ch, CTag, false, AckRequired, QName, Prefetch, + Args, Ref) || + {Ch, CTag, AckRequired, Prefetch, Args} + <- AllConsumers]; + {Ch, CTag} -> [{Ch, CTag, AckRequired, Prefetch, Args}] = AllConsumers, emit_consumer_created( - Ch, CTag, true, AckRequired, QName, Args, Ref) + Ch, CTag, true, AckRequired, QName, Prefetch, Args, Ref) end, reply(ok, State). diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index 2086c856..47b4ef3a 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -32,7 +32,7 @@ -record(state, {consumers, use}). --record(consumer, {tag, ack_required, args}). +-record(consumer, {tag, ack_required, prefetch, args}). %% These are held in our process dictionary -record(cr, {ch_pid, @@ -66,7 +66,7 @@ -spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'. -spec inactive(state()) -> boolean(). -spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(), - rabbit_framing:amqp_table()}]. + non_neg_integer(), rabbit_framing:amqp_table()}]. -spec count() -> non_neg_integer(). -spec unacknowledged_message_count() -> non_neg_integer(). -spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(), @@ -114,8 +114,9 @@ all(#state{consumers = Consumers}) -> consumers(Consumers, Acc) -> priority_queue:fold( fun ({ChPid, Consumer}, _P, Acc1) -> - #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer, - [{ChPid, CTag, Ack, Args} | Acc1] + #consumer{tag = CTag, ack_required = Ack, prefetch = Prefetch, + args = Args} = Consumer, + [{ChPid, CTag, Ack, Prefetch, Args} | Acc1] end, Acc, Consumers). count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). @@ -123,8 +124,8 @@ count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). unacknowledged_message_count() -> lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). -add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, ConsumerPrefetchCount, Args, - IsEmpty, State = #state{consumers = Consumers}) -> +add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty, + State = #state{consumers = Consumers}) -> C = #cr{consumer_count = Count, limiter = Limiter} = ch_record(ChPid, LimiterPid), Limiter1 = case LimiterActive of @@ -133,7 +134,7 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, ConsumerPrefetchCount, Args, end, C1 = C#cr{consumer_count = Count + 1, limiter = Limiter1}, update_ch_record( - case parse_credit_args(ConsumerPrefetchCount, Args) of + case parse_credit_args(Prefetch, Args) of none -> C1; {0, auto} -> C1; {_Credit, auto} when NoAck -> C1; @@ -142,6 +143,7 @@ add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, ConsumerPrefetchCount, Args, end), Consumer = #consumer{tag = CTag, ack_required = not NoAck, + prefetch = Prefetch, args = Args}, State#state{consumers = add_consumer({ChPid, Consumer}, Consumers)}. |