diff options
author | Matthias Radestock <matthias@lshift.net> | 2010-02-04 03:50:20 +0000 |
---|---|---|
committer | Matthias Radestock <matthias@lshift.net> | 2010-02-04 03:50:20 +0000 |
commit | c84b88f5ab255b801af90b1cfc7b329bf3180615 (patch) | |
tree | fdea6baa99518df109cf3d7f9d03b1d2ead6dd2d | |
parent | 88753611a142abdbed87a3b84c1080f8e1ee2121 (diff) | |
parent | 3445b1a0147b8d61a0e2dce4134bff575ef30965 (diff) | |
download | rabbitmq-server-c84b88f5ab255b801af90b1cfc7b329bf3180615.tar.gz |
merge bug21966 into bug22300
-rw-r--r-- | docs/rabbitmqctl.1.pod | 8 | ||||
-rw-r--r-- | src/rabbit_channel.erl | 19 | ||||
-rw-r--r-- | src/rabbit_control.erl | 13 |
3 files changed, 39 insertions, 1 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod index 8921f7d7..3f1ec3b4 100644 --- a/docs/rabbitmqctl.1.pod +++ b/docs/rabbitmqctl.1.pod @@ -427,6 +427,14 @@ QoS prefetch count limit in force, 0 if unlimited =back +=item list_consumers + +List consumers, i.e. subscriptions to a queue's message stream. Each +line printed shows, separated by tab characters, the id of the channel +process via which the subscription was created and is managed, the +consumer tag which uniquely identifies the subscription within a +channel, and the name of the queue subscribed to. + The list_queues, list_exchanges and list_bindings commands accept an optional virtual host parameter for which to display results, defaulting to I<"/">. The default can be overridden with the B<-p> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d0b0ee91..0dbcf115 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -37,7 +37,8 @@ -export([start_link/5, do/2, do/3, shutdown/1]). -export([send_command/2, deliver/4, conserve_memory/2]). --export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]). +-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1, + consumers/1, consumers_all/0]). -export([init/1, terminate/2, code_change/3, handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]). @@ -82,6 +83,8 @@ -spec(info/2 :: (pid(), [info_key()]) -> [info()]). -spec(info_all/0 :: () -> [[info()]]). -spec(info_all/1 :: ([info_key()]) -> [[info()]]). +-spec(consumers/1 :: (pid()) -> [{binary(), queue_name()}]). +-spec(consumers_all/0 :: () -> [{pid(), binary(), queue_name()}]). -endif. @@ -131,6 +134,16 @@ info_all() -> info_all(Items) -> rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()). +consumers(Pid) -> + gen_server2:pcall(Pid, 9, consumers, infinity). + +consumers_all() -> + lists:concat( + rabbit_misc:filter_exit_map( + fun (C) -> + [{C, Tag, QueueName} || {Tag, QueueName} <- consumers(C)] + end, list())). + %%--------------------------------------------------------------------------- init([Channel, ReaderPid, WriterPid, Username, VHost]) -> @@ -164,6 +177,10 @@ handle_call({info, Items}, _From, State) -> catch Error -> reply({error, Error}, State) end; +handle_call(consumers, _From, + State = #ch{consumer_mapping = ConsumerMapping}) -> + reply(dict:to_list(ConsumerMapping), State); + handle_call(_Request, _From, State) -> noreply(State). diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl index 03520002..c8a8d599 100644 --- a/src/rabbit_control.erl +++ b/src/rabbit_control.erl @@ -159,6 +159,7 @@ Available commands: list_bindings [-p <VHostPath>] list_connections [<ConnectionInfoItem> ...] list_channels [<ChannelInfoItem> ...] + list_consumers Quiet output mode is selected with the \"-q\" flag. Informational messages are suppressed when quiet mode is in effect. @@ -197,6 +198,9 @@ number, user, vhost, transactional, consumer_count, messages_unacknowledged, prefetch_count]. The default is to display pid, user, transactional, consumer_count, messages_unacknowledged. +The output format for \"list_consumers\" is a list of rows containing +the channel process id, consumer tag and queue name, in that order. + "), halt(1). @@ -314,6 +318,15 @@ action(list_channels, Node, Args, Inform) -> display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]), ArgAtoms); +action(list_consumers, Node, [], Inform) -> + Inform("Listing consumers", []), + InfoKeys = [channel_pid, consumer_tag, queue_name], + display_info_list( + [lists:zip(InfoKeys, tuple_to_list(X)) || + X <- rpc_call(Node, rabbit_channel, consumers_all, [])], + InfoKeys), + ok; + action(Command, Node, Args, Inform) -> {VHost, RemainingArgs} = parse_vhost_flag(Args), action(Command, Node, VHost, RemainingArgs, Inform). |