summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-02-04 03:50:20 +0000
committerMatthias Radestock <matthias@lshift.net>2010-02-04 03:50:20 +0000
commitc84b88f5ab255b801af90b1cfc7b329bf3180615 (patch)
treefdea6baa99518df109cf3d7f9d03b1d2ead6dd2d
parent88753611a142abdbed87a3b84c1080f8e1ee2121 (diff)
parent3445b1a0147b8d61a0e2dce4134bff575ef30965 (diff)
downloadrabbitmq-server-c84b88f5ab255b801af90b1cfc7b329bf3180615.tar.gz
merge bug21966 into bug22300
-rw-r--r--docs/rabbitmqctl.1.pod8
-rw-r--r--src/rabbit_channel.erl19
-rw-r--r--src/rabbit_control.erl13
3 files changed, 39 insertions, 1 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
index 8921f7d7..3f1ec3b4 100644
--- a/docs/rabbitmqctl.1.pod
+++ b/docs/rabbitmqctl.1.pod
@@ -427,6 +427,14 @@ QoS prefetch count limit in force, 0 if unlimited
=back
+=item list_consumers
+
+List consumers, i.e. subscriptions to a queue's message stream. Each
+line printed shows, separated by tab characters, the id of the channel
+process via which the subscription was created and is managed, the
+consumer tag which uniquely identifies the subscription within a
+channel, and the name of the queue subscribed to.
+
The list_queues, list_exchanges and list_bindings commands accept an
optional virtual host parameter for which to display results,
defaulting to I<"/">. The default can be overridden with the B<-p>
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d0b0ee91..0dbcf115 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -37,7 +37,8 @@
-export([start_link/5, do/2, do/3, shutdown/1]).
-export([send_command/2, deliver/4, conserve_memory/2]).
--export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1]).
+-export([list/0, info_keys/0, info/1, info/2, info_all/0, info_all/1,
+ consumers/1, consumers_all/0]).
-export([init/1, terminate/2, code_change/3,
handle_call/3, handle_cast/2, handle_info/2, handle_pre_hibernate/1]).
@@ -82,6 +83,8 @@
-spec(info/2 :: (pid(), [info_key()]) -> [info()]).
-spec(info_all/0 :: () -> [[info()]]).
-spec(info_all/1 :: ([info_key()]) -> [[info()]]).
+-spec(consumers/1 :: (pid()) -> [{binary(), queue_name()}]).
+-spec(consumers_all/0 :: () -> [{pid(), binary(), queue_name()}]).
-endif.
@@ -131,6 +134,16 @@ info_all() ->
info_all(Items) ->
rabbit_misc:filter_exit_map(fun (C) -> info(C, Items) end, list()).
+consumers(Pid) ->
+ gen_server2:pcall(Pid, 9, consumers, infinity).
+
+consumers_all() ->
+ lists:concat(
+ rabbit_misc:filter_exit_map(
+ fun (C) ->
+ [{C, Tag, QueueName} || {Tag, QueueName} <- consumers(C)]
+ end, list())).
+
%%---------------------------------------------------------------------------
init([Channel, ReaderPid, WriterPid, Username, VHost]) ->
@@ -164,6 +177,10 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
+handle_call(consumers, _From,
+ State = #ch{consumer_mapping = ConsumerMapping}) ->
+ reply(dict:to_list(ConsumerMapping), State);
+
handle_call(_Request, _From, State) ->
noreply(State).
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index 03520002..c8a8d599 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -159,6 +159,7 @@ Available commands:
list_bindings [-p <VHostPath>]
list_connections [<ConnectionInfoItem> ...]
list_channels [<ChannelInfoItem> ...]
+ list_consumers
Quiet output mode is selected with the \"-q\" flag. Informational
messages are suppressed when quiet mode is in effect.
@@ -197,6 +198,9 @@ number, user, vhost, transactional, consumer_count,
messages_unacknowledged, prefetch_count]. The default is to display
pid, user, transactional, consumer_count, messages_unacknowledged.
+The output format for \"list_consumers\" is a list of rows containing
+the channel process id, consumer tag and queue name, in that order.
+
"),
halt(1).
@@ -314,6 +318,15 @@ action(list_channels, Node, Args, Inform) ->
display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]),
ArgAtoms);
+action(list_consumers, Node, [], Inform) ->
+ Inform("Listing consumers", []),
+ InfoKeys = [channel_pid, consumer_tag, queue_name],
+ display_info_list(
+ [lists:zip(InfoKeys, tuple_to_list(X)) ||
+ X <- rpc_call(Node, rabbit_channel, consumers_all, [])],
+ InfoKeys),
+ ok;
+
action(Command, Node, Args, Inform) ->
{VHost, RemainingArgs} = parse_vhost_flag(Args),
action(Command, Node, VHost, RemainingArgs, Inform).