diff options
author | Simon MacMullen <simon@rabbitmq.com> | 2013-08-21 17:05:03 +0100 |
---|---|---|
committer | Simon MacMullen <simon@rabbitmq.com> | 2013-08-21 17:05:03 +0100 |
commit | 5a38b5e837c68ea8c86acd3edc2bc5b932271b5f (patch) | |
tree | 5c767a2f99b7bdae4d04e907b390a953566280fc | |
parent | 968d65b72c0734b7552aad7225e33646847786c6 (diff) | |
download | rabbitmq-server-bug25725.tar.gz |
Add consumer arguments to events and rabbitmqctl.bug25725
-rw-r--r-- | docs/rabbitmqctl.1.xml | 10 | ||||
-rw-r--r-- | src/rabbit_amqqueue.erl | 6 | ||||
-rw-r--r-- | src/rabbit_amqqueue_process.erl | 23 |
3 files changed, 19 insertions, 20 deletions
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml index b2361cde..d7c93924 100644 --- a/docs/rabbitmqctl.1.xml +++ b/docs/rabbitmqctl.1.xml @@ -1618,14 +1618,10 @@ characters, the name of the queue subscribed to, the id of the channel process via which the subscription was created and is managed, the consumer tag which uniquely identifies - the subscription within a channel, and a boolean + the subscription within a channel, a boolean indicating whether acknowledgements are expected for - messages delivered to this consumer. - </para> - <para> - The output is a list of rows containing, in order, the queue name, - channel process id, consumer tag, and a boolean indicating whether - acknowledgements are expected from the consumer. + messages delivered to this consumer, and any arguments for this + consumer. </para> </listitem> </varlistentry> diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 10f97afd..0673ff8e 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -184,7 +184,7 @@ %%---------------------------------------------------------------------------- -define(CONSUMER_INFO_KEYS, - [queue_name, channel_pid, consumer_tag, ack_required]). + [queue_name, channel_pid, consumer_tag, ack_required, arguments]). recover() -> %% Clear out remnants of old incarnation, in case we restarted @@ -512,8 +512,8 @@ consumers_all(VHostPath) -> map(VHostPath, fun (Q) -> [lists:zip(ConsumerInfoKeys, - [Q#amqqueue.name, ChPid, ConsumerTag, AckRequired]) || - {ChPid, ConsumerTag, AckRequired} <- consumers(Q)] + [Q#amqqueue.name, ChPid, CTag, AckRequired, Args]) || + {ChPid, CTag, AckRequired, Args} <- consumers(Q)] end)). stat(#amqqueue{pid = QPid}) -> delegate:call(QPid, stat). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 547efa45..972e6be0 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1012,8 +1012,9 @@ consumers(#q{active_consumers = ActiveConsumers}) -> consumers(Consumers, Acc) -> priority_queue:fold( - fun ({ChPid, #consumer{tag = CTag, ack_required = AckReq}}, _P, Acc1) -> - [{ChPid, CTag, AckReq} | Acc1] + fun ({ChPid, Consumer}, _P, Acc1) -> + #consumer{tag = CTag, ack_required = Ack, args = Args} = Consumer, + [{ChPid, CTag, Ack, Args} | Acc1] end, Acc, Consumers). emit_stats(State) -> @@ -1022,13 +1023,14 @@ emit_stats(State) -> emit_stats(State, Extra) -> rabbit_event:notify(queue_stats, Extra ++ infos(?STATISTICS_KEYS, State)). -emit_consumer_created(ChPid, ConsumerTag, Exclusive, AckRequired, QName) -> +emit_consumer_created(ChPid, CTag, Exclusive, AckRequired, QName, Args) -> rabbit_event:notify(consumer_created, - [{consumer_tag, ConsumerTag}, + [{consumer_tag, CTag}, {exclusive, Exclusive}, {ack_required, AckRequired}, {channel, ChPid}, - {queue, QName}]). + {queue, QName}, + {arguments, Args}]). emit_consumer_deleted(ChPid, ConsumerTag, QName) -> rabbit_event:notify(consumer_deleted, @@ -1176,7 +1178,7 @@ handle_call({basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, exclusive_consumer = ExclusiveConsumer}, ok = maybe_send_reply(ChPid, OkMsg), emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume, - not NoAck, qname(State1)), + not NoAck, qname(State1), OtherArgs), AC1 = add_consumer({ChPid, Consumer}, State1#q.active_consumers), State2 = State1#q{active_consumers = AC1}, reply(ok, run_message_queue(State2)) @@ -1275,10 +1277,11 @@ handle_call(force_event_refresh, _From, QName = qname(State), case Exclusive of none -> [emit_consumer_created( - Ch, CTag, false, AckRequired, QName) || - {Ch, CTag, AckRequired} <- consumers(State)]; - {Ch, CTag} -> [{Ch, CTag, AckRequired}] = consumers(State), - emit_consumer_created(Ch, CTag, true, AckRequired, QName) + Ch, CTag, false, AckRequired, QName, Args) || + {Ch, CTag, AckRequired, Args} <- consumers(State)]; + {Ch, CTag} -> [{Ch, CTag, AckRequired, Args}] = consumers(State), + emit_consumer_created( + Ch, CTag, true, AckRequired, QName, Args) end, reply(ok, State). |