summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-02-04 19:09:46 +0000
committerMatthias Radestock <matthias@lshift.net>2010-02-04 19:09:46 +0000
commit41a0a4ffa30e66a873e1a97c5108069391d8d662 (patch)
tree3f20e3f5231aa5a294aa6821365a280708916d17
parent60fad080c4d98bbadeae1d02b9ee7397bacfd834 (diff)
downloadrabbitmq-server-41a0a4ffa30e66a873e1a97c5108069391d8d662.tar.gz
move consumer listing from channel to queue
The main motivation here is that queues, not channels, are the logical home of subscriptions. And rabbit plug-ins and other extensions may bypass channels when subscribing to queues. The queues also know more about consumers than channels, namely whether a consumer requires acks. So this additional info we can now display too. And with the switch to queues the listing is now scoped by vhost.
-rw-r--r--docs/rabbitmqctl.1.pod22
-rw-r--r--src/rabbit_amqqueue.erl14
-rw-r--r--src/rabbit_amqqueue_process.erl9
-rw-r--r--src/rabbit_control.erl21
-rw-r--r--src/rabbit_tests.erl2
5 files changed, 48 insertions, 20 deletions
diff --git a/docs/rabbitmqctl.1.pod b/docs/rabbitmqctl.1.pod
index 3f1ec3b4..398ab613 100644
--- a/docs/rabbitmqctl.1.pod
+++ b/docs/rabbitmqctl.1.pod
@@ -430,15 +430,19 @@ QoS prefetch count limit in force, 0 if unlimited
=item list_consumers
List consumers, i.e. subscriptions to a queue's message stream. Each
-line printed shows, separated by tab characters, the id of the channel
-process via which the subscription was created and is managed, the
-consumer tag which uniquely identifies the subscription within a
-channel, and the name of the queue subscribed to.
-
-The list_queues, list_exchanges and list_bindings commands accept an
-optional virtual host parameter for which to display results,
-defaulting to I<"/">. The default can be overridden with the B<-p>
-flag.
+line printed shows, separated by tab characters, the name of the queue
+subscribed to, the id of the channel process via which the
+subscription was created and is managed, the consumer tag which
+uniquely identifies the subscription within a channel, and a boolean
+indicating whether acknowledgements are expected for messages
+delivered to this consumer.
+
+=back
+
+The list_queues, list_exchanges, list_bindings and list_consumers
+commands accept an optional virtual host parameter for which to
+display results, defaulting to I<"/">. The default can be overridden
+with the B<-p> flag.
=head1 OUTPUT ESCAPING
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index db7461b0..ea0eadc3 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -37,6 +37,7 @@
-export([lookup/1, with/2, with_or_die/2,
stat/1, stat_all/0, deliver/2, redeliver/2, requeue/3, ack/4]).
-export([list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2]).
+-export([consumers/1, consumers_all/1]).
-export([claim_queue/2]).
-export([basic_get/3, basic_consume/8, basic_cancel/4]).
-export([notify_sent/2, unblock/2]).
@@ -74,6 +75,9 @@
-spec(info/2 :: (amqqueue(), [info_key()]) -> [info()]).
-spec(info_all/1 :: (vhost()) -> [[info()]]).
-spec(info_all/2 :: (vhost(), [info_key()]) -> [[info()]]).
+-spec(consumers/1 :: (amqqueue()) -> [{pid(), ctag(), boolean()}]).
+-spec(consumers_all/1 ::
+ (vhost()) -> [{queue_name(), pid(), ctag(), boolean()}]).
-spec(stat/1 :: (amqqueue()) -> qstats()).
-spec(stat_all/0 :: () -> [qstats()]).
-spec(delete/3 ::
@@ -240,6 +244,16 @@ info_all(VHostPath) -> map(VHostPath, fun (Q) -> info(Q) end).
info_all(VHostPath, Items) -> map(VHostPath, fun (Q) -> info(Q, Items) end).
+consumers(#amqqueue{ pid = QPid }) ->
+ gen_server2:pcall(QPid, 9, consumers, infinity).
+
+consumers_all(VHostPath) ->
+ lists:concat(
+ map(VHostPath,
+ fun (Q) -> [{Q#amqqueue.name, ChPid, ConsumerTag, AckRequired} ||
+ {ChPid, ConsumerTag, AckRequired} <- consumers(Q)]
+ end)).
+
stat(#amqqueue{pid = QPid}) -> gen_server2:call(QPid, stat, infinity).
stat_all() ->
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 06e68a1b..94b89cd5 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -547,6 +547,15 @@ handle_call({info, Items}, _From, State) ->
catch Error -> reply({error, Error}, State)
end;
+handle_call(consumers, _From,
+ State = #q{active_consumers = ActiveConsumers,
+ blocked_consumers = BlockedConsumers}) ->
+ reply(rabbit_misc:queue_fold(
+ fun ({ChPid, #consumer{tag = ConsumerTag,
+ ack_required = AckRequired}}, Acc) ->
+ [{ChPid, ConsumerTag, AckRequired} | Acc]
+ end, [], queue:join(ActiveConsumers, BlockedConsumers)), State);
+
handle_call({deliver_immediately, Txn, Message, ChPid}, _From, State) ->
%% Synchronous, "immediate" delivery mode
%%
diff --git a/src/rabbit_control.erl b/src/rabbit_control.erl
index c8a8d599..208aec09 100644
--- a/src/rabbit_control.erl
+++ b/src/rabbit_control.erl
@@ -159,7 +159,7 @@ Available commands:
list_bindings [-p <VHostPath>]
list_connections [<ConnectionInfoItem> ...]
list_channels [<ChannelInfoItem> ...]
- list_consumers
+ list_consumers [-p <VHostPath>]
Quiet output mode is selected with the \"-q\" flag. Informational
messages are suppressed when quiet mode is in effect.
@@ -198,8 +198,10 @@ number, user, vhost, transactional, consumer_count,
messages_unacknowledged, prefetch_count]. The default is to display
pid, user, transactional, consumer_count, messages_unacknowledged.
-The output format for \"list_consumers\" is a list of rows containing
-the channel process id, consumer tag and queue name, in that order.
+The output format for \"list_consumers\" is a list of rows containing,
+in order, the queue name, channel process id, consumer tag, and a
+boolean indicating whether acknowledgements are expected from the
+consumer.
"),
halt(1).
@@ -301,8 +303,7 @@ action(list_bindings, Node, Args, Inform) ->
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
X <- rpc_call(Node, rabbit_exchange, list_bindings, [VHostArg])],
- InfoKeys),
- ok;
+ InfoKeys);
action(list_connections, Node, Args, Inform) ->
Inform("Listing connections", []),
@@ -318,14 +319,14 @@ action(list_channels, Node, Args, Inform) ->
display_info_list(rpc_call(Node, rabbit_channel, info_all, [ArgAtoms]),
ArgAtoms);
-action(list_consumers, Node, [], Inform) ->
+action(list_consumers, Node, Args, Inform) ->
Inform("Listing consumers", []),
- InfoKeys = [channel_pid, consumer_tag, queue_name],
+ {VHostArg, _} = parse_vhost_flag_bin(Args),
+ InfoKeys = [queue_name, channel_pid, consumer_tag, ack_required],
display_info_list(
[lists:zip(InfoKeys, tuple_to_list(X)) ||
- X <- rpc_call(Node, rabbit_channel, consumers_all, [])],
- InfoKeys),
- ok;
+ X <- rpc_call(Node, rabbit_amqqueue, consumers_all, [VHostArg])],
+ InfoKeys);
action(Command, Node, Args, Inform) ->
{VHost, RemainingArgs} = parse_vhost_flag(Args),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 2a580b98..ad113687 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -31,7 +31,7 @@
-module(rabbit_tests).
--export([all_tests/0, test_parsing/0]).
+-export([all_tests/0, test_parsing/0, test_server_status/0]).
%% Exported so the hook mechanism can call back
-export([handle_hook/3, bad_handle_hook/3, extra_arg_hook/5]).